Alerting: Update state manager to take image only once per rule evaluation (#98289)

* add test

* update state manager to take image only once per rule evaluation process execution

* update test
This commit is contained in:
Yuri Tseretyan 2025-01-09 12:57:58 -05:00 committed by GitHub
parent ee7ffb7a04
commit 4f62c8a160
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 228 additions and 37 deletions

View File

@ -325,10 +325,35 @@ func (st *Manager) ProcessEvalResults(
defer span.End()
logger := st.log.FromContext(ctx)
logger.Debug("State manager processing evaluation results", "resultCount", len(results))
states := st.setNextStateForRule(ctx, alertRule, results, extraLabels, logger)
staleStates := st.deleteStaleStatesFromCache(ctx, logger, evaluatedAt, alertRule)
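// Lazily take the image: takeImage runs at most once per ProcessEvalResults call, and only if a caller actually requests an image.
// A failed attempt is not retried; subsequent calls return nil.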
var fn func() *ngModels.Image
{
var image *ngModels.Image
var imageTaken bool
fn = func() *ngModels.Image {
if imageTaken {
return image
}
logger.Debug("Taking image", "dashboard", alertRule.GetDashboardUID(), "panel", alertRule.GetPanelID())
img, err := takeImage(ctx, st.images, alertRule)
imageTaken = true
if err != nil {
logger.Warn("Failed to take an image",
"dashboard", alertRule.GetDashboardUID(),
"panel", alertRule.GetPanelID(),
"error", err)
return nil
}
image = img
return image
}
}
logger.Debug("State manager processing evaluation results", "resultCount", len(results))
states := st.setNextStateForRule(ctx, alertRule, results, extraLabels, logger, fn)
staleStates := st.deleteStaleStatesFromCache(logger, evaluatedAt, alertRule, fn)
span.AddEvent("results processed", trace.WithAttributes(
attribute.Int64("state_transitions", int64(len(states))),
attribute.Int64("stale_states", int64(len(staleStates))),
@ -370,7 +395,7 @@ func (st *Manager) updateLastSentAt(states StateTransitions, evaluatedAt time.Ti
return result
}
func (st *Manager) setNextStateForRule(ctx context.Context, alertRule *ngModels.AlertRule, results eval.Results, extraLabels data.Labels, logger log.Logger) []StateTransition {
func (st *Manager) setNextStateForRule(ctx context.Context, alertRule *ngModels.AlertRule, results eval.Results, extraLabels data.Labels, logger log.Logger, takeImageFn func() *ngModels.Image) []StateTransition {
if st.applyNoDataAndErrorToAllStates && results.IsNoData() && (alertRule.NoDataState == ngModels.Alerting || alertRule.NoDataState == ngModels.OK || alertRule.NoDataState == ngModels.KeepLast) { // If it is no data, check the mapping and switch all results to the new state
// aggregate UIDs of datasources that returned NoData into one and provide as auxiliary info via annotations. See: https://github.com/grafana/grafana/issues/88184
var refIds strings.Builder
@ -398,14 +423,14 @@ func (st *Manager) setNextStateForRule(ctx context.Context, alertRule *ngModels.
"datasource_uid": datasourceUIDs.String(),
"ref_id": refIds.String(),
}
transitions := st.setNextStateForAll(ctx, alertRule, results[0], logger, annotations)
transitions := st.setNextStateForAll(alertRule, results[0], logger, annotations, takeImageFn)
if len(transitions) > 0 {
return transitions // if there are no current states for the rule. Create ones for each result
}
}
if st.applyNoDataAndErrorToAllStates && results.IsError() && (alertRule.ExecErrState == ngModels.AlertingErrState || alertRule.ExecErrState == ngModels.OkErrState || alertRule.ExecErrState == ngModels.KeepLastErrState) {
// TODO squash all errors into one, and provide as annotation
transitions := st.setNextStateForAll(ctx, alertRule, results[0], logger, nil)
transitions := st.setNextStateForAll(alertRule, results[0], logger, nil, takeImageFn)
if len(transitions) > 0 {
return transitions // if there are no current states for the rule. Create ones for each result
}
@ -413,14 +438,14 @@ func (st *Manager) setNextStateForRule(ctx context.Context, alertRule *ngModels.
transitions := make([]StateTransition, 0, len(results))
for _, result := range results {
currentState := st.cache.create(ctx, logger, alertRule, result, extraLabels, st.externalURL)
s := st.setNextState(ctx, alertRule, currentState, result, nil, logger)
s := st.setNextState(alertRule, currentState, result, nil, logger, takeImageFn)
st.cache.set(currentState) // replace the existing state with the new one
transitions = append(transitions, s)
}
return transitions
}
func (st *Manager) setNextStateForAll(ctx context.Context, alertRule *ngModels.AlertRule, result eval.Result, logger log.Logger, extraAnnotations data.Labels) []StateTransition {
func (st *Manager) setNextStateForAll(alertRule *ngModels.AlertRule, result eval.Result, logger log.Logger, extraAnnotations data.Labels, takeImageFn func() *ngModels.Image) []StateTransition {
currentStates := st.cache.getStatesForRuleUID(alertRule.OrgID, alertRule.UID, false)
transitions := make([]StateTransition, 0, len(currentStates))
updated := ruleStates{
@ -428,7 +453,7 @@ func (st *Manager) setNextStateForAll(ctx context.Context, alertRule *ngModels.A
}
for _, currentState := range currentStates {
newState := currentState.Copy()
t := st.setNextState(ctx, alertRule, newState, result, extraAnnotations, logger)
t := st.setNextState(alertRule, newState, result, extraAnnotations, logger, takeImageFn)
updated.states[newState.CacheID] = newState
transitions = append(transitions, t)
}
@ -437,7 +462,7 @@ func (st *Manager) setNextStateForAll(ctx context.Context, alertRule *ngModels.A
}
// Set the current state based on evaluation results
func (st *Manager) setNextState(ctx context.Context, alertRule *ngModels.AlertRule, currentState *State, result eval.Result, extraAnnotations data.Labels, logger log.Logger) StateTransition {
func (st *Manager) setNextState(alertRule *ngModels.AlertRule, currentState *State, result eval.Result, extraAnnotations data.Labels, logger log.Logger, takeImageFn func() *ngModels.Image) StateTransition {
start := st.clock.Now()
currentState.LastEvaluationTime = result.EvaluatedAt
@ -514,13 +539,8 @@ func (st *Manager) setNextState(ctx context.Context, alertRule *ngModels.AlertRu
}
if shouldTakeImage(currentState.State, oldState, currentState.Image, newlyResolved) {
image, err := takeImage(ctx, st.images, alertRule)
if err != nil {
logger.Warn("Failed to take an image",
"dashboard", alertRule.GetDashboardUID(),
"panel", alertRule.GetPanelID(),
"error", err)
} else if image != nil {
image := takeImageFn()
if image != nil {
currentState.Image = image
}
}
@ -586,7 +606,7 @@ func translateInstanceState(state ngModels.InstanceStateType) eval.State {
}
}
func (st *Manager) deleteStaleStatesFromCache(ctx context.Context, logger log.Logger, evaluatedAt time.Time, alertRule *ngModels.AlertRule) []StateTransition {
func (st *Manager) deleteStaleStatesFromCache(logger log.Logger, evaluatedAt time.Time, alertRule *ngModels.AlertRule, takeImageFn func() *ngModels.Image) []StateTransition {
// If we are removing two or more stale series it makes sense to share the resolved image as the alert rule is the same.
// TODO: We will need to change this when we support images without screenshots as each series will have a different image
staleStates := st.cache.deleteRuleStates(alertRule.GetKey(), func(s *State) bool {
@ -606,13 +626,8 @@ func (st *Manager) deleteStaleStatesFromCache(ctx context.Context, logger log.Lo
if oldState == eval.Alerting {
s.ResolvedAt = &evaluatedAt
image, err := takeImage(ctx, st.images, alertRule)
if err != nil {
logger.Warn("Failed to take an image",
"dashboard", alertRule.GetDashboardUID(),
"panel", alertRule.GetPanelID(),
"error", err)
} else if image != nil {
image := takeImageFn()
if image != nil {
s.Image = image
}
}

View File

@ -19,6 +19,7 @@ import (
"github.com/grafana/grafana/pkg/expr"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/log/logtest"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/services/ngalert/eval"
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
@ -26,18 +27,6 @@ import (
"github.com/grafana/grafana/pkg/util"
)
// CountingImageService is an image service used in tests that counts calls to NewImage.
// Not for parallel tests: Called is incremented without any synchronization.
type CountingImageService struct {
// Called is the number of NewImage invocations so far.
Called int
}
// NewImage bumps the call counter and returns a fresh image whose token is a
// random integer rendered as a string. It never returns an error.
func (c *CountingImageService) NewImage(_ context.Context, _ *ngmodels.AlertRule) (*ngmodels.Image, error) {
	c.Called++
	token := fmt.Sprint(rand.Int())
	return &ngmodels.Image{Token: token}, nil
}
func TestStateIsStale(t *testing.T) {
now := time.Now()
intervalSeconds := rand.Int63n(10) + 5
@ -3932,6 +3921,161 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
})
}
// TestProcessEvalResults_Screenshots verifies that ProcessEvalResults asks the
// image service for an image exactly the expected number of times (once in every
// case listed below) even when several alert instances need one, and that every
// returned transition carries the image the service hands out.
func TestProcessEvalResults_Screenshots(t *testing.T) {
gen := ngmodels.RuleGen
// Rule generated with a dashboard/panel reference, nil labels and For set to 0.
baseRule := gen.With(
gen.WithDashboardAndPanel(util.Pointer(util.GenerateShortUID()), util.Pointer(rand.Int63())),
gen.WithLabels(nil),
gen.WithFor(0),
).Generate()
evalDuration := time.Duration(baseRule.IntervalSeconds) * time.Second
t0 := time.Now()
// tn returns the time n evaluation intervals after t0.
tn := func(n int) time.Time {
return t0.Add(time.Duration(n) * evalDuration)
}
t1 := tn(1)
// randomImage returns an image with a random token, used to pre-populate states.
randomImage := func() *ngmodels.Image {
return &ngmodels.Image{Token: fmt.Sprint(rand.Int())}
}
// newState builds a state for baseRule in the given eval state with the given
// labels and image, started and last evaluated at t0.
newState := func(s eval.State, labels data.Labels, image *ngmodels.Image) State {
res := State{
AlertRuleUID: baseRule.UID,
OrgID: baseRule.OrgID,
Image: image,
Labels: labels,
ResultFingerprint: labels.Fingerprint(),
State: s,
LatestResult: &Evaluation{
EvaluationState: s,
},
StartsAt: t0,
LastEvaluationTime: t0,
CacheID: data.Fingerprint(0),
}
// CacheID is left at zero above so that setCacheID fills it in
// (presumably derived from the labels — confirm in setCacheID).
setCacheID(&res)
return res
}
// newResult returns a Normal result with the given mutators applied in order.
newResult := func(mutators ...eval.ResultMutator) eval.Result {
r := eval.Result{
State: eval.Normal,
}
for _, mutator := range mutators {
mutator(&r)
}
return r
}
// Three distinct label sets, i.e. three alert instances of the same rule.
labels1 := data.Labels{
"instance_label": "test-1",
}
labels2 := data.Labels{
"instance_label": "test-2",
}
labels3 := data.Labels{
"instance_label": "test-3",
}
testCases := []struct {
// desc is the sub-test name.
desc string
// NOTE(review): rule is set only by the first case and is not read below;
// the loop always passes &baseRule to ProcessEvalResults.
rule ngmodels.AlertRule
// initStates are put into the manager cache before any results are processed.
initStates []State
// results holds one slice of evaluation results per ProcessEvalResults call.
results [][]eval.Result
// imageService is the counting stub handed to the manager as its image service.
imageService *CountingImageService
// expectedCalledTimes is the expected imageService.Called after all calls.
expectedCalledTimes int
}{
{
desc: "when transition to Alerting from empty state",
rule: baseRule,
initStates: []State{},
results: [][]eval.Result{
{
newResult(eval.WithState(eval.Alerting), eval.WithLabels(labels1)),
newResult(eval.WithState(eval.Alerting), eval.WithLabels(labels2)),
newResult(eval.WithState(eval.Alerting), eval.WithLabels(labels3)),
},
},
imageService: newSuccessfulCountingImageService(),
expectedCalledTimes: 1,
},
{
desc: "when transition to Alerting from Normal states, existing images ignored",
initStates: []State{
newState(eval.Normal, labels1, randomImage()),
newState(eval.Normal, labels2, nil),
newState(eval.Normal, labels3, randomImage()),
},
results: [][]eval.Result{
{
newResult(eval.WithState(eval.Alerting), eval.WithLabels(labels1)),
newResult(eval.WithState(eval.Alerting), eval.WithLabels(labels2)),
newResult(eval.WithState(eval.Alerting), eval.WithLabels(labels3)),
},
},
imageService: newSuccessfulCountingImageService(),
expectedCalledTimes: 1,
},
{
desc: "when Alerting and no screenshot",
initStates: []State{
newState(eval.Alerting, labels1, nil),
newState(eval.Alerting, labels2, nil),
newState(eval.Alerting, labels3, nil),
},
results: [][]eval.Result{
{
newResult(eval.WithState(eval.Alerting), eval.WithLabels(labels1)),
newResult(eval.WithState(eval.Alerting), eval.WithLabels(labels2)),
newResult(eval.WithState(eval.Alerting), eval.WithLabels(labels3)),
},
},
imageService: newSuccessfulCountingImageService(),
expectedCalledTimes: 1,
},
}
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
clk := clock.NewMock()
ctx := context.Background()
cfg := ManagerCfg{
Metrics: metrics.NewNGAlert(prometheus.NewPedanticRegistry()).GetStateMetrics(),
ExternalURL: nil,
InstanceStore: &FakeInstanceStore{},
Images: tc.imageService,
Clock: clk,
Historian: &FakeHistorian{},
Tracer: tracing.InitializeTracerForTest(),
Log: &logtest.Fake{},
}
mgr := NewManager(cfg, NewNoopPersister())
// Seed the cache with the pre-existing states of this case.
for _, s := range tc.initStates {
mgr.cache.set(&s)
}
for n, results := range tc.results {
// Each result is stamped with tn(n).
// NOTE(review): the clock and the evaluatedAt argument are always t1, regardless of n.
tx := tn(n)
clk.Set(t1)
for idx := range results {
results[idx].EvaluatedAt = tx
}
transitions := mgr.ProcessEvalResults(ctx, t1, &baseRule, results, nil, nil)
// Every transition must carry the image the stub returns.
for _, transition := range transitions {
assert.Equalf(t, tc.imageService.Image, transition.Image, "Transition %s does not have image but should", transition.Labels.String())
}
}
// The image service must have been called exactly the expected number of times.
assert.Equal(t, tc.expectedCalledTimes, tc.imageService.Called)
})
}
}
func setCacheID(s *State) *State {
if s.CacheID != 0 {
return s

View File

@ -2,6 +2,8 @@ package state
import (
"context"
"fmt"
"math/rand"
"slices"
"sync"
@ -107,3 +109,33 @@ func (s *NoopImageService) NewImage(_ context.Context, _ *models.AlertRule) (*mo
// NoopSender is a no-op sender. Used when you want state manager to update LastSentAt without sending any alerts.
var NoopSender = func(_ context.Context, _ StateTransitions) {}
// CountingImageService is an image service stub that counts how many times
// NewImage is called and returns the configured Image and Err from it.
type CountingImageService struct {
// mtx is held by NewImage while it updates Called and reads Image and Err.
mtx sync.Mutex
// Called is the number of NewImage invocations so far.
Called int
// Image is returned by NewImage as its image result.
Image *models.Image
// Err is returned by NewImage as its error result.
Err error
}
// NewImage records one more call and returns the configured Image and Err.
// The counter update and the reads happen under the service mutex; the context
// and rule arguments are ignored.
func (c *CountingImageService) NewImage(_ context.Context, _ *models.AlertRule) (*models.Image, error) {
	c.mtx.Lock()
	c.Called++
	img, err := c.Image, c.Err
	c.mtx.Unlock()
	return img, err
}
// newSuccessfulCountingImageService returns a CountingImageService with a zero
// call counter, no error, and a fixed image whose token is a random integer
// rendered as a string.
func newSuccessfulCountingImageService() *CountingImageService {
	token := fmt.Sprint(rand.Int())
	return &CountingImageService{
		Image: &models.Image{Token: token},
	}
}
// NewFailingCountingImageService returns a CountingImageService with a zero
// call counter and no image, whose NewImage returns err as its error result.
func NewFailingCountingImageService(err error) *CountingImageService {
	svc := new(CountingImageService)
	svc.Err = err
	return svc
}