Mirror of https://github.com/grafana/grafana.git (synced 2025-02-25 18:55:37 -06:00)
Alerting: Move ruleRoutine to be a method on ruleInfo (#83866)

* Move ruleRoutine to ruleInfo file
* Move tests as well
* swap ruleInfo and scheduler parameters on ruleRoutine
* Fix linter complaint, receiver name
This commit is contained in:
parent c88accdf99 · commit 1bb38e8f95
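The change is essentially a receiver swap: the evaluation loop that used to be a method on the scheduler taking the rule's info as a parameter becomes a method on the rule's info taking the scheduler as a parameter, and the call site in processTick changes accordingly. The following is a minimal, illustrative sketch of that pattern using simplified stand-in types, not Grafana's real schedule/alertRuleInfo structs:

package main

import "fmt"

// Simplified stand-ins for the real types in pkg/services/ngalert/schedule;
// fields and names here are illustrative only.
type schedule struct{ appURL string }
type alertRuleInfo struct{ uid string }

// Before (removed from schedule.go in the diff below):
//   func (sch *schedule) ruleRoutine(uid string, info *alertRuleInfo) error
//
// After: the routine is a method on the rule's info and receives the
// scheduler as an explicit dependency.
func (a *alertRuleInfo) ruleRoutine(uid string, sch *schedule) error {
	fmt.Printf("evaluating rule %s against %s\n", uid, sch.appURL)
	return nil
}

func main() {
	sch := &schedule{appURL: "http://localhost:3000"}
	info := &alertRuleInfo{uid: "rule-1"}
	// Mirrors the processTick change: ruleInfo.ruleRoutine(key, sch)
	// replaces sch.ruleRoutine(key, ruleInfo).
	_ = info.ruleRoutine(info.uid, sch)
}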
@@ -2,8 +2,20 @@ package schedule
 
 import (
 	context "context"
+	"errors"
+	"fmt"
+	"time"
+
+	"github.com/grafana/grafana/pkg/services/datasources"
+	"github.com/grafana/grafana/pkg/services/ngalert/eval"
+	ngmodels "github.com/grafana/grafana/pkg/services/ngalert/models"
+	"github.com/grafana/grafana/pkg/services/ngalert/state"
+	"github.com/grafana/grafana/pkg/services/org"
+	"github.com/grafana/grafana/pkg/services/user"
 	"github.com/grafana/grafana/pkg/util"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/codes"
+	"go.opentelemetry.io/otel/trace"
 )
 
 type alertRuleInfo struct {
@@ -68,3 +80,272 @@ func (a *alertRuleInfo) update(lastVersion ruleVersionAndPauseStatus) bool {
 func (a *alertRuleInfo) stop(reason error) {
 	a.stopFn(reason)
 }
+
+//nolint:gocyclo
+func (a *alertRuleInfo) ruleRoutine(key ngmodels.AlertRuleKey, sch *schedule) error {
+	grafanaCtx := ngmodels.WithRuleKey(a.ctx, key)
+	logger := sch.log.FromContext(grafanaCtx)
+	logger.Debug("Alert rule routine started")
+
+	orgID := fmt.Sprint(key.OrgID)
+	evalTotal := sch.metrics.EvalTotal.WithLabelValues(orgID)
+	evalDuration := sch.metrics.EvalDuration.WithLabelValues(orgID)
+	evalTotalFailures := sch.metrics.EvalFailures.WithLabelValues(orgID)
+	processDuration := sch.metrics.ProcessDuration.WithLabelValues(orgID)
+	sendDuration := sch.metrics.SendDuration.WithLabelValues(orgID)
+
+	notify := func(states []state.StateTransition) {
+		expiredAlerts := state.FromAlertsStateToStoppedAlert(states, sch.appURL, sch.clock)
+		if len(expiredAlerts.PostableAlerts) > 0 {
+			sch.alertsSender.Send(grafanaCtx, key, expiredAlerts)
+		}
+	}
+
+	resetState := func(ctx context.Context, isPaused bool) {
+		rule := sch.schedulableAlertRules.get(key)
+		reason := ngmodels.StateReasonUpdated
+		if isPaused {
+			reason = ngmodels.StateReasonPaused
+		}
+		states := sch.stateManager.ResetStateByRuleUID(ctx, rule, reason)
+		notify(states)
+	}
+
+	evaluate := func(ctx context.Context, f fingerprint, attempt int64, e *evaluation, span trace.Span, retry bool) error {
+		logger := logger.New("version", e.rule.Version, "fingerprint", f, "attempt", attempt, "now", e.scheduledAt).FromContext(ctx)
+		start := sch.clock.Now()
+
+		evalCtx := eval.NewContextWithPreviousResults(ctx, SchedulerUserFor(e.rule.OrgID), sch.newLoadedMetricsReader(e.rule))
+		if sch.evaluatorFactory == nil {
+			panic("evalfactory nil")
+		}
+		ruleEval, err := sch.evaluatorFactory.Create(evalCtx, e.rule.GetEvalCondition())
+		var results eval.Results
+		var dur time.Duration
+		if err != nil {
+			dur = sch.clock.Now().Sub(start)
+			logger.Error("Failed to build rule evaluator", "error", err)
+		} else {
+			results, err = ruleEval.Evaluate(ctx, e.scheduledAt)
+			dur = sch.clock.Now().Sub(start)
+			if err != nil {
+				logger.Error("Failed to evaluate rule", "error", err, "duration", dur)
+			}
+		}
+
+		evalTotal.Inc()
+		evalDuration.Observe(dur.Seconds())
+
+		if ctx.Err() != nil { // check if the context is not cancelled. The evaluation can be a long-running task.
+			span.SetStatus(codes.Error, "rule evaluation cancelled")
+			logger.Debug("Skip updating the state because the context has been cancelled")
+			return nil
+		}
+
+		if err != nil || results.HasErrors() {
+			evalTotalFailures.Inc()
+
+			// Only retry (return errors) if this isn't the last attempt, otherwise skip these return operations.
+			if retry {
+				// The only thing that can return non-nil `err` from ruleEval.Evaluate is the server side expression pipeline.
+				// This includes transport errors such as transient network errors.
+				if err != nil {
+					span.SetStatus(codes.Error, "rule evaluation failed")
+					span.RecordError(err)
+					return fmt.Errorf("server side expressions pipeline returned an error: %w", err)
+				}
+
+				// If the pipeline executed successfully but have other types of errors that can be retryable, we should do so.
+				if !results.HasNonRetryableErrors() {
+					span.SetStatus(codes.Error, "rule evaluation failed")
+					span.RecordError(err)
+					return fmt.Errorf("the result-set has errors that can be retried: %w", results.Error())
+				}
+			}
+
+			// If results is nil, we assume that the error must be from the SSE pipeline (ruleEval.Evaluate) which is the only code that can actually return an `err`.
+			if results == nil {
+				results = append(results, eval.NewResultFromError(err, e.scheduledAt, dur))
+			}
+
+			// If err is nil, we assume that the SSS pipeline succeeded and that the error must be embedded in the results.
+			if err == nil {
+				err = results.Error()
+			}
+
+			span.SetStatus(codes.Error, "rule evaluation failed")
+			span.RecordError(err)
+		} else {
+			logger.Debug("Alert rule evaluated", "results", results, "duration", dur)
+			span.AddEvent("rule evaluated", trace.WithAttributes(
+				attribute.Int64("results", int64(len(results))),
+			))
+		}
+		start = sch.clock.Now()
+		processedStates := sch.stateManager.ProcessEvalResults(
+			ctx,
+			e.scheduledAt,
+			e.rule,
+			results,
+			state.GetRuleExtraLabels(logger, e.rule, e.folderTitle, !sch.disableGrafanaFolder),
+		)
+		processDuration.Observe(sch.clock.Now().Sub(start).Seconds())
+
+		start = sch.clock.Now()
+		alerts := state.FromStateTransitionToPostableAlerts(processedStates, sch.stateManager, sch.appURL)
+		span.AddEvent("results processed", trace.WithAttributes(
+			attribute.Int64("state_transitions", int64(len(processedStates))),
+			attribute.Int64("alerts_to_send", int64(len(alerts.PostableAlerts))),
+		))
+		if len(alerts.PostableAlerts) > 0 {
+			sch.alertsSender.Send(ctx, key, alerts)
+		}
+		sendDuration.Observe(sch.clock.Now().Sub(start).Seconds())
+
+		return nil
+	}
+
+	evalRunning := false
+	var currentFingerprint fingerprint
+	defer sch.stopApplied(key)
+	for {
+		select {
+		// used by external services (API) to notify that rule is updated.
+		case ctx := <-a.updateCh:
+			if currentFingerprint == ctx.Fingerprint {
+				logger.Info("Rule's fingerprint has not changed. Skip resetting the state", "currentFingerprint", currentFingerprint)
+				continue
+			}
+
+			logger.Info("Clearing the state of the rule because it was updated", "isPaused", ctx.IsPaused, "fingerprint", ctx.Fingerprint)
+			// clear the state. So the next evaluation will start from the scratch.
+			resetState(grafanaCtx, ctx.IsPaused)
+			currentFingerprint = ctx.Fingerprint
+		// evalCh - used by the scheduler to signal that evaluation is needed.
+		case ctx, ok := <-a.evalCh:
+			if !ok {
+				logger.Debug("Evaluation channel has been closed. Exiting")
+				return nil
+			}
+			if evalRunning {
+				continue
+			}
+
+			func() {
+				evalRunning = true
+				defer func() {
+					evalRunning = false
+					sch.evalApplied(key, ctx.scheduledAt)
+				}()
+
+				for attempt := int64(1); attempt <= sch.maxAttempts; attempt++ {
+					isPaused := ctx.rule.IsPaused
+					f := ruleWithFolder{ctx.rule, ctx.folderTitle}.Fingerprint()
+					// Do not clean up state if the eval loop has just started.
+					var needReset bool
+					if currentFingerprint != 0 && currentFingerprint != f {
+						logger.Debug("Got a new version of alert rule. Clear up the state", "fingerprint", f)
+						needReset = true
+					}
+					// We need to reset state if the loop has started and the alert is already paused. It can happen,
+					// if we have an alert with state and we do file provision with stateful Grafana, that state
+					// lingers in DB and won't be cleaned up until next alert rule update.
+					needReset = needReset || (currentFingerprint == 0 && isPaused)
+					if needReset {
+						resetState(grafanaCtx, isPaused)
+					}
+					currentFingerprint = f
+					if isPaused {
+						logger.Debug("Skip rule evaluation because it is paused")
+						return
+					}
+
+					fpStr := currentFingerprint.String()
+					utcTick := ctx.scheduledAt.UTC().Format(time.RFC3339Nano)
+					tracingCtx, span := sch.tracer.Start(grafanaCtx, "alert rule execution", trace.WithAttributes(
+						attribute.String("rule_uid", ctx.rule.UID),
+						attribute.Int64("org_id", ctx.rule.OrgID),
+						attribute.Int64("rule_version", ctx.rule.Version),
+						attribute.String("rule_fingerprint", fpStr),
+						attribute.String("tick", utcTick),
+					))
+
+					// Check before any execution if the context was cancelled so that we don't do any evaluations.
+					if tracingCtx.Err() != nil {
+						span.SetStatus(codes.Error, "rule evaluation cancelled")
+						span.End()
+						logger.Error("Skip evaluation and updating the state because the context has been cancelled", "version", ctx.rule.Version, "fingerprint", f, "attempt", attempt, "now", ctx.scheduledAt)
+						return
+					}
+
+					retry := attempt < sch.maxAttempts
+					err := evaluate(tracingCtx, f, attempt, ctx, span, retry)
+					// This is extremely confusing - when we exhaust all retry attempts, or we have no retryable errors
+					// we return nil - so technically, this is meaningless to know whether the evaluation has errors or not.
+					span.End()
+					if err == nil {
+						return
+					}
+
+					logger.Error("Failed to evaluate rule", "version", ctx.rule.Version, "fingerprint", f, "attempt", attempt, "now", ctx.scheduledAt, "error", err)
+					select {
+					case <-tracingCtx.Done():
+						logger.Error("Context has been cancelled while backing off", "version", ctx.rule.Version, "fingerprint", f, "attempt", attempt, "now", ctx.scheduledAt)
+						return
+					case <-time.After(retryDelay):
+						continue
+					}
+				}
+			}()
+
+		case <-grafanaCtx.Done():
+			// clean up the state only if the reason for stopping the evaluation loop is that the rule was deleted
+			if errors.Is(grafanaCtx.Err(), errRuleDeleted) {
+				// We do not want a context to be unbounded which could potentially cause a go routine running
+				// indefinitely. 1 minute is an almost randomly chosen timeout, big enough to cover the majority of the
+				// cases.
+				ctx, cancelFunc := context.WithTimeout(context.Background(), time.Minute)
+				defer cancelFunc()
+				states := sch.stateManager.DeleteStateByRuleUID(ngmodels.WithRuleKey(ctx, key), key, ngmodels.StateReasonRuleDeleted)
+				notify(states)
+			}
+			logger.Debug("Stopping alert rule routine")
+			return nil
+		}
+	}
+}
+
+// evalApplied is only used on tests.
+func (sch *schedule) evalApplied(alertDefKey ngmodels.AlertRuleKey, now time.Time) {
+	if sch.evalAppliedFunc == nil {
+		return
+	}
+
+	sch.evalAppliedFunc(alertDefKey, now)
+}
+
+// stopApplied is only used on tests.
+func (sch *schedule) stopApplied(alertDefKey ngmodels.AlertRuleKey) {
+	if sch.stopAppliedFunc == nil {
+		return
+	}
+
+	sch.stopAppliedFunc(alertDefKey)
+}
+
+func SchedulerUserFor(orgID int64) *user.SignedInUser {
+	return &user.SignedInUser{
+		UserID:           -1,
+		IsServiceAccount: true,
+		Login:            "grafana_scheduler",
+		OrgID:            orgID,
+		OrgRole:          org.RoleAdmin,
+		Permissions: map[int64]map[string][]string{
+			orgID: {
+				datasources.ActionQuery: []string{
+					datasources.ScopeAll,
+				},
+			},
+		},
+	}
+}
@@ -1,7 +1,9 @@
 package schedule
 
 import (
+	"bytes"
 	context "context"
+	"fmt"
 	"math"
 	"math/rand"
 	"runtime"
@@ -9,8 +11,18 @@ import (
 	"testing"
 	"time"
 
+	alertingModels "github.com/grafana/alerting/models"
+	"github.com/grafana/grafana-plugin-sdk-go/data"
+	definitions "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
+	"github.com/grafana/grafana/pkg/services/ngalert/eval"
 	models "github.com/grafana/grafana/pkg/services/ngalert/models"
+	"github.com/grafana/grafana/pkg/services/ngalert/state"
 	"github.com/grafana/grafana/pkg/util"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/testutil"
+	prometheusModel "github.com/prometheus/common/model"
+	"github.com/stretchr/testify/assert"
+	mock "github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
 )
 
@@ -227,3 +239,468 @@ func TestAlertRuleInfo(t *testing.T) {
 		wg.Wait()
 	})
 }
+
+func TestRuleRoutine(t *testing.T) {
+	createSchedule := func(
+		evalAppliedChan chan time.Time,
+		senderMock *SyncAlertsSenderMock,
+	) (*schedule, *fakeRulesStore, *state.FakeInstanceStore, prometheus.Gatherer) {
+		ruleStore := newFakeRulesStore()
+		instanceStore := &state.FakeInstanceStore{}
+
+		registry := prometheus.NewPedanticRegistry()
+		sch := setupScheduler(t, ruleStore, instanceStore, registry, senderMock, nil)
+		sch.evalAppliedFunc = func(key models.AlertRuleKey, t time.Time) {
+			evalAppliedChan <- t
+		}
+		return sch, ruleStore, instanceStore, registry
+	}
+
+	// normal states do not include NoData and Error because currently it is not possible to perform any sensible test
+	normalStates := []eval.State{eval.Normal, eval.Alerting, eval.Pending}
+	allStates := [...]eval.State{eval.Normal, eval.Alerting, eval.Pending, eval.NoData, eval.Error}
+
+	for _, evalState := range normalStates {
+		// TODO rewrite when we are able to mock/fake state manager
+		t.Run(fmt.Sprintf("when rule evaluation happens (evaluation state %s)", evalState), func(t *testing.T) {
+			evalAppliedChan := make(chan time.Time)
+			sch, ruleStore, instanceStore, reg := createSchedule(evalAppliedChan, nil)
+
+			rule := models.AlertRuleGen(withQueryForState(t, evalState))()
+			ruleStore.PutRule(context.Background(), rule)
+			folderTitle := ruleStore.getNamespaceTitle(rule.NamespaceUID)
+			ctx, cancel := context.WithCancel(context.Background())
+			t.Cleanup(cancel)
+			ruleInfo := newAlertRuleInfo(ctx)
+			go func() {
+				_ = ruleInfo.ruleRoutine(rule.GetKey(), sch)
+			}()
+
+			expectedTime := time.UnixMicro(rand.Int63())
+
+			ruleInfo.evalCh <- &evaluation{
+				scheduledAt: expectedTime,
+				rule:        rule,
+				folderTitle: folderTitle,
+			}
+
+			actualTime := waitForTimeChannel(t, evalAppliedChan)
+			require.Equal(t, expectedTime, actualTime)
+
+			t.Run("it should add extra labels", func(t *testing.T) {
+				states := sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID)
+				for _, s := range states {
+					assert.Equal(t, rule.UID, s.Labels[alertingModels.RuleUIDLabel])
+					assert.Equal(t, rule.NamespaceUID, s.Labels[alertingModels.NamespaceUIDLabel])
+					assert.Equal(t, rule.Title, s.Labels[prometheusModel.AlertNameLabel])
+					assert.Equal(t, folderTitle, s.Labels[models.FolderTitleLabel])
+				}
+			})
+
+			t.Run("it should process evaluation results via state manager", func(t *testing.T) {
+				// TODO rewrite when we are able to mock/fake state manager
+				states := sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID)
+				require.Len(t, states, 1)
+				s := states[0]
+				require.Equal(t, rule.UID, s.AlertRuleUID)
+				require.Len(t, s.Results, 1)
+				var expectedStatus = evalState
+				if evalState == eval.Pending {
+					expectedStatus = eval.Alerting
+				}
+				require.Equal(t, expectedStatus.String(), s.Results[0].EvaluationState.String())
+				require.Equal(t, expectedTime, s.Results[0].EvaluationTime)
+			})
+			t.Run("it should save alert instances to storage", func(t *testing.T) {
+				// TODO rewrite when we are able to mock/fake state manager
+				states := sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID)
+				require.Len(t, states, 1)
+				s := states[0]
+
+				var cmd *models.AlertInstance
+				for _, op := range instanceStore.RecordedOps() {
+					switch q := op.(type) {
+					case models.AlertInstance:
+						cmd = &q
+					}
+					if cmd != nil {
+						break
+					}
+				}
+
+				require.NotNil(t, cmd)
+				t.Logf("Saved alert instances: %v", cmd)
+				require.Equal(t, rule.OrgID, cmd.RuleOrgID)
+				require.Equal(t, expectedTime, cmd.LastEvalTime)
+				require.Equal(t, rule.UID, cmd.RuleUID)
+				require.Equal(t, evalState.String(), string(cmd.CurrentState))
+				require.Equal(t, s.Labels, data.Labels(cmd.Labels))
+			})
+
+			t.Run("it reports metrics", func(t *testing.T) {
+				// duration metric has 0 values because of mocked clock that do not advance
+				expectedMetric := fmt.Sprintf(
+					`# HELP grafana_alerting_rule_evaluation_duration_seconds The time to evaluate a rule.
+# TYPE grafana_alerting_rule_evaluation_duration_seconds histogram
+grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="0.01"} 1
+grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="0.1"} 1
+grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="0.5"} 1
+grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="1"} 1
+grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="5"} 1
+grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="10"} 1
+grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="15"} 1
+grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="30"} 1
+grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="60"} 1
+grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="120"} 1
+grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="180"} 1
+grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="240"} 1
+grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="300"} 1
+grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="+Inf"} 1
+grafana_alerting_rule_evaluation_duration_seconds_sum{org="%[1]d"} 0
+grafana_alerting_rule_evaluation_duration_seconds_count{org="%[1]d"} 1
+# HELP grafana_alerting_rule_evaluation_failures_total The total number of rule evaluation failures.
+# TYPE grafana_alerting_rule_evaluation_failures_total counter
+grafana_alerting_rule_evaluation_failures_total{org="%[1]d"} 0
+# HELP grafana_alerting_rule_evaluations_total The total number of rule evaluations.
+# TYPE grafana_alerting_rule_evaluations_total counter
+grafana_alerting_rule_evaluations_total{org="%[1]d"} 1
+# HELP grafana_alerting_rule_process_evaluation_duration_seconds The time to process the evaluation results for a rule.
+# TYPE grafana_alerting_rule_process_evaluation_duration_seconds histogram
+grafana_alerting_rule_process_evaluation_duration_seconds_bucket{org="%[1]d",le="0.01"} 1
+grafana_alerting_rule_process_evaluation_duration_seconds_bucket{org="%[1]d",le="0.1"} 1
+grafana_alerting_rule_process_evaluation_duration_seconds_bucket{org="%[1]d",le="0.5"} 1
+grafana_alerting_rule_process_evaluation_duration_seconds_bucket{org="%[1]d",le="1"} 1
+grafana_alerting_rule_process_evaluation_duration_seconds_bucket{org="%[1]d",le="5"} 1
+grafana_alerting_rule_process_evaluation_duration_seconds_bucket{org="%[1]d",le="10"} 1
+grafana_alerting_rule_process_evaluation_duration_seconds_bucket{org="%[1]d",le="15"} 1
+grafana_alerting_rule_process_evaluation_duration_seconds_bucket{org="%[1]d",le="30"} 1
+grafana_alerting_rule_process_evaluation_duration_seconds_bucket{org="%[1]d",le="60"} 1
+grafana_alerting_rule_process_evaluation_duration_seconds_bucket{org="%[1]d",le="120"} 1
+grafana_alerting_rule_process_evaluation_duration_seconds_bucket{org="%[1]d",le="180"} 1
+grafana_alerting_rule_process_evaluation_duration_seconds_bucket{org="%[1]d",le="240"} 1
+grafana_alerting_rule_process_evaluation_duration_seconds_bucket{org="%[1]d",le="300"} 1
+grafana_alerting_rule_process_evaluation_duration_seconds_bucket{org="%[1]d",le="+Inf"} 1
+grafana_alerting_rule_process_evaluation_duration_seconds_sum{org="%[1]d"} 0
+grafana_alerting_rule_process_evaluation_duration_seconds_count{org="%[1]d"} 1
+# HELP grafana_alerting_rule_send_alerts_duration_seconds The time to send the alerts to Alertmanager.
+# TYPE grafana_alerting_rule_send_alerts_duration_seconds histogram
+grafana_alerting_rule_send_alerts_duration_seconds_bucket{org="%[1]d",le="0.01"} 1
+grafana_alerting_rule_send_alerts_duration_seconds_bucket{org="%[1]d",le="0.1"} 1
+grafana_alerting_rule_send_alerts_duration_seconds_bucket{org="%[1]d",le="0.5"} 1
+grafana_alerting_rule_send_alerts_duration_seconds_bucket{org="%[1]d",le="1"} 1
+grafana_alerting_rule_send_alerts_duration_seconds_bucket{org="%[1]d",le="5"} 1
+grafana_alerting_rule_send_alerts_duration_seconds_bucket{org="%[1]d",le="10"} 1
+grafana_alerting_rule_send_alerts_duration_seconds_bucket{org="%[1]d",le="15"} 1
+grafana_alerting_rule_send_alerts_duration_seconds_bucket{org="%[1]d",le="30"} 1
+grafana_alerting_rule_send_alerts_duration_seconds_bucket{org="%[1]d",le="60"} 1
+grafana_alerting_rule_send_alerts_duration_seconds_bucket{org="%[1]d",le="120"} 1
+grafana_alerting_rule_send_alerts_duration_seconds_bucket{org="%[1]d",le="180"} 1
+grafana_alerting_rule_send_alerts_duration_seconds_bucket{org="%[1]d",le="240"} 1
+grafana_alerting_rule_send_alerts_duration_seconds_bucket{org="%[1]d",le="300"} 1
+grafana_alerting_rule_send_alerts_duration_seconds_bucket{org="%[1]d",le="+Inf"} 1
+grafana_alerting_rule_send_alerts_duration_seconds_sum{org="%[1]d"} 0
+grafana_alerting_rule_send_alerts_duration_seconds_count{org="%[1]d"} 1
+`, rule.OrgID)
+
+				err := testutil.GatherAndCompare(reg, bytes.NewBufferString(expectedMetric), "grafana_alerting_rule_evaluation_duration_seconds", "grafana_alerting_rule_evaluations_total", "grafana_alerting_rule_evaluation_failures_total", "grafana_alerting_rule_process_evaluation_duration_seconds", "grafana_alerting_rule_send_alerts_duration_seconds")
+				require.NoError(t, err)
+			})
+		})
+	}
+
+	t.Run("should exit", func(t *testing.T) {
+		t.Run("and not clear the state if parent context is cancelled", func(t *testing.T) {
+			stoppedChan := make(chan error)
+			sch, _, _, _ := createSchedule(make(chan time.Time), nil)
+
+			rule := models.AlertRuleGen()()
+			_ = sch.stateManager.ProcessEvalResults(context.Background(), sch.clock.Now(), rule, eval.GenerateResults(rand.Intn(5)+1, eval.ResultGen(eval.WithEvaluatedAt(sch.clock.Now()))), nil)
+			expectedStates := sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID)
+			require.NotEmpty(t, expectedStates)
+
+			ctx, cancel := context.WithCancel(context.Background())
+			ruleInfo := newAlertRuleInfo(ctx)
+			go func() {
+				err := ruleInfo.ruleRoutine(models.AlertRuleKey{}, sch)
+				stoppedChan <- err
+			}()
+
+			cancel()
+			err := waitForErrChannel(t, stoppedChan)
+			require.NoError(t, err)
+			require.Equal(t, len(expectedStates), len(sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID)))
+		})
+		t.Run("and clean up the state if delete is cancellation reason for inner context", func(t *testing.T) {
+			stoppedChan := make(chan error)
+			sch, _, _, _ := createSchedule(make(chan time.Time), nil)
+
+			rule := models.AlertRuleGen()()
+			_ = sch.stateManager.ProcessEvalResults(context.Background(), sch.clock.Now(), rule, eval.GenerateResults(rand.Intn(5)+1, eval.ResultGen(eval.WithEvaluatedAt(sch.clock.Now()))), nil)
+			require.NotEmpty(t, sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID))
+
+			ruleInfo := newAlertRuleInfo(context.Background())
+			go func() {
+				err := ruleInfo.ruleRoutine(rule.GetKey(), sch)
+				stoppedChan <- err
+			}()
+
+			ruleInfo.stop(errRuleDeleted)
+			err := waitForErrChannel(t, stoppedChan)
+			require.NoError(t, err)
+
+			require.Empty(t, sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID))
+		})
+	})
+
+	t.Run("when a message is sent to update channel", func(t *testing.T) {
+		rule := models.AlertRuleGen(withQueryForState(t, eval.Normal))()
+		folderTitle := "folderName"
+		ruleFp := ruleWithFolder{rule, folderTitle}.Fingerprint()
+
+		evalAppliedChan := make(chan time.Time)
+
+		sender := NewSyncAlertsSenderMock()
+		sender.EXPECT().Send(mock.Anything, rule.GetKey(), mock.Anything).Return()
+
+		sch, ruleStore, _, _ := createSchedule(evalAppliedChan, sender)
+		ruleStore.PutRule(context.Background(), rule)
+		sch.schedulableAlertRules.set([]*models.AlertRule{rule}, map[models.FolderKey]string{rule.GetFolderKey(): folderTitle})
+		ctx, cancel := context.WithCancel(context.Background())
+		t.Cleanup(cancel)
+		ruleInfo := newAlertRuleInfo(ctx)
+
+		go func() {
+			_ = ruleInfo.ruleRoutine(rule.GetKey(), sch)
+		}()
+
+		// init evaluation loop so it got the rule version
+		ruleInfo.evalCh <- &evaluation{
+			scheduledAt: sch.clock.Now(),
+			rule:        rule,
+			folderTitle: folderTitle,
+		}
+
+		waitForTimeChannel(t, evalAppliedChan)
+
+		// define some state
+		states := make([]*state.State, 0, len(allStates))
+		for _, s := range allStates {
+			for i := 0; i < 2; i++ {
+				states = append(states, &state.State{
+					AlertRuleUID: rule.UID,
+					CacheID:      util.GenerateShortUID(),
+					OrgID:        rule.OrgID,
+					State:        s,
+					StartsAt:     sch.clock.Now(),
+					EndsAt:       sch.clock.Now().Add(time.Duration(rand.Intn(25)+5) * time.Second),
+					Labels:       rule.Labels,
+				})
+			}
+		}
+		sch.stateManager.Put(states)
+
+		states = sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID)
+		expectedToBeSent := 0
+		for _, s := range states {
+			if s.State == eval.Normal || s.State == eval.Pending {
+				continue
+			}
+			expectedToBeSent++
+		}
+		require.Greaterf(t, expectedToBeSent, 0, "State manager was expected to return at least one state that can be expired")
+
+		t.Run("should do nothing if version in channel is the same", func(t *testing.T) {
+			ruleInfo.updateCh <- ruleVersionAndPauseStatus{ruleFp, false}
+			ruleInfo.updateCh <- ruleVersionAndPauseStatus{ruleFp, false} // second time just to make sure that previous messages were handled
+
+			actualStates := sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID)
+			require.Len(t, actualStates, len(states))
+
+			sender.AssertNotCalled(t, "Send", mock.Anything, mock.Anything)
+		})
+
+		t.Run("should clear the state and expire firing alerts if version in channel is greater", func(t *testing.T) {
+			ruleInfo.updateCh <- ruleVersionAndPauseStatus{ruleFp + 1, false}
+
+			require.Eventually(t, func() bool {
+				return len(sender.Calls()) > 0
+			}, 5*time.Second, 100*time.Millisecond)
+
+			require.Empty(t, sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID))
+			sender.AssertNumberOfCalls(t, "Send", 1)
+			args, ok := sender.Calls()[0].Arguments[2].(definitions.PostableAlerts)
+			require.Truef(t, ok, fmt.Sprintf("expected argument of function was supposed to be 'definitions.PostableAlerts' but got %T", sender.Calls()[0].Arguments[2]))
+			require.Len(t, args.PostableAlerts, expectedToBeSent)
+		})
+	})
+
+	t.Run("when evaluation fails", func(t *testing.T) {
+		rule := models.AlertRuleGen(withQueryForState(t, eval.Error))()
+		rule.ExecErrState = models.ErrorErrState
+
+		evalAppliedChan := make(chan time.Time)
+
+		sender := NewSyncAlertsSenderMock()
+		sender.EXPECT().Send(mock.Anything, rule.GetKey(), mock.Anything).Return()
+
+		sch, ruleStore, _, reg := createSchedule(evalAppliedChan, sender)
+		sch.maxAttempts = 3
+		ruleStore.PutRule(context.Background(), rule)
+		ctx, cancel := context.WithCancel(context.Background())
+		t.Cleanup(cancel)
+		ruleInfo := newAlertRuleInfo(ctx)
+
+		go func() {
+			_ = ruleInfo.ruleRoutine(rule.GetKey(), sch)
+		}()
+
+		ruleInfo.evalCh <- &evaluation{
+			scheduledAt: sch.clock.Now(),
+			rule:        rule,
+		}
+
+		waitForTimeChannel(t, evalAppliedChan)
+
+		t.Run("it should increase failure counter", func(t *testing.T) {
+			// duration metric has 0 values because of mocked clock that do not advance
+			expectedMetric := fmt.Sprintf(
+				`# HELP grafana_alerting_rule_evaluation_duration_seconds The time to evaluate a rule.
+# TYPE grafana_alerting_rule_evaluation_duration_seconds histogram
+grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="0.01"} 3
+grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="0.1"} 3
+grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="0.5"} 3
+grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="1"} 3
+grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="5"} 3
+grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="10"} 3
+grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="15"} 3
+grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="30"} 3
+grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="60"} 3
+grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="120"} 3
+grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="180"} 3
+grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="240"} 3
+grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="300"} 3
+grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="+Inf"} 3
+grafana_alerting_rule_evaluation_duration_seconds_sum{org="%[1]d"} 0
+grafana_alerting_rule_evaluation_duration_seconds_count{org="%[1]d"} 3
+# HELP grafana_alerting_rule_evaluation_failures_total The total number of rule evaluation failures.
+# TYPE grafana_alerting_rule_evaluation_failures_total counter
+grafana_alerting_rule_evaluation_failures_total{org="%[1]d"} 3
+# HELP grafana_alerting_rule_evaluations_total The total number of rule evaluations.
+# TYPE grafana_alerting_rule_evaluations_total counter
+grafana_alerting_rule_evaluations_total{org="%[1]d"} 3
+# HELP grafana_alerting_rule_process_evaluation_duration_seconds The time to process the evaluation results for a rule.
+# TYPE grafana_alerting_rule_process_evaluation_duration_seconds histogram
+grafana_alerting_rule_process_evaluation_duration_seconds_bucket{org="%[1]d",le="0.01"} 1
+grafana_alerting_rule_process_evaluation_duration_seconds_bucket{org="%[1]d",le="0.1"} 1
+grafana_alerting_rule_process_evaluation_duration_seconds_bucket{org="%[1]d",le="0.5"} 1
+grafana_alerting_rule_process_evaluation_duration_seconds_bucket{org="%[1]d",le="1"} 1
+grafana_alerting_rule_process_evaluation_duration_seconds_bucket{org="%[1]d",le="5"} 1
+grafana_alerting_rule_process_evaluation_duration_seconds_bucket{org="%[1]d",le="10"} 1
+grafana_alerting_rule_process_evaluation_duration_seconds_bucket{org="%[1]d",le="15"} 1
+grafana_alerting_rule_process_evaluation_duration_seconds_bucket{org="%[1]d",le="30"} 1
+grafana_alerting_rule_process_evaluation_duration_seconds_bucket{org="%[1]d",le="60"} 1
+grafana_alerting_rule_process_evaluation_duration_seconds_bucket{org="%[1]d",le="120"} 1
+grafana_alerting_rule_process_evaluation_duration_seconds_bucket{org="%[1]d",le="180"} 1
+grafana_alerting_rule_process_evaluation_duration_seconds_bucket{org="%[1]d",le="240"} 1
+grafana_alerting_rule_process_evaluation_duration_seconds_bucket{org="%[1]d",le="300"} 1
+grafana_alerting_rule_process_evaluation_duration_seconds_bucket{org="%[1]d",le="+Inf"} 1
+grafana_alerting_rule_process_evaluation_duration_seconds_sum{org="%[1]d"} 0
+grafana_alerting_rule_process_evaluation_duration_seconds_count{org="%[1]d"} 1
+# HELP grafana_alerting_rule_send_alerts_duration_seconds The time to send the alerts to Alertmanager.
+# TYPE grafana_alerting_rule_send_alerts_duration_seconds histogram
+grafana_alerting_rule_send_alerts_duration_seconds_bucket{org="%[1]d",le="0.01"} 1
+grafana_alerting_rule_send_alerts_duration_seconds_bucket{org="%[1]d",le="0.1"} 1
+grafana_alerting_rule_send_alerts_duration_seconds_bucket{org="%[1]d",le="0.5"} 1
+grafana_alerting_rule_send_alerts_duration_seconds_bucket{org="%[1]d",le="1"} 1
+grafana_alerting_rule_send_alerts_duration_seconds_bucket{org="%[1]d",le="5"} 1
+grafana_alerting_rule_send_alerts_duration_seconds_bucket{org="%[1]d",le="10"} 1
+grafana_alerting_rule_send_alerts_duration_seconds_bucket{org="%[1]d",le="15"} 1
+grafana_alerting_rule_send_alerts_duration_seconds_bucket{org="%[1]d",le="30"} 1
+grafana_alerting_rule_send_alerts_duration_seconds_bucket{org="%[1]d",le="60"} 1
+grafana_alerting_rule_send_alerts_duration_seconds_bucket{org="%[1]d",le="120"} 1
+grafana_alerting_rule_send_alerts_duration_seconds_bucket{org="%[1]d",le="180"} 1
+grafana_alerting_rule_send_alerts_duration_seconds_bucket{org="%[1]d",le="240"} 1
+grafana_alerting_rule_send_alerts_duration_seconds_bucket{org="%[1]d",le="300"} 1
+grafana_alerting_rule_send_alerts_duration_seconds_bucket{org="%[1]d",le="+Inf"} 1
+grafana_alerting_rule_send_alerts_duration_seconds_sum{org="%[1]d"} 0
+grafana_alerting_rule_send_alerts_duration_seconds_count{org="%[1]d"} 1
+`, rule.OrgID)
+
+			err := testutil.GatherAndCompare(reg, bytes.NewBufferString(expectedMetric), "grafana_alerting_rule_evaluation_duration_seconds", "grafana_alerting_rule_evaluations_total", "grafana_alerting_rule_evaluation_failures_total", "grafana_alerting_rule_process_evaluation_duration_seconds", "grafana_alerting_rule_send_alerts_duration_seconds")
+			require.NoError(t, err)
+		})
+
+		t.Run("it should send special alert DatasourceError", func(t *testing.T) {
+			sender.AssertNumberOfCalls(t, "Send", 1)
+			args, ok := sender.Calls()[0].Arguments[2].(definitions.PostableAlerts)
+			require.Truef(t, ok, fmt.Sprintf("expected argument of function was supposed to be 'definitions.PostableAlerts' but got %T", sender.Calls()[0].Arguments[2]))
+			assert.Len(t, args.PostableAlerts, 1)
+			assert.Equal(t, state.ErrorAlertName, args.PostableAlerts[0].Labels[prometheusModel.AlertNameLabel])
+		})
+	})
+
+	t.Run("when there are alerts that should be firing", func(t *testing.T) {
+		t.Run("it should call sender", func(t *testing.T) {
+			// eval.Alerting makes state manager to create notifications for alertmanagers
+			rule := models.AlertRuleGen(withQueryForState(t, eval.Alerting))()
+
+			evalAppliedChan := make(chan time.Time)
+
+			sender := NewSyncAlertsSenderMock()
+			sender.EXPECT().Send(mock.Anything, rule.GetKey(), mock.Anything).Return()
+
+			sch, ruleStore, _, _ := createSchedule(evalAppliedChan, sender)
+			ruleStore.PutRule(context.Background(), rule)
+			ctx, cancel := context.WithCancel(context.Background())
+			t.Cleanup(cancel)
+			ruleInfo := newAlertRuleInfo(ctx)
+
+			go func() {
+				_ = ruleInfo.ruleRoutine(rule.GetKey(), sch)
+			}()
+
+			ruleInfo.evalCh <- &evaluation{
+				scheduledAt: sch.clock.Now(),
+				rule:        rule,
+			}
+
+			waitForTimeChannel(t, evalAppliedChan)
+
+			sender.AssertNumberOfCalls(t, "Send", 1)
+			args, ok := sender.Calls()[0].Arguments[2].(definitions.PostableAlerts)
+			require.Truef(t, ok, fmt.Sprintf("expected argument of function was supposed to be 'definitions.PostableAlerts' but got %T", sender.Calls()[0].Arguments[2]))
+
+			require.Len(t, args.PostableAlerts, 1)
+		})
+	})
+
+	t.Run("when there are no alerts to send it should not call notifiers", func(t *testing.T) {
+		rule := models.AlertRuleGen(withQueryForState(t, eval.Normal))()
+
+		evalAppliedChan := make(chan time.Time)
+
+		sender := NewSyncAlertsSenderMock()
+		sender.EXPECT().Send(mock.Anything, rule.GetKey(), mock.Anything).Return()
+
+		sch, ruleStore, _, _ := createSchedule(evalAppliedChan, sender)
+		ruleStore.PutRule(context.Background(), rule)
+		ctx, cancel := context.WithCancel(context.Background())
+		t.Cleanup(cancel)
+		ruleInfo := newAlertRuleInfo(ctx)
+
+		go func() {
+			_ = ruleInfo.ruleRoutine(rule.GetKey(), sch)
+		}()
+
+		ruleInfo.evalCh <- &evaluation{
+			scheduledAt: sch.clock.Now(),
+			rule:        rule,
+		}
+
+		waitForTimeChannel(t, evalAppliedChan)
+
+		sender.AssertNotCalled(t, "Send", mock.Anything, mock.Anything)
+
+		require.NotEmpty(t, sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID))
+	})
+}
@@ -2,27 +2,20 @@ package schedule
 
 import (
 	"context"
-	"errors"
 	"fmt"
 	"net/url"
 	"time"
 
 	"github.com/benbjohnson/clock"
-	"go.opentelemetry.io/otel/attribute"
-	"go.opentelemetry.io/otel/codes"
-	"go.opentelemetry.io/otel/trace"
 	"golang.org/x/sync/errgroup"
 
 	"github.com/grafana/grafana/pkg/infra/log"
 	"github.com/grafana/grafana/pkg/infra/tracing"
-	"github.com/grafana/grafana/pkg/services/datasources"
 	"github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
 	"github.com/grafana/grafana/pkg/services/ngalert/eval"
 	"github.com/grafana/grafana/pkg/services/ngalert/metrics"
 	ngmodels "github.com/grafana/grafana/pkg/services/ngalert/models"
 	"github.com/grafana/grafana/pkg/services/ngalert/state"
-	"github.com/grafana/grafana/pkg/services/org"
-	"github.com/grafana/grafana/pkg/services/user"
 	"github.com/grafana/grafana/pkg/util/ticker"
 )
 
@@ -256,7 +249,7 @@ func (sch *schedule) processTick(ctx context.Context, dispatcherGroup *errgroup.
 
 	if newRoutine && !invalidInterval {
 		dispatcherGroup.Go(func() error {
-			return sch.ruleRoutine(key, ruleInfo)
+			return ruleInfo.ruleRoutine(key, sch)
 		})
 	}
 
@ -343,272 +336,3 @@ func (sch *schedule) processTick(ctx context.Context, dispatcherGroup *errgroup.
|
|||||||
sch.deleteAlertRule(toDelete...)
|
sch.deleteAlertRule(toDelete...)
|
||||||
return readyToRun, registeredDefinitions, updatedRules
|
return readyToRun, registeredDefinitions, updatedRules
|
||||||
}
|
}
|
||||||
|
|
||||||
//nolint:gocyclo
|
|
||||||
func (sch *schedule) ruleRoutine(key ngmodels.AlertRuleKey, ruleInfo *alertRuleInfo) error {
|
|
||||||
grafanaCtx := ngmodels.WithRuleKey(ruleInfo.ctx, key)
|
|
||||||
logger := sch.log.FromContext(grafanaCtx)
|
|
||||||
logger.Debug("Alert rule routine started")
|
|
||||||
|
|
||||||
orgID := fmt.Sprint(key.OrgID)
|
|
||||||
evalTotal := sch.metrics.EvalTotal.WithLabelValues(orgID)
|
|
||||||
evalDuration := sch.metrics.EvalDuration.WithLabelValues(orgID)
|
|
||||||
evalTotalFailures := sch.metrics.EvalFailures.WithLabelValues(orgID)
|
|
||||||
processDuration := sch.metrics.ProcessDuration.WithLabelValues(orgID)
|
|
||||||
sendDuration := sch.metrics.SendDuration.WithLabelValues(orgID)
|
|
||||||
|
|
||||||
notify := func(states []state.StateTransition) {
|
|
||||||
expiredAlerts := state.FromAlertsStateToStoppedAlert(states, sch.appURL, sch.clock)
|
|
||||||
if len(expiredAlerts.PostableAlerts) > 0 {
|
|
||||||
sch.alertsSender.Send(grafanaCtx, key, expiredAlerts)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
resetState := func(ctx context.Context, isPaused bool) {
|
|
||||||
rule := sch.schedulableAlertRules.get(key)
|
|
||||||
reason := ngmodels.StateReasonUpdated
|
|
||||||
if isPaused {
|
|
||||||
reason = ngmodels.StateReasonPaused
|
|
||||||
}
|
|
||||||
states := sch.stateManager.ResetStateByRuleUID(ctx, rule, reason)
|
|
||||||
notify(states)
|
|
||||||
}
|
|
||||||
|
|
||||||
evaluate := func(ctx context.Context, f fingerprint, attempt int64, e *evaluation, span trace.Span, retry bool) error {
|
|
||||||
logger := logger.New("version", e.rule.Version, "fingerprint", f, "attempt", attempt, "now", e.scheduledAt).FromContext(ctx)
|
|
||||||
start := sch.clock.Now()
|
|
||||||
|
|
||||||
evalCtx := eval.NewContextWithPreviousResults(ctx, SchedulerUserFor(e.rule.OrgID), sch.newLoadedMetricsReader(e.rule))
|
|
||||||
if sch.evaluatorFactory == nil {
|
|
||||||
panic("evalfactory nil")
|
|
||||||
}
|
|
||||||
ruleEval, err := sch.evaluatorFactory.Create(evalCtx, e.rule.GetEvalCondition())
|
|
||||||
var results eval.Results
|
|
||||||
var dur time.Duration
|
|
||||||
if err != nil {
|
|
||||||
dur = sch.clock.Now().Sub(start)
|
|
||||||
logger.Error("Failed to build rule evaluator", "error", err)
|
|
||||||
} else {
|
|
||||||
results, err = ruleEval.Evaluate(ctx, e.scheduledAt)
|
|
||||||
dur = sch.clock.Now().Sub(start)
|
|
||||||
if err != nil {
|
|
||||||
logger.Error("Failed to evaluate rule", "error", err, "duration", dur)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
evalTotal.Inc()
|
|
||||||
evalDuration.Observe(dur.Seconds())
|
|
||||||
|
|
||||||
if ctx.Err() != nil { // check if the context is not cancelled. The evaluation can be a long-running task.
|
|
||||||
span.SetStatus(codes.Error, "rule evaluation cancelled")
|
|
||||||
logger.Debug("Skip updating the state because the context has been cancelled")
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
if err != nil || results.HasErrors() {
|
|
||||||
evalTotalFailures.Inc()
|
|
||||||
|
|
||||||
// Only retry (return errors) if this isn't the last attempt, otherwise skip these return operations.
|
|
||||||
if retry {
|
|
||||||
// The only thing that can return non-nil `err` from ruleEval.Evaluate is the server side expression pipeline.
|
|
||||||
// This includes transport errors such as transient network errors.
|
|
||||||
if err != nil {
|
|
||||||
span.SetStatus(codes.Error, "rule evaluation failed")
|
|
||||||
span.RecordError(err)
|
|
||||||
return fmt.Errorf("server side expressions pipeline returned an error: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// If the pipeline executed successfully but have other types of errors that can be retryable, we should do so.
|
|
||||||
if !results.HasNonRetryableErrors() {
|
|
||||||
span.SetStatus(codes.Error, "rule evaluation failed")
|
|
||||||
span.RecordError(err)
|
|
||||||
return fmt.Errorf("the result-set has errors that can be retried: %w", results.Error())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// If results is nil, we assume that the error must be from the SSE pipeline (ruleEval.Evaluate) which is the only code that can actually return an `err`.
|
|
||||||
if results == nil {
|
|
||||||
results = append(results, eval.NewResultFromError(err, e.scheduledAt, dur))
|
|
||||||
}
|
|
||||||
|
|
||||||
// If err is nil, we assume that the SSS pipeline succeeded and that the error must be embedded in the results.
|
|
||||||
if err == nil {
|
|
||||||
err = results.Error()
|
|
||||||
}
|
|
||||||
|
|
||||||
span.SetStatus(codes.Error, "rule evaluation failed")
|
|
||||||
span.RecordError(err)
|
|
||||||
} else {
|
|
||||||
logger.Debug("Alert rule evaluated", "results", results, "duration", dur)
|
|
||||||
span.AddEvent("rule evaluated", trace.WithAttributes(
|
|
||||||
attribute.Int64("results", int64(len(results))),
|
|
||||||
))
|
|
||||||
}
|
|
||||||
start = sch.clock.Now()
|
|
||||||
processedStates := sch.stateManager.ProcessEvalResults(
|
|
||||||
ctx,
|
|
||||||
e.scheduledAt,
|
|
||||||
e.rule,
|
|
||||||
results,
|
|
||||||
state.GetRuleExtraLabels(logger, e.rule, e.folderTitle, !sch.disableGrafanaFolder),
|
|
||||||
)
|
|
||||||
processDuration.Observe(sch.clock.Now().Sub(start).Seconds())
|
|
||||||
|
|
||||||
start = sch.clock.Now()
|
|
||||||
alerts := state.FromStateTransitionToPostableAlerts(processedStates, sch.stateManager, sch.appURL)
|
|
||||||
span.AddEvent("results processed", trace.WithAttributes(
|
|
||||||
attribute.Int64("state_transitions", int64(len(processedStates))),
|
|
||||||
attribute.Int64("alerts_to_send", int64(len(alerts.PostableAlerts))),
|
|
||||||
))
|
|
||||||
if len(alerts.PostableAlerts) > 0 {
|
|
||||||
sch.alertsSender.Send(ctx, key, alerts)
|
|
||||||
}
|
|
||||||
sendDuration.Observe(sch.clock.Now().Sub(start).Seconds())
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
evalRunning := false
|
|
||||||
var currentFingerprint fingerprint
|
|
||||||
defer sch.stopApplied(key)
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
// used by external services (API) to notify that rule is updated.
|
|
||||||
case ctx := <-ruleInfo.updateCh:
|
|
||||||
if currentFingerprint == ctx.Fingerprint {
|
|
||||||
logger.Info("Rule's fingerprint has not changed. Skip resetting the state", "currentFingerprint", currentFingerprint)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.Info("Clearing the state of the rule because it was updated", "isPaused", ctx.IsPaused, "fingerprint", ctx.Fingerprint)
|
|
||||||
// clear the state. So the next evaluation will start from the scratch.
|
|
||||||
resetState(grafanaCtx, ctx.IsPaused)
|
|
||||||
currentFingerprint = ctx.Fingerprint
|
|
||||||
// evalCh - used by the scheduler to signal that evaluation is needed.
|
|
||||||
case ctx, ok := <-ruleInfo.evalCh:
|
|
||||||
if !ok {
|
|
||||||
logger.Debug("Evaluation channel has been closed. Exiting")
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
if evalRunning {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
func() {
|
|
||||||
evalRunning = true
|
|
||||||
defer func() {
|
|
||||||
evalRunning = false
|
|
||||||
sch.evalApplied(key, ctx.scheduledAt)
|
|
||||||
}()
|
|
||||||
|
|
||||||
for attempt := int64(1); attempt <= sch.maxAttempts; attempt++ {
|
|
||||||
isPaused := ctx.rule.IsPaused
|
|
||||||
f := ruleWithFolder{ctx.rule, ctx.folderTitle}.Fingerprint()
|
|
||||||
// Do not clean up state if the eval loop has just started.
|
|
||||||
var needReset bool
|
|
||||||
if currentFingerprint != 0 && currentFingerprint != f {
|
|
||||||
logger.Debug("Got a new version of alert rule. Clear up the state", "fingerprint", f)
|
|
||||||
needReset = true
|
|
||||||
}
|
|
||||||
// We need to reset state if the loop has started and the alert is already paused. It can happen,
|
|
||||||
// if we have an alert with state and we do file provision with stateful Grafana, that state
|
|
||||||
// lingers in DB and won't be cleaned up until next alert rule update.
|
|
||||||
needReset = needReset || (currentFingerprint == 0 && isPaused)
|
|
||||||
if needReset {
|
|
||||||
resetState(grafanaCtx, isPaused)
|
|
||||||
}
|
|
||||||
currentFingerprint = f
|
|
||||||
if isPaused {
|
|
||||||
logger.Debug("Skip rule evaluation because it is paused")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
fpStr := currentFingerprint.String()
|
|
||||||
utcTick := ctx.scheduledAt.UTC().Format(time.RFC3339Nano)
|
|
||||||
tracingCtx, span := sch.tracer.Start(grafanaCtx, "alert rule execution", trace.WithAttributes(
|
|
||||||
attribute.String("rule_uid", ctx.rule.UID),
|
|
||||||
attribute.Int64("org_id", ctx.rule.OrgID),
|
|
||||||
attribute.Int64("rule_version", ctx.rule.Version),
|
|
||||||
attribute.String("rule_fingerprint", fpStr),
|
|
||||||
attribute.String("tick", utcTick),
|
|
||||||
))
|
|
||||||
|
|
||||||
// Check before any execution if the context was cancelled so that we don't do any evaluations.
|
|
||||||
if tracingCtx.Err() != nil {
|
|
||||||
span.SetStatus(codes.Error, "rule evaluation cancelled")
|
|
||||||
span.End()
|
|
||||||
logger.Error("Skip evaluation and updating the state because the context has been cancelled", "version", ctx.rule.Version, "fingerprint", f, "attempt", attempt, "now", ctx.scheduledAt)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
retry := attempt < sch.maxAttempts
|
|
||||||
err := evaluate(tracingCtx, f, attempt, ctx, span, retry)
|
|
||||||
// This is extremely confusing - when we exhaust all retry attempts, or we have no retryable errors
|
|
||||||
// we return nil - so technically, this is meaningless to know whether the evaluation has errors or not.
|
|
||||||
span.End()
|
|
||||||
if err == nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.Error("Failed to evaluate rule", "version", ctx.rule.Version, "fingerprint", f, "attempt", attempt, "now", ctx.scheduledAt, "error", err)
|
|
||||||
select {
|
|
||||||
case <-tracingCtx.Done():
|
|
||||||
logger.Error("Context has been cancelled while backing off", "version", ctx.rule.Version, "fingerprint", f, "attempt", attempt, "now", ctx.scheduledAt)
|
|
||||||
return
|
|
||||||
case <-time.After(retryDelay):
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
case <-grafanaCtx.Done():
|
|
||||||
// clean up the state only if the reason for stopping the evaluation loop is that the rule was deleted
|
|
||||||
if errors.Is(grafanaCtx.Err(), errRuleDeleted) {
|
|
||||||
// We do not want a context to be unbounded which could potentially cause a go routine running
|
|
||||||
// indefinitely. 1 minute is an almost randomly chosen timeout, big enough to cover the majority of the
|
|
||||||
// cases.
|
|
||||||
ctx, cancelFunc := context.WithTimeout(context.Background(), time.Minute)
|
|
||||||
defer cancelFunc()
|
|
||||||
states := sch.stateManager.DeleteStateByRuleUID(ngmodels.WithRuleKey(ctx, key), key, ngmodels.StateReasonRuleDeleted)
|
|
||||||
notify(states)
|
|
||||||
}
|
|
||||||
logger.Debug("Stopping alert rule routine")
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// evalApplied is only used on tests.
|
|
||||||
func (sch *schedule) evalApplied(alertDefKey ngmodels.AlertRuleKey, now time.Time) {
|
|
||||||
if sch.evalAppliedFunc == nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
sch.evalAppliedFunc(alertDefKey, now)
|
|
||||||
}
|
|
||||||
|
|
||||||
// stopApplied is only used on tests.
|
|
||||||
func (sch *schedule) stopApplied(alertDefKey ngmodels.AlertRuleKey) {
|
|
||||||
if sch.stopAppliedFunc == nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
sch.stopAppliedFunc(alertDefKey)
|
|
||||||
}
|
|
||||||
|
|
||||||
func SchedulerUserFor(orgID int64) *user.SignedInUser {
	return &user.SignedInUser{
		UserID:           -1,
		IsServiceAccount: true,
		Login:            "grafana_scheduler",
		OrgID:            orgID,
		OrgRole:          org.RoleAdmin,
		Permissions: map[int64]map[string][]string{
			orgID: {
				datasources.ActionQuery: []string{
					datasources.ScopeAll,
				},
			},
		},
	}
}
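
// For example (hypothetical org ID 1), the returned identity carries a single explicit
// permission entry:
//
//	u := SchedulerUserFor(1)
//	scopes := u.Permissions[1][datasources.ActionQuery] // []string{datasources.ScopeAll}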

@ -11,11 +11,8 @@ import (
 	"time"

 	"github.com/benbjohnson/clock"
-	alertingModels "github.com/grafana/alerting/models"
-	"github.com/grafana/grafana-plugin-sdk-go/data"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/testutil"
-	prometheusModel "github.com/prometheus/common/model"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
@ -26,14 +23,12 @@ import (
 	"github.com/grafana/grafana/pkg/infra/tracing"
 	datasources "github.com/grafana/grafana/pkg/services/datasources/fakes"
 	"github.com/grafana/grafana/pkg/services/featuremgmt"
-	"github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
 	"github.com/grafana/grafana/pkg/services/ngalert/eval"
 	"github.com/grafana/grafana/pkg/services/ngalert/metrics"
 	"github.com/grafana/grafana/pkg/services/ngalert/models"
 	"github.com/grafana/grafana/pkg/services/ngalert/state"
 	"github.com/grafana/grafana/pkg/services/pluginsintegration/pluginstore"
 	"github.com/grafana/grafana/pkg/setting"
-	"github.com/grafana/grafana/pkg/util"
 )

 type evalAppliedInfo struct {
@ -361,471 +356,6 @@ func TestProcessTicks(t *testing.T) {
 	})
 }

-func TestSchedule_ruleRoutine(t *testing.T) {
-	createSchedule := func(
-		evalAppliedChan chan time.Time,
-		senderMock *SyncAlertsSenderMock,
-	) (*schedule, *fakeRulesStore, *state.FakeInstanceStore, prometheus.Gatherer) {
-		ruleStore := newFakeRulesStore()
-		instanceStore := &state.FakeInstanceStore{}
-
-		registry := prometheus.NewPedanticRegistry()
-		sch := setupScheduler(t, ruleStore, instanceStore, registry, senderMock, nil)
-		sch.evalAppliedFunc = func(key models.AlertRuleKey, t time.Time) {
-			evalAppliedChan <- t
-		}
-		return sch, ruleStore, instanceStore, registry
-	}
-
-	// normal states do not include NoData and Error because currently it is not possible to perform any sensible test
-	normalStates := []eval.State{eval.Normal, eval.Alerting, eval.Pending}
-	allStates := [...]eval.State{eval.Normal, eval.Alerting, eval.Pending, eval.NoData, eval.Error}
-
-	for _, evalState := range normalStates {
-		// TODO rewrite when we are able to mock/fake state manager
-		t.Run(fmt.Sprintf("when rule evaluation happens (evaluation state %s)", evalState), func(t *testing.T) {
-			evalAppliedChan := make(chan time.Time)
-			sch, ruleStore, instanceStore, reg := createSchedule(evalAppliedChan, nil)
-
-			rule := models.AlertRuleGen(withQueryForState(t, evalState))()
-			ruleStore.PutRule(context.Background(), rule)
-			folderTitle := ruleStore.getNamespaceTitle(rule.NamespaceUID)
-			ctx, cancel := context.WithCancel(context.Background())
-			t.Cleanup(cancel)
-			ruleInfo := newAlertRuleInfo(ctx)
-			go func() {
-				_ = sch.ruleRoutine(rule.GetKey(), ruleInfo)
-			}()
-
-			expectedTime := time.UnixMicro(rand.Int63())
-
-			ruleInfo.evalCh <- &evaluation{
-				scheduledAt: expectedTime,
-				rule:        rule,
-				folderTitle: folderTitle,
-			}
-
-			actualTime := waitForTimeChannel(t, evalAppliedChan)
-			require.Equal(t, expectedTime, actualTime)
-
-			t.Run("it should add extra labels", func(t *testing.T) {
-				states := sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID)
-				for _, s := range states {
-					assert.Equal(t, rule.UID, s.Labels[alertingModels.RuleUIDLabel])
-					assert.Equal(t, rule.NamespaceUID, s.Labels[alertingModels.NamespaceUIDLabel])
-					assert.Equal(t, rule.Title, s.Labels[prometheusModel.AlertNameLabel])
-					assert.Equal(t, folderTitle, s.Labels[models.FolderTitleLabel])
-				}
-			})
-
-			t.Run("it should process evaluation results via state manager", func(t *testing.T) {
-				// TODO rewrite when we are able to mock/fake state manager
-				states := sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID)
-				require.Len(t, states, 1)
-				s := states[0]
-				require.Equal(t, rule.UID, s.AlertRuleUID)
-				require.Len(t, s.Results, 1)
-				var expectedStatus = evalState
-				if evalState == eval.Pending {
-					expectedStatus = eval.Alerting
-				}
-				require.Equal(t, expectedStatus.String(), s.Results[0].EvaluationState.String())
-				require.Equal(t, expectedTime, s.Results[0].EvaluationTime)
-			})
-			t.Run("it should save alert instances to storage", func(t *testing.T) {
-				// TODO rewrite when we are able to mock/fake state manager
-				states := sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID)
-				require.Len(t, states, 1)
-				s := states[0]
-
-				var cmd *models.AlertInstance
-				for _, op := range instanceStore.RecordedOps() {
-					switch q := op.(type) {
-					case models.AlertInstance:
-						cmd = &q
-					}
-					if cmd != nil {
-						break
-					}
-				}
-
-				require.NotNil(t, cmd)
-				t.Logf("Saved alert instances: %v", cmd)
-				require.Equal(t, rule.OrgID, cmd.RuleOrgID)
-				require.Equal(t, expectedTime, cmd.LastEvalTime)
-				require.Equal(t, rule.UID, cmd.RuleUID)
-				require.Equal(t, evalState.String(), string(cmd.CurrentState))
-				require.Equal(t, s.Labels, data.Labels(cmd.Labels))
-			})
-
-			t.Run("it reports metrics", func(t *testing.T) {
-				// duration metric has 0 values because of mocked clock that do not advance
-				expectedMetric := fmt.Sprintf(
-					`# HELP grafana_alerting_rule_evaluation_duration_seconds The time to evaluate a rule.
-# TYPE grafana_alerting_rule_evaluation_duration_seconds histogram
-grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="0.01"} 1
-grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="0.1"} 1
-grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="0.5"} 1
-grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="1"} 1
-grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="5"} 1
-grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="10"} 1
-grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="15"} 1
-grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="30"} 1
-grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="60"} 1
-grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="120"} 1
-grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="180"} 1
-grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="240"} 1
-grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="300"} 1
-grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="+Inf"} 1
-grafana_alerting_rule_evaluation_duration_seconds_sum{org="%[1]d"} 0
-grafana_alerting_rule_evaluation_duration_seconds_count{org="%[1]d"} 1
-# HELP grafana_alerting_rule_evaluation_failures_total The total number of rule evaluation failures.
-# TYPE grafana_alerting_rule_evaluation_failures_total counter
-grafana_alerting_rule_evaluation_failures_total{org="%[1]d"} 0
-# HELP grafana_alerting_rule_evaluations_total The total number of rule evaluations.
-# TYPE grafana_alerting_rule_evaluations_total counter
-grafana_alerting_rule_evaluations_total{org="%[1]d"} 1
-# HELP grafana_alerting_rule_process_evaluation_duration_seconds The time to process the evaluation results for a rule.
-# TYPE grafana_alerting_rule_process_evaluation_duration_seconds histogram
-grafana_alerting_rule_process_evaluation_duration_seconds_bucket{org="%[1]d",le="0.01"} 1
-grafana_alerting_rule_process_evaluation_duration_seconds_bucket{org="%[1]d",le="0.1"} 1
-grafana_alerting_rule_process_evaluation_duration_seconds_bucket{org="%[1]d",le="0.5"} 1
-grafana_alerting_rule_process_evaluation_duration_seconds_bucket{org="%[1]d",le="1"} 1
-grafana_alerting_rule_process_evaluation_duration_seconds_bucket{org="%[1]d",le="5"} 1
-grafana_alerting_rule_process_evaluation_duration_seconds_bucket{org="%[1]d",le="10"} 1
-grafana_alerting_rule_process_evaluation_duration_seconds_bucket{org="%[1]d",le="15"} 1
-grafana_alerting_rule_process_evaluation_duration_seconds_bucket{org="%[1]d",le="30"} 1
-grafana_alerting_rule_process_evaluation_duration_seconds_bucket{org="%[1]d",le="60"} 1
-grafana_alerting_rule_process_evaluation_duration_seconds_bucket{org="%[1]d",le="120"} 1
-grafana_alerting_rule_process_evaluation_duration_seconds_bucket{org="%[1]d",le="180"} 1
-grafana_alerting_rule_process_evaluation_duration_seconds_bucket{org="%[1]d",le="240"} 1
-grafana_alerting_rule_process_evaluation_duration_seconds_bucket{org="%[1]d",le="300"} 1
-grafana_alerting_rule_process_evaluation_duration_seconds_bucket{org="%[1]d",le="+Inf"} 1
-grafana_alerting_rule_process_evaluation_duration_seconds_sum{org="%[1]d"} 0
-grafana_alerting_rule_process_evaluation_duration_seconds_count{org="%[1]d"} 1
-# HELP grafana_alerting_rule_send_alerts_duration_seconds The time to send the alerts to Alertmanager.
-# TYPE grafana_alerting_rule_send_alerts_duration_seconds histogram
-grafana_alerting_rule_send_alerts_duration_seconds_bucket{org="%[1]d",le="0.01"} 1
-grafana_alerting_rule_send_alerts_duration_seconds_bucket{org="%[1]d",le="0.1"} 1
-grafana_alerting_rule_send_alerts_duration_seconds_bucket{org="%[1]d",le="0.5"} 1
-grafana_alerting_rule_send_alerts_duration_seconds_bucket{org="%[1]d",le="1"} 1
-grafana_alerting_rule_send_alerts_duration_seconds_bucket{org="%[1]d",le="5"} 1
-grafana_alerting_rule_send_alerts_duration_seconds_bucket{org="%[1]d",le="10"} 1
-grafana_alerting_rule_send_alerts_duration_seconds_bucket{org="%[1]d",le="15"} 1
-grafana_alerting_rule_send_alerts_duration_seconds_bucket{org="%[1]d",le="30"} 1
-grafana_alerting_rule_send_alerts_duration_seconds_bucket{org="%[1]d",le="60"} 1
-grafana_alerting_rule_send_alerts_duration_seconds_bucket{org="%[1]d",le="120"} 1
-grafana_alerting_rule_send_alerts_duration_seconds_bucket{org="%[1]d",le="180"} 1
-grafana_alerting_rule_send_alerts_duration_seconds_bucket{org="%[1]d",le="240"} 1
-grafana_alerting_rule_send_alerts_duration_seconds_bucket{org="%[1]d",le="300"} 1
-grafana_alerting_rule_send_alerts_duration_seconds_bucket{org="%[1]d",le="+Inf"} 1
-grafana_alerting_rule_send_alerts_duration_seconds_sum{org="%[1]d"} 0
-grafana_alerting_rule_send_alerts_duration_seconds_count{org="%[1]d"} 1
-`, rule.OrgID)
-
-				err := testutil.GatherAndCompare(reg, bytes.NewBufferString(expectedMetric), "grafana_alerting_rule_evaluation_duration_seconds", "grafana_alerting_rule_evaluations_total", "grafana_alerting_rule_evaluation_failures_total", "grafana_alerting_rule_process_evaluation_duration_seconds", "grafana_alerting_rule_send_alerts_duration_seconds")
-				require.NoError(t, err)
-			})
-		})
-	}
-
-	t.Run("should exit", func(t *testing.T) {
-		t.Run("and not clear the state if parent context is cancelled", func(t *testing.T) {
-			stoppedChan := make(chan error)
-			sch, _, _, _ := createSchedule(make(chan time.Time), nil)
-
-			rule := models.AlertRuleGen()()
-			_ = sch.stateManager.ProcessEvalResults(context.Background(), sch.clock.Now(), rule, eval.GenerateResults(rand.Intn(5)+1, eval.ResultGen(eval.WithEvaluatedAt(sch.clock.Now()))), nil)
-			expectedStates := sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID)
-			require.NotEmpty(t, expectedStates)
-
-			ctx, cancel := context.WithCancel(context.Background())
-			ruleInfo := newAlertRuleInfo(ctx)
-			go func() {
-				err := sch.ruleRoutine(models.AlertRuleKey{}, ruleInfo)
-				stoppedChan <- err
-			}()
-
-			cancel()
-			err := waitForErrChannel(t, stoppedChan)
-			require.NoError(t, err)
-			require.Equal(t, len(expectedStates), len(sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID)))
-		})
-		t.Run("and clean up the state if delete is cancellation reason for inner context", func(t *testing.T) {
-			stoppedChan := make(chan error)
-			sch, _, _, _ := createSchedule(make(chan time.Time), nil)
-
-			rule := models.AlertRuleGen()()
-			_ = sch.stateManager.ProcessEvalResults(context.Background(), sch.clock.Now(), rule, eval.GenerateResults(rand.Intn(5)+1, eval.ResultGen(eval.WithEvaluatedAt(sch.clock.Now()))), nil)
-			require.NotEmpty(t, sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID))
-
-			ruleInfo := newAlertRuleInfo(context.Background())
-			go func() {
-				err := sch.ruleRoutine(rule.GetKey(), ruleInfo)
-				stoppedChan <- err
-			}()
-
-			ruleInfo.stop(errRuleDeleted)
-			err := waitForErrChannel(t, stoppedChan)
-			require.NoError(t, err)
-
-			require.Empty(t, sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID))
-		})
-	})
-
-	t.Run("when a message is sent to update channel", func(t *testing.T) {
-		rule := models.AlertRuleGen(withQueryForState(t, eval.Normal))()
-		folderTitle := "folderName"
-		ruleFp := ruleWithFolder{rule, folderTitle}.Fingerprint()
-
-		evalAppliedChan := make(chan time.Time)
-
-		sender := NewSyncAlertsSenderMock()
-		sender.EXPECT().Send(mock.Anything, rule.GetKey(), mock.Anything).Return()
-
-		sch, ruleStore, _, _ := createSchedule(evalAppliedChan, sender)
-		ruleStore.PutRule(context.Background(), rule)
-		sch.schedulableAlertRules.set([]*models.AlertRule{rule}, map[models.FolderKey]string{rule.GetFolderKey(): folderTitle})
-		ctx, cancel := context.WithCancel(context.Background())
-		t.Cleanup(cancel)
-		ruleInfo := newAlertRuleInfo(ctx)
-
-		go func() {
-			_ = sch.ruleRoutine(rule.GetKey(), ruleInfo)
-		}()
-
-		// init evaluation loop so it got the rule version
-		ruleInfo.evalCh <- &evaluation{
-			scheduledAt: sch.clock.Now(),
-			rule:        rule,
-			folderTitle: folderTitle,
-		}
-
-		waitForTimeChannel(t, evalAppliedChan)
-
-		// define some state
-		states := make([]*state.State, 0, len(allStates))
-		for _, s := range allStates {
-			for i := 0; i < 2; i++ {
-				states = append(states, &state.State{
-					AlertRuleUID: rule.UID,
-					CacheID:      util.GenerateShortUID(),
-					OrgID:        rule.OrgID,
-					State:        s,
-					StartsAt:     sch.clock.Now(),
-					EndsAt:       sch.clock.Now().Add(time.Duration(rand.Intn(25)+5) * time.Second),
-					Labels:       rule.Labels,
-				})
-			}
-		}
-		sch.stateManager.Put(states)
-
-		states = sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID)
-		expectedToBeSent := 0
-		for _, s := range states {
-			if s.State == eval.Normal || s.State == eval.Pending {
-				continue
-			}
-			expectedToBeSent++
-		}
-		require.Greaterf(t, expectedToBeSent, 0, "State manager was expected to return at least one state that can be expired")
-
-		t.Run("should do nothing if version in channel is the same", func(t *testing.T) {
-			ruleInfo.updateCh <- ruleVersionAndPauseStatus{ruleFp, false}
-			ruleInfo.updateCh <- ruleVersionAndPauseStatus{ruleFp, false} // second time just to make sure that previous messages were handled
-
-			actualStates := sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID)
-			require.Len(t, actualStates, len(states))
-
-			sender.AssertNotCalled(t, "Send", mock.Anything, mock.Anything)
-		})
-
-		t.Run("should clear the state and expire firing alerts if version in channel is greater", func(t *testing.T) {
-			ruleInfo.updateCh <- ruleVersionAndPauseStatus{ruleFp + 1, false}
-
-			require.Eventually(t, func() bool {
-				return len(sender.Calls()) > 0
-			}, 5*time.Second, 100*time.Millisecond)
-
-			require.Empty(t, sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID))
-			sender.AssertNumberOfCalls(t, "Send", 1)
-			args, ok := sender.Calls()[0].Arguments[2].(definitions.PostableAlerts)
-			require.Truef(t, ok, fmt.Sprintf("expected argument of function was supposed to be 'definitions.PostableAlerts' but got %T", sender.Calls()[0].Arguments[2]))
-			require.Len(t, args.PostableAlerts, expectedToBeSent)
-		})
-	})
-
-	t.Run("when evaluation fails", func(t *testing.T) {
-		rule := models.AlertRuleGen(withQueryForState(t, eval.Error))()
-		rule.ExecErrState = models.ErrorErrState
-
-		evalAppliedChan := make(chan time.Time)
-
-		sender := NewSyncAlertsSenderMock()
-		sender.EXPECT().Send(mock.Anything, rule.GetKey(), mock.Anything).Return()
-
-		sch, ruleStore, _, reg := createSchedule(evalAppliedChan, sender)
-		sch.maxAttempts = 3
-		ruleStore.PutRule(context.Background(), rule)
-		ctx, cancel := context.WithCancel(context.Background())
-		t.Cleanup(cancel)
-		ruleInfo := newAlertRuleInfo(ctx)
-
-		go func() {
-			_ = sch.ruleRoutine(rule.GetKey(), ruleInfo)
-		}()
-
-		ruleInfo.evalCh <- &evaluation{
-			scheduledAt: sch.clock.Now(),
-			rule:        rule,
-		}
-
-		waitForTimeChannel(t, evalAppliedChan)
-
-		t.Run("it should increase failure counter", func(t *testing.T) {
-			// duration metric has 0 values because of mocked clock that do not advance
-			expectedMetric := fmt.Sprintf(
-				`# HELP grafana_alerting_rule_evaluation_duration_seconds The time to evaluate a rule.
-# TYPE grafana_alerting_rule_evaluation_duration_seconds histogram
-grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="0.01"} 3
-grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="0.1"} 3
-grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="0.5"} 3
-grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="1"} 3
-grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="5"} 3
-grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="10"} 3
-grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="15"} 3
-grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="30"} 3
-grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="60"} 3
-grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="120"} 3
-grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="180"} 3
-grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="240"} 3
-grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="300"} 3
-grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="+Inf"} 3
-grafana_alerting_rule_evaluation_duration_seconds_sum{org="%[1]d"} 0
-grafana_alerting_rule_evaluation_duration_seconds_count{org="%[1]d"} 3
-# HELP grafana_alerting_rule_evaluation_failures_total The total number of rule evaluation failures.
-# TYPE grafana_alerting_rule_evaluation_failures_total counter
-grafana_alerting_rule_evaluation_failures_total{org="%[1]d"} 3
-# HELP grafana_alerting_rule_evaluations_total The total number of rule evaluations.
-# TYPE grafana_alerting_rule_evaluations_total counter
-grafana_alerting_rule_evaluations_total{org="%[1]d"} 3
-# HELP grafana_alerting_rule_process_evaluation_duration_seconds The time to process the evaluation results for a rule.
-# TYPE grafana_alerting_rule_process_evaluation_duration_seconds histogram
-grafana_alerting_rule_process_evaluation_duration_seconds_bucket{org="%[1]d",le="0.01"} 1
-grafana_alerting_rule_process_evaluation_duration_seconds_bucket{org="%[1]d",le="0.1"} 1
-grafana_alerting_rule_process_evaluation_duration_seconds_bucket{org="%[1]d",le="0.5"} 1
-grafana_alerting_rule_process_evaluation_duration_seconds_bucket{org="%[1]d",le="1"} 1
-grafana_alerting_rule_process_evaluation_duration_seconds_bucket{org="%[1]d",le="5"} 1
-grafana_alerting_rule_process_evaluation_duration_seconds_bucket{org="%[1]d",le="10"} 1
-grafana_alerting_rule_process_evaluation_duration_seconds_bucket{org="%[1]d",le="15"} 1
-grafana_alerting_rule_process_evaluation_duration_seconds_bucket{org="%[1]d",le="30"} 1
-grafana_alerting_rule_process_evaluation_duration_seconds_bucket{org="%[1]d",le="60"} 1
-grafana_alerting_rule_process_evaluation_duration_seconds_bucket{org="%[1]d",le="120"} 1
-grafana_alerting_rule_process_evaluation_duration_seconds_bucket{org="%[1]d",le="180"} 1
-grafana_alerting_rule_process_evaluation_duration_seconds_bucket{org="%[1]d",le="240"} 1
-grafana_alerting_rule_process_evaluation_duration_seconds_bucket{org="%[1]d",le="300"} 1
-grafana_alerting_rule_process_evaluation_duration_seconds_bucket{org="%[1]d",le="+Inf"} 1
-grafana_alerting_rule_process_evaluation_duration_seconds_sum{org="%[1]d"} 0
-grafana_alerting_rule_process_evaluation_duration_seconds_count{org="%[1]d"} 1
-# HELP grafana_alerting_rule_send_alerts_duration_seconds The time to send the alerts to Alertmanager.
-# TYPE grafana_alerting_rule_send_alerts_duration_seconds histogram
-grafana_alerting_rule_send_alerts_duration_seconds_bucket{org="%[1]d",le="0.01"} 1
-grafana_alerting_rule_send_alerts_duration_seconds_bucket{org="%[1]d",le="0.1"} 1
-grafana_alerting_rule_send_alerts_duration_seconds_bucket{org="%[1]d",le="0.5"} 1
-grafana_alerting_rule_send_alerts_duration_seconds_bucket{org="%[1]d",le="1"} 1
-grafana_alerting_rule_send_alerts_duration_seconds_bucket{org="%[1]d",le="5"} 1
-grafana_alerting_rule_send_alerts_duration_seconds_bucket{org="%[1]d",le="10"} 1
-grafana_alerting_rule_send_alerts_duration_seconds_bucket{org="%[1]d",le="15"} 1
-grafana_alerting_rule_send_alerts_duration_seconds_bucket{org="%[1]d",le="30"} 1
-grafana_alerting_rule_send_alerts_duration_seconds_bucket{org="%[1]d",le="60"} 1
-grafana_alerting_rule_send_alerts_duration_seconds_bucket{org="%[1]d",le="120"} 1
-grafana_alerting_rule_send_alerts_duration_seconds_bucket{org="%[1]d",le="180"} 1
-grafana_alerting_rule_send_alerts_duration_seconds_bucket{org="%[1]d",le="240"} 1
-grafana_alerting_rule_send_alerts_duration_seconds_bucket{org="%[1]d",le="300"} 1
-grafana_alerting_rule_send_alerts_duration_seconds_bucket{org="%[1]d",le="+Inf"} 1
-grafana_alerting_rule_send_alerts_duration_seconds_sum{org="%[1]d"} 0
-grafana_alerting_rule_send_alerts_duration_seconds_count{org="%[1]d"} 1
-`, rule.OrgID)
-
-			err := testutil.GatherAndCompare(reg, bytes.NewBufferString(expectedMetric), "grafana_alerting_rule_evaluation_duration_seconds", "grafana_alerting_rule_evaluations_total", "grafana_alerting_rule_evaluation_failures_total", "grafana_alerting_rule_process_evaluation_duration_seconds", "grafana_alerting_rule_send_alerts_duration_seconds")
-			require.NoError(t, err)
-		})
-
-		t.Run("it should send special alert DatasourceError", func(t *testing.T) {
-			sender.AssertNumberOfCalls(t, "Send", 1)
-			args, ok := sender.Calls()[0].Arguments[2].(definitions.PostableAlerts)
-			require.Truef(t, ok, fmt.Sprintf("expected argument of function was supposed to be 'definitions.PostableAlerts' but got %T", sender.Calls()[0].Arguments[2]))
-			assert.Len(t, args.PostableAlerts, 1)
-			assert.Equal(t, state.ErrorAlertName, args.PostableAlerts[0].Labels[prometheusModel.AlertNameLabel])
-		})
-	})
-
-	t.Run("when there are alerts that should be firing", func(t *testing.T) {
-		t.Run("it should call sender", func(t *testing.T) {
-			// eval.Alerting makes state manager to create notifications for alertmanagers
-			rule := models.AlertRuleGen(withQueryForState(t, eval.Alerting))()
-
-			evalAppliedChan := make(chan time.Time)
-
-			sender := NewSyncAlertsSenderMock()
-			sender.EXPECT().Send(mock.Anything, rule.GetKey(), mock.Anything).Return()
-
-			sch, ruleStore, _, _ := createSchedule(evalAppliedChan, sender)
-			ruleStore.PutRule(context.Background(), rule)
-			ctx, cancel := context.WithCancel(context.Background())
-			t.Cleanup(cancel)
-			ruleInfo := newAlertRuleInfo(ctx)
-
-			go func() {
-				_ = sch.ruleRoutine(rule.GetKey(), ruleInfo)
-			}()
-
-			ruleInfo.evalCh <- &evaluation{
-				scheduledAt: sch.clock.Now(),
-				rule:        rule,
-			}
-
-			waitForTimeChannel(t, evalAppliedChan)
-
-			sender.AssertNumberOfCalls(t, "Send", 1)
-			args, ok := sender.Calls()[0].Arguments[2].(definitions.PostableAlerts)
-			require.Truef(t, ok, fmt.Sprintf("expected argument of function was supposed to be 'definitions.PostableAlerts' but got %T", sender.Calls()[0].Arguments[2]))
-
-			require.Len(t, args.PostableAlerts, 1)
-		})
-	})
-
-	t.Run("when there are no alerts to send it should not call notifiers", func(t *testing.T) {
-		rule := models.AlertRuleGen(withQueryForState(t, eval.Normal))()
-
-		evalAppliedChan := make(chan time.Time)
-
-		sender := NewSyncAlertsSenderMock()
-		sender.EXPECT().Send(mock.Anything, rule.GetKey(), mock.Anything).Return()
-
-		sch, ruleStore, _, _ := createSchedule(evalAppliedChan, sender)
-		ruleStore.PutRule(context.Background(), rule)
-		ctx, cancel := context.WithCancel(context.Background())
-		t.Cleanup(cancel)
-		ruleInfo := newAlertRuleInfo(ctx)
-
-		go func() {
-			_ = sch.ruleRoutine(rule.GetKey(), ruleInfo)
-		}()
-
-		ruleInfo.evalCh <- &evaluation{
-			scheduledAt: sch.clock.Now(),
-			rule:        rule,
-		}
-
-		waitForTimeChannel(t, evalAppliedChan)
-
-		sender.AssertNotCalled(t, "Send", mock.Anything, mock.Anything)
-
-		require.NotEmpty(t, sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID))
-	})
-}
-
 func TestSchedule_deleteAlertRule(t *testing.T) {
 	t.Run("when rule exists", func(t *testing.T) {
 		t.Run("it should stop evaluation loop and remove the controller from registry", func(t *testing.T) {