mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Alerting: Update state manager to have immutable state in cache (#95985)
* create a new state and set at the end
* propagate labels datasource_uid and ref_id from current state if it's error
* copy the state when apply to all
This commit is contained in:
parent
f9ac3301d3
commit
420db99d16
@ -71,68 +71,7 @@ func (c *cache) countAlertsBy(state eval.State) float64 {
|
||||
return count
|
||||
}
|
||||
|
||||
func (c *cache) getOrCreate(ctx context.Context, log log.Logger, alertRule *ngModels.AlertRule, result eval.Result, extraLabels data.Labels, externalURL *url.URL) *State {
|
||||
// Calculation of state ID involves label and annotation expansion, which may be resource intensive operations, and doing it in the context guarded by mtxStates may create a lot of contention.
|
||||
// Instead of just calculating ID we create an entire state - a candidate. If rule states already hold a state with this ID, this candidate will be discarded and the existing one will be returned.
|
||||
// Otherwise, this candidate will be added to the rule states and returned.
|
||||
stateCandidate := calculateState(ctx, log, alertRule, result, extraLabels, externalURL)
|
||||
return c.getOrAdd(stateCandidate, log)
|
||||
}
|
||||
|
||||
// getOrAdd retrieves an existing State from the cache if it exists,
|
||||
// or adds the provided State if it is not present.
|
||||
func (c *cache) getOrAdd(state State, log log.Logger) *State {
|
||||
c.mtxStates.Lock()
|
||||
defer c.mtxStates.Unlock()
|
||||
|
||||
// Retrieve or initialize the org-level map for storing rule states
|
||||
var orgStates map[string]*ruleStates
|
||||
var ok bool
|
||||
if orgStates, ok = c.states[state.OrgID]; !ok {
|
||||
orgStates = make(map[string]*ruleStates)
|
||||
c.states[state.OrgID] = orgStates
|
||||
}
|
||||
|
||||
// Retrieve or initialize the rule-level states map
|
||||
var rs *ruleStates
|
||||
if rs, ok = orgStates[state.AlertRuleUID]; !ok {
|
||||
rs = &ruleStates{states: make(map[data.Fingerprint]*State)}
|
||||
c.states[state.OrgID][state.AlertRuleUID] = rs
|
||||
}
|
||||
|
||||
return rs.getOrAdd(state, log)
|
||||
}
|
||||
|
||||
func (rs *ruleStates) getOrAdd(stateCandidate State, log log.Logger) *State {
|
||||
state, ok := rs.states[stateCandidate.CacheID]
|
||||
// Check if the state with this ID already exists.
|
||||
if !ok {
|
||||
rs.states[stateCandidate.CacheID] = &stateCandidate
|
||||
return &stateCandidate
|
||||
}
|
||||
|
||||
// Annotations can change over time, however we also want to maintain
|
||||
// certain annotations across evaluations
|
||||
for k, v := range state.Annotations {
|
||||
if _, ok := ngModels.InternalAnnotationNameSet[k]; ok {
|
||||
// If the annotation is not present then it should be copied from the
|
||||
// previous state to the next state
|
||||
if _, ok := stateCandidate.Annotations[k]; !ok {
|
||||
stateCandidate.Annotations[k] = v
|
||||
}
|
||||
}
|
||||
}
|
||||
state.Annotations = stateCandidate.Annotations
|
||||
state.Values = stateCandidate.Values
|
||||
if state.ResultFingerprint != stateCandidate.ResultFingerprint {
|
||||
log.Info("Result fingerprint has changed", "oldFingerprint", state.ResultFingerprint, "newFingerprint", stateCandidate.ResultFingerprint, "cacheID", state.CacheID, "stateLabels", state.Labels.String())
|
||||
state.ResultFingerprint = stateCandidate.ResultFingerprint
|
||||
}
|
||||
rs.states[stateCandidate.CacheID] = state
|
||||
return state
|
||||
}
|
||||
|
||||
func calculateState(ctx context.Context, log log.Logger, alertRule *ngModels.AlertRule, result eval.Result, extraLabels data.Labels, externalURL *url.URL) State {
|
||||
func expandAnnotationsAndLabels(ctx context.Context, log log.Logger, alertRule *ngModels.AlertRule, result eval.Result, extraLabels data.Labels, externalURL *url.URL) (data.Labels, data.Labels) {
|
||||
var reserved []string
|
||||
resultLabels := result.Instance
|
||||
if len(resultLabels) > 0 {
|
||||
@ -201,23 +140,82 @@ func calculateState(ctx context.Context, log log.Logger, alertRule *ngModels.Ale
|
||||
if len(dupes) > 0 {
|
||||
log.Debug("Evaluation result contains either reserved labels or labels declared in the rules. Those labels from the result will be ignored", "labels", dupes)
|
||||
}
|
||||
return lbs, annotations
|
||||
}
|
||||
|
||||
func (c *cache) create(ctx context.Context, log log.Logger, alertRule *ngModels.AlertRule, result eval.Result, extraLabels data.Labels, externalURL *url.URL) *State {
|
||||
lbs, annotations := expandAnnotationsAndLabels(ctx, log, alertRule, result, extraLabels, externalURL)
|
||||
|
||||
cacheID := lbs.Fingerprint()
|
||||
|
||||
// For new states, we set StartsAt & EndsAt to EvaluatedAt as this is the
|
||||
// expected value for a Normal state during state transition.
|
||||
newState := State{
|
||||
AlertRuleUID: alertRule.UID,
|
||||
OrgID: alertRule.OrgID,
|
||||
CacheID: cacheID,
|
||||
Labels: lbs,
|
||||
Annotations: annotations,
|
||||
EvaluationDuration: result.EvaluationDuration,
|
||||
StartsAt: result.EvaluatedAt,
|
||||
EndsAt: result.EvaluatedAt,
|
||||
ResultFingerprint: result.Instance.Fingerprint(), // remember original result fingerprint
|
||||
OrgID: alertRule.OrgID,
|
||||
AlertRuleUID: alertRule.UID,
|
||||
CacheID: cacheID,
|
||||
State: eval.Normal,
|
||||
StateReason: "",
|
||||
ResultFingerprint: result.Instance.Fingerprint(), // remember original result fingerprint
|
||||
LatestResult: nil,
|
||||
Error: nil,
|
||||
Image: nil,
|
||||
Annotations: annotations,
|
||||
Labels: lbs,
|
||||
Values: nil,
|
||||
StartsAt: result.EvaluatedAt,
|
||||
EndsAt: result.EvaluatedAt,
|
||||
ResolvedAt: nil,
|
||||
LastSentAt: nil,
|
||||
LastEvaluationString: "",
|
||||
LastEvaluationTime: result.EvaluatedAt,
|
||||
EvaluationDuration: result.EvaluationDuration,
|
||||
}
|
||||
return newState
|
||||
|
||||
existingState := c.get(alertRule.OrgID, alertRule.UID, cacheID)
|
||||
if existingState == nil {
|
||||
return &newState
|
||||
}
|
||||
// if there is existing state, copy over the current values that may be needed to determine the final state.
|
||||
// TODO remove some unnecessary assignments below because they are overridden in setNextState
|
||||
newState.State = existingState.State
|
||||
newState.StateReason = existingState.StateReason
|
||||
newState.Image = existingState.Image
|
||||
newState.LatestResult = existingState.LatestResult
|
||||
newState.Error = existingState.Error
|
||||
newState.Values = existingState.Values
|
||||
newState.LastEvaluationString = existingState.LastEvaluationString
|
||||
newState.StartsAt = existingState.StartsAt
|
||||
newState.EndsAt = existingState.EndsAt
|
||||
newState.ResolvedAt = existingState.ResolvedAt
|
||||
newState.LastSentAt = existingState.LastSentAt
|
||||
// Annotations can change over time, however we also want to maintain
|
||||
// certain annotations across evaluations
|
||||
for key := range ngModels.InternalAnnotationNameSet { // Changing in
|
||||
value, ok := existingState.Annotations[key]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
// If the annotation is not present then it should be copied from
|
||||
// the current state to the new state
|
||||
if _, ok = newState.Annotations[key]; !ok {
|
||||
newState.Annotations[key] = value
|
||||
}
|
||||
}
|
||||
|
||||
// if the current state is "data source error" then it may have additional labels that may not exist in the new state.
|
||||
// See https://github.com/grafana/grafana/blob/c7fdf8ce706c2c9d438f5e6eabd6e580bac4946b/pkg/services/ngalert/state/state.go#L161-L163
|
||||
// copy known labels over to the new instance, it can help reduce flapping
|
||||
// TODO fix this?
|
||||
if existingState.State == eval.Error && result.State == eval.Error {
|
||||
setIfExist := func(lbl string) {
|
||||
if v, ok := existingState.Labels[lbl]; ok {
|
||||
newState.Labels[lbl] = v
|
||||
}
|
||||
}
|
||||
setIfExist("datasource_uid")
|
||||
setIfExist("ref_id")
|
||||
}
|
||||
return &newState
|
||||
}
|
||||
|
||||
// expand returns the expanded templates of all annotations or labels for the template data.
|
||||
@ -264,6 +262,15 @@ func (c *cache) deleteRuleStates(ruleKey ngModels.AlertRuleKey, predicate func(s
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *cache) setRuleStates(ruleKey ngModels.AlertRuleKey, s ruleStates) {
|
||||
c.mtxStates.Lock()
|
||||
defer c.mtxStates.Unlock()
|
||||
if _, ok := c.states[ruleKey.OrgID]; !ok {
|
||||
c.states[ruleKey.OrgID] = make(map[string]*ruleStates)
|
||||
}
|
||||
c.states[ruleKey.OrgID][ruleKey.UID] = &s
|
||||
}
|
||||
|
||||
func (c *cache) set(entry *State) {
|
||||
c.mtxStates.Lock()
|
||||
defer c.mtxStates.Unlock()
|
||||
|
@ -43,7 +43,7 @@ func BenchmarkGetOrCreateTest(b *testing.B) {
|
||||
// values := make([]int64, count)
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
for pb.Next() {
|
||||
_ = cache.getOrCreate(ctx, log, rule, result, nil, u)
|
||||
_ = cache.create(ctx, log, rule, result, nil, u)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"net/url"
|
||||
"testing"
|
||||
"time"
|
||||
@ -117,7 +118,7 @@ func Test_expand(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func Test_getOrCreate(t *testing.T) {
|
||||
func Test_create(t *testing.T) {
|
||||
url := &url.URL{
|
||||
Scheme: "http",
|
||||
Host: "localhost:3000",
|
||||
@ -136,7 +137,7 @@ func Test_getOrCreate(t *testing.T) {
|
||||
result := eval.Result{
|
||||
Instance: models.GenerateAlertLabels(5, "result-"),
|
||||
}
|
||||
state := c.getOrCreate(context.Background(), l, rule, result, extraLabels, url)
|
||||
state := c.create(context.Background(), l, rule, result, extraLabels, url)
|
||||
for key, expected := range extraLabels {
|
||||
require.Equal(t, expected, state.Labels[key])
|
||||
}
|
||||
@ -164,7 +165,7 @@ func Test_getOrCreate(t *testing.T) {
|
||||
result.Instance[key] = "result-" + util.GenerateShortUID()
|
||||
}
|
||||
|
||||
state := c.getOrCreate(context.Background(), l, rule, result, extraLabels, url)
|
||||
state := c.create(context.Background(), l, rule, result, extraLabels, url)
|
||||
for key, expected := range extraLabels {
|
||||
require.Equal(t, expected, state.Labels[key])
|
||||
}
|
||||
@ -180,7 +181,7 @@ func Test_getOrCreate(t *testing.T) {
|
||||
for key := range rule.Labels {
|
||||
result.Instance[key] = "result-" + util.GenerateShortUID()
|
||||
}
|
||||
state := c.getOrCreate(context.Background(), l, rule, result, extraLabels, url)
|
||||
state := c.create(context.Background(), l, rule, result, extraLabels, url)
|
||||
for key, expected := range rule.Labels {
|
||||
require.Equal(t, expected, state.Labels[key])
|
||||
}
|
||||
@ -202,7 +203,7 @@ func Test_getOrCreate(t *testing.T) {
|
||||
}
|
||||
rule.Labels = labelTemplates
|
||||
|
||||
state := c.getOrCreate(context.Background(), l, rule, result, extraLabels, url)
|
||||
state := c.create(context.Background(), l, rule, result, extraLabels, url)
|
||||
for key, expected := range extraLabels {
|
||||
assert.Equal(t, expected, state.Labels["rule-"+key])
|
||||
}
|
||||
@ -210,7 +211,6 @@ func Test_getOrCreate(t *testing.T) {
|
||||
assert.Equal(t, expected, state.Labels["rule-"+key])
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("rule annotations should be able to be expanded with result and extra labels", func(t *testing.T) {
|
||||
result := eval.Result{
|
||||
Instance: models.GenerateAlertLabels(5, "result-"),
|
||||
@ -229,7 +229,7 @@ func Test_getOrCreate(t *testing.T) {
|
||||
}
|
||||
rule.Annotations = annotationTemplates
|
||||
|
||||
state := c.getOrCreate(context.Background(), l, rule, result, extraLabels, url)
|
||||
state := c.create(context.Background(), l, rule, result, extraLabels, url)
|
||||
for key, expected := range extraLabels {
|
||||
assert.Equal(t, expected, state.Annotations["rule-"+key])
|
||||
}
|
||||
@ -237,7 +237,6 @@ func Test_getOrCreate(t *testing.T) {
|
||||
assert.Equal(t, expected, state.Annotations["rule-"+key])
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("when result labels collide with system labels from LabelsUserCannotSpecify", func(t *testing.T) {
|
||||
result := eval.Result{
|
||||
Instance: models.GenerateAlertLabels(5, "result-"),
|
||||
@ -260,7 +259,7 @@ func Test_getOrCreate(t *testing.T) {
|
||||
|
||||
rule := generateRule()
|
||||
|
||||
state := c.getOrCreate(context.Background(), l, rule, result, nil, url)
|
||||
state := c.create(context.Background(), l, rule, result, nil, url)
|
||||
|
||||
for key := range models.LabelsUserCannotSpecify {
|
||||
assert.NotContains(t, state.Labels, key)
|
||||
@ -282,7 +281,7 @@ func Test_getOrCreate(t *testing.T) {
|
||||
result.Instance["label1_user"] = uuid.NewString()
|
||||
result.Instance["label4_user"] = uuid.NewString()
|
||||
|
||||
state = c.getOrCreate(context.Background(), l, rule, result, nil, url)
|
||||
state = c.create(context.Background(), l, rule, result, nil, url)
|
||||
assert.NotContains(t, state.Labels, "__label1__")
|
||||
assert.Contains(t, state.Labels, "label1")
|
||||
assert.Equal(t, state.Labels["label1"], result.Instance["label1"])
|
||||
@ -292,65 +291,138 @@ func Test_getOrCreate(t *testing.T) {
|
||||
assert.Equal(t, state.Labels["label4_user"], result.Instance["label4_user"])
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func Test_getOrAdd(t *testing.T) {
|
||||
logger := log.NewNopLogger()
|
||||
t.Run("creates a state with preset fields if there is no current state", func(t *testing.T) {
|
||||
rule := generateRule()
|
||||
|
||||
type testCase struct {
|
||||
name string
|
||||
initialState *State
|
||||
newState *State
|
||||
}
|
||||
extraLabels := models.GenerateAlertLabels(2, "extra-")
|
||||
|
||||
orgID := int64(1)
|
||||
alertRuleUID := "rule-uid"
|
||||
cacheID := data.Fingerprint(12345)
|
||||
result := eval.Result{
|
||||
Instance: models.GenerateAlertLabels(5, "result-"),
|
||||
}
|
||||
|
||||
cases := []testCase{
|
||||
{
|
||||
name: "add new state",
|
||||
initialState: nil,
|
||||
newState: &State{
|
||||
OrgID: orgID,
|
||||
AlertRuleUID: alertRuleUID,
|
||||
CacheID: cacheID,
|
||||
Labels: data.Labels{"label1": "value1"},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "retrieve existing state",
|
||||
initialState: &State{
|
||||
OrgID: orgID,
|
||||
AlertRuleUID: alertRuleUID,
|
||||
CacheID: cacheID,
|
||||
Labels: data.Labels{"label1": "value1"},
|
||||
},
|
||||
newState: &State{
|
||||
OrgID: orgID,
|
||||
AlertRuleUID: alertRuleUID,
|
||||
CacheID: cacheID,
|
||||
Labels: data.Labels{"label2": "value2"},
|
||||
},
|
||||
},
|
||||
}
|
||||
expectedLbl, expectedAnn := expandAnnotationsAndLabels(context.Background(), l, rule, result, extraLabels, url)
|
||||
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
c := newCache()
|
||||
if tc.initialState != nil {
|
||||
c.getOrAdd(*tc.initialState, logger)
|
||||
}
|
||||
state := c.create(context.Background(), l, rule, result, extraLabels, url)
|
||||
|
||||
result := c.getOrAdd(*tc.newState, logger)
|
||||
assert.Equal(t, rule.OrgID, state.OrgID)
|
||||
assert.Equal(t, rule.UID, state.AlertRuleUID)
|
||||
assert.Equal(t, state.Labels.Fingerprint(), state.CacheID)
|
||||
assert.Equal(t, result.State, state.State)
|
||||
assert.Equal(t, "", state.StateReason)
|
||||
assert.Equal(t, result.Instance.Fingerprint(), state.ResultFingerprint)
|
||||
assert.Nil(t, state.LatestResult)
|
||||
assert.Nil(t, state.Error)
|
||||
assert.Nil(t, state.Image)
|
||||
assert.EqualValues(t, expectedAnn, state.Annotations)
|
||||
assert.EqualValues(t, expectedLbl, state.Labels)
|
||||
assert.Nil(t, state.Values)
|
||||
assert.Equal(t, result.EvaluatedAt, state.StartsAt)
|
||||
assert.Equal(t, result.EvaluatedAt, state.EndsAt)
|
||||
assert.Nil(t, state.ResolvedAt)
|
||||
assert.Nil(t, state.LastSentAt)
|
||||
assert.Equal(t, "", state.LastEvaluationString)
|
||||
assert.Equal(t, result.EvaluatedAt, state.LastEvaluationTime)
|
||||
assert.Equal(t, result.EvaluationDuration, state.EvaluationDuration)
|
||||
})
|
||||
|
||||
if tc.initialState == nil {
|
||||
require.Equal(t, tc.newState, result, "expected newState to be added")
|
||||
} else {
|
||||
require.Equal(t, tc.initialState, result, "expected to retrieve existing state")
|
||||
}
|
||||
t.Run("it populates some fields from the current state if it exists", func(t *testing.T) {
|
||||
rule := generateRule()
|
||||
|
||||
extraLabels := models.GenerateAlertLabels(2, "extra-")
|
||||
|
||||
result := eval.Result{
|
||||
Instance: models.GenerateAlertLabels(5, "result-"),
|
||||
}
|
||||
|
||||
expectedLbl, expectedAnn := expandAnnotationsAndLabels(context.Background(), l, rule, result, extraLabels, url)
|
||||
|
||||
current := randomSate(rule.GetKey())
|
||||
current.CacheID = expectedLbl.Fingerprint()
|
||||
|
||||
c.set(&current)
|
||||
|
||||
state := c.create(context.Background(), l, rule, result, extraLabels, url)
|
||||
|
||||
assert.Equal(t, rule.OrgID, state.OrgID)
|
||||
assert.Equal(t, rule.UID, state.AlertRuleUID)
|
||||
assert.Equal(t, state.Labels.Fingerprint(), state.CacheID)
|
||||
assert.Equal(t, result.Instance.Fingerprint(), state.ResultFingerprint)
|
||||
assert.EqualValues(t, expectedAnn, state.Annotations)
|
||||
assert.EqualValues(t, expectedLbl, state.Labels)
|
||||
assert.Equal(t, result.EvaluatedAt, state.LastEvaluationTime)
|
||||
assert.Equal(t, result.EvaluationDuration, state.EvaluationDuration)
|
||||
|
||||
assert.Equal(t, current.State, state.State)
|
||||
assert.Equal(t, current.StateReason, state.StateReason)
|
||||
assert.Equal(t, current.Image, state.Image)
|
||||
assert.Equal(t, current.LatestResult, state.LatestResult)
|
||||
assert.Equal(t, current.Error, state.Error)
|
||||
assert.Equal(t, current.Values, state.Values)
|
||||
assert.Equal(t, current.StartsAt, state.StartsAt)
|
||||
assert.Equal(t, current.EndsAt, state.EndsAt)
|
||||
assert.Equal(t, current.ResolvedAt, state.ResolvedAt)
|
||||
assert.Equal(t, current.LastSentAt, state.LastSentAt)
|
||||
assert.Equal(t, current.LastEvaluationString, state.LastEvaluationString)
|
||||
|
||||
t.Run("if result Error and current state is Error it should copy datasource_uid and ref_id labels", func(t *testing.T) {
|
||||
current = randomSate(rule.GetKey())
|
||||
current.CacheID = expectedLbl.Fingerprint()
|
||||
current.State = eval.Error
|
||||
current.Labels["datasource_uid"] = util.GenerateShortUID()
|
||||
current.Labels["ref_id"] = util.GenerateShortUID()
|
||||
|
||||
c.set(&current)
|
||||
|
||||
result.State = eval.Error
|
||||
state = c.create(context.Background(), l, rule, result, extraLabels, url)
|
||||
|
||||
l := expectedLbl.Copy()
|
||||
l["datasource_uid"] = current.Labels["datasource_uid"]
|
||||
l["ref_id"] = current.Labels["ref_id"]
|
||||
|
||||
assert.Equal(t, current.CacheID, state.CacheID)
|
||||
assert.EqualValues(t, l, state.Labels)
|
||||
|
||||
assert.Equal(t, rule.OrgID, state.OrgID)
|
||||
assert.Equal(t, rule.UID, state.AlertRuleUID)
|
||||
|
||||
assert.Equal(t, result.Instance.Fingerprint(), state.ResultFingerprint)
|
||||
assert.EqualValues(t, expectedAnn, state.Annotations)
|
||||
assert.Equal(t, result.EvaluatedAt, state.LastEvaluationTime)
|
||||
assert.Equal(t, result.EvaluationDuration, state.EvaluationDuration)
|
||||
|
||||
assert.Equal(t, current.State, state.State)
|
||||
assert.Equal(t, current.StateReason, state.StateReason)
|
||||
assert.Equal(t, current.Image, state.Image)
|
||||
assert.Equal(t, current.LatestResult, state.LatestResult)
|
||||
assert.Equal(t, current.Error, state.Error)
|
||||
assert.Equal(t, current.Values, state.Values)
|
||||
assert.Equal(t, current.StartsAt, state.StartsAt)
|
||||
assert.Equal(t, current.EndsAt, state.EndsAt)
|
||||
assert.Equal(t, current.ResolvedAt, state.ResolvedAt)
|
||||
assert.Equal(t, current.LastSentAt, state.LastSentAt)
|
||||
assert.Equal(t, current.LastEvaluationString, state.LastEvaluationString)
|
||||
})
|
||||
}
|
||||
t.Run("copies system-owned annotations from current state", func(t *testing.T) {
|
||||
current = randomSate(rule.GetKey())
|
||||
current.CacheID = expectedLbl.Fingerprint()
|
||||
current.State = eval.Error
|
||||
for key := range models.InternalAnnotationNameSet {
|
||||
current.Annotations[key] = util.GenerateShortUID()
|
||||
}
|
||||
c.set(&current)
|
||||
|
||||
result.State = eval.Error
|
||||
state = c.create(context.Background(), l, rule, result, extraLabels, url)
|
||||
ann := expectedAnn.Copy()
|
||||
for key := range models.InternalAnnotationNameSet {
|
||||
ann[key] = current.Annotations[key]
|
||||
}
|
||||
assert.EqualValues(t, expectedLbl, state.Labels)
|
||||
assert.EqualValues(t, ann, state.Annotations)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func Test_mergeLabels(t *testing.T) {
|
||||
@ -385,3 +457,39 @@ func Test_mergeLabels(t *testing.T) {
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func randomSate(ruleKey models.AlertRuleKey) State {
|
||||
return State{
|
||||
OrgID: ruleKey.OrgID,
|
||||
AlertRuleUID: ruleKey.UID,
|
||||
CacheID: data.Fingerprint(rand.Int63()),
|
||||
ResultFingerprint: data.Fingerprint(rand.Int63()),
|
||||
State: eval.Alerting,
|
||||
StateReason: util.GenerateShortUID(),
|
||||
LatestResult: &Evaluation{
|
||||
EvaluationTime: time.Time{},
|
||||
EvaluationState: eval.Error,
|
||||
Values: map[string]float64{
|
||||
"A": rand.Float64(),
|
||||
},
|
||||
Condition: "A",
|
||||
},
|
||||
Error: errors.New(util.GenerateShortUID()),
|
||||
Image: &models.Image{
|
||||
ID: rand.Int63(),
|
||||
Token: util.GenerateShortUID(),
|
||||
},
|
||||
Annotations: models.GenerateAlertLabels(2, "current-"),
|
||||
Labels: models.GenerateAlertLabels(2, "current-"),
|
||||
Values: map[string]float64{
|
||||
"A": rand.Float64(),
|
||||
},
|
||||
StartsAt: randomTimeInPast(),
|
||||
EndsAt: randomTimeInFuture(),
|
||||
ResolvedAt: util.Pointer(randomTimeInPast()),
|
||||
LastSentAt: util.Pointer(randomTimeInPast()),
|
||||
LastEvaluationString: util.GenerateShortUID(),
|
||||
LastEvaluationTime: randomTimeInPast(),
|
||||
EvaluationDuration: time.Duration(6000),
|
||||
}
|
||||
}
|
||||
|
@ -202,7 +202,7 @@ func (st *Manager) Warm(ctx context.Context, rulesReader RuleReader, instanceRea
|
||||
}
|
||||
resultFp = data.Fingerprint(fp)
|
||||
}
|
||||
state := State{
|
||||
state := &State{
|
||||
AlertRuleUID: entry.RuleUID,
|
||||
OrgID: entry.RuleOrgID,
|
||||
CacheID: cacheID,
|
||||
@ -218,7 +218,7 @@ func (st *Manager) Warm(ctx context.Context, rulesReader RuleReader, instanceRea
|
||||
ResolvedAt: entry.ResolvedAt,
|
||||
LastSentAt: entry.LastSentAt,
|
||||
}
|
||||
st.cache.getOrAdd(state, logger)
|
||||
st.cache.set(state)
|
||||
statesCount++
|
||||
}
|
||||
}
|
||||
@ -391,46 +391,50 @@ func (st *Manager) setNextStateForRule(ctx context.Context, alertRule *ngModels.
|
||||
}
|
||||
}
|
||||
}
|
||||
transitions := st.setNextStateForAll(ctx, alertRule, results[0], logger)
|
||||
annotations := map[string]string{
|
||||
"datasource_uid": datasourceUIDs.String(),
|
||||
"ref_id": refIds.String(),
|
||||
}
|
||||
transitions := st.setNextStateForAll(ctx, alertRule, results[0], logger, annotations)
|
||||
if len(transitions) > 0 {
|
||||
for _, t := range transitions {
|
||||
if t.State.Annotations == nil {
|
||||
t.State.Annotations = make(map[string]string)
|
||||
}
|
||||
t.State.Annotations["datasource_uid"] = datasourceUIDs.String()
|
||||
t.State.Annotations["ref_id"] = refIds.String()
|
||||
}
|
||||
return transitions // if there are no current states for the rule. Create ones for each result
|
||||
}
|
||||
}
|
||||
if st.applyNoDataAndErrorToAllStates && results.IsError() && (alertRule.ExecErrState == ngModels.AlertingErrState || alertRule.ExecErrState == ngModels.OkErrState || alertRule.ExecErrState == ngModels.KeepLastErrState) {
|
||||
// TODO squash all errors into one, and provide as annotation
|
||||
transitions := st.setNextStateForAll(ctx, alertRule, results[0], logger)
|
||||
transitions := st.setNextStateForAll(ctx, alertRule, results[0], logger, nil)
|
||||
if len(transitions) > 0 {
|
||||
return transitions // if there are no current states for the rule. Create ones for each result
|
||||
}
|
||||
}
|
||||
transitions := make([]StateTransition, 0, len(results))
|
||||
for _, result := range results {
|
||||
currentState := st.cache.getOrCreate(ctx, logger, alertRule, result, extraLabels, st.externalURL)
|
||||
s := st.setNextState(ctx, alertRule, currentState, result, logger)
|
||||
currentState := st.cache.create(ctx, logger, alertRule, result, extraLabels, st.externalURL)
|
||||
s := st.setNextState(ctx, alertRule, currentState, result, nil, logger)
|
||||
st.cache.set(currentState) // replace the existing state with the new one
|
||||
transitions = append(transitions, s)
|
||||
}
|
||||
return transitions
|
||||
}
|
||||
|
||||
func (st *Manager) setNextStateForAll(ctx context.Context, alertRule *ngModels.AlertRule, result eval.Result, logger log.Logger) []StateTransition {
|
||||
func (st *Manager) setNextStateForAll(ctx context.Context, alertRule *ngModels.AlertRule, result eval.Result, logger log.Logger, extraAnnotations data.Labels) []StateTransition {
|
||||
currentStates := st.cache.getStatesForRuleUID(alertRule.OrgID, alertRule.UID, false)
|
||||
transitions := make([]StateTransition, 0, len(currentStates))
|
||||
updated := ruleStates{
|
||||
states: make(map[data.Fingerprint]*State, len(currentStates)),
|
||||
}
|
||||
for _, currentState := range currentStates {
|
||||
t := st.setNextState(ctx, alertRule, currentState, result, logger)
|
||||
newState := currentState.Copy()
|
||||
t := st.setNextState(ctx, alertRule, newState, result, extraAnnotations, logger)
|
||||
updated.states[newState.CacheID] = newState
|
||||
transitions = append(transitions, t)
|
||||
}
|
||||
st.cache.setRuleStates(alertRule.GetKey(), updated)
|
||||
return transitions
|
||||
}
|
||||
|
||||
// Set the current state based on evaluation results
|
||||
func (st *Manager) setNextState(ctx context.Context, alertRule *ngModels.AlertRule, currentState *State, result eval.Result, logger log.Logger) StateTransition {
|
||||
func (st *Manager) setNextState(ctx context.Context, alertRule *ngModels.AlertRule, currentState *State, result eval.Result, extraAnnotations data.Labels, logger log.Logger) StateTransition {
|
||||
start := st.clock.Now()
|
||||
|
||||
currentState.LastEvaluationTime = result.EvaluatedAt
|
||||
@ -518,7 +522,9 @@ func (st *Manager) setNextState(ctx context.Context, alertRule *ngModels.AlertRu
|
||||
}
|
||||
}
|
||||
|
||||
st.cache.set(currentState)
|
||||
for key, val := range extraAnnotations {
|
||||
currentState.Annotations[key] = val
|
||||
}
|
||||
|
||||
nextState := StateTransition{
|
||||
State: currentState,
|
||||
|
@ -6,6 +6,7 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/benbjohnson/clock"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/stretchr/testify/mock"
|
||||
|
||||
@ -21,6 +22,7 @@ import (
|
||||
)
|
||||
|
||||
func BenchmarkProcessEvalResults(b *testing.B) {
|
||||
b.ReportAllocs()
|
||||
as := annotations.FakeAnnotationsRepo{}
|
||||
as.On("SaveMany", mock.Anything, mock.Anything).Return(nil)
|
||||
metrics := metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem)
|
||||
@ -32,6 +34,7 @@ func BenchmarkProcessEvalResults(b *testing.B) {
|
||||
Historian: hist,
|
||||
Tracer: tracing.InitializeTracerForTest(),
|
||||
Log: log.New("ngalert.state.manager"),
|
||||
Clock: clock.New(),
|
||||
}
|
||||
sut := state.NewManager(cfg, state.NewNoopPersister())
|
||||
now := time.Now().UTC()
|
||||
@ -40,6 +43,8 @@ func BenchmarkProcessEvalResults(b *testing.B) {
|
||||
labels := map[string]string{}
|
||||
|
||||
var ans []state.StateTransition
|
||||
b.ResetTimer()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
ans = sut.ProcessEvalResults(context.Background(), now, &rule, results, labels, nil)
|
||||
}
|
||||
|
@ -5,6 +5,7 @@ import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"maps"
|
||||
"math"
|
||||
"strings"
|
||||
"time"
|
||||
@ -75,6 +76,36 @@ type State struct {
|
||||
EvaluationDuration time.Duration
|
||||
}
|
||||
|
||||
// Copy creates a shallow copy of the State except for labels and annotations.
|
||||
func (a *State) Copy() *State {
|
||||
// Deep copy annotations and labels
|
||||
annotationsCopy := make(map[string]string, len(a.Annotations))
|
||||
maps.Copy(annotationsCopy, a.Annotations)
|
||||
labelsCopy := make(data.Labels, len(a.Labels))
|
||||
maps.Copy(labelsCopy, a.Labels)
|
||||
return &State{
|
||||
OrgID: a.OrgID,
|
||||
AlertRuleUID: a.AlertRuleUID,
|
||||
CacheID: a.CacheID,
|
||||
State: a.State,
|
||||
StateReason: a.StateReason,
|
||||
ResultFingerprint: a.ResultFingerprint,
|
||||
LatestResult: a.LatestResult,
|
||||
Error: a.Error,
|
||||
Image: a.Image,
|
||||
Annotations: annotationsCopy,
|
||||
Labels: labelsCopy,
|
||||
Values: a.Values,
|
||||
StartsAt: a.StartsAt,
|
||||
EndsAt: a.EndsAt,
|
||||
ResolvedAt: a.ResolvedAt,
|
||||
LastSentAt: a.LastSentAt,
|
||||
LastEvaluationString: a.LastEvaluationString,
|
||||
LastEvaluationTime: a.LastEvaluationTime,
|
||||
EvaluationDuration: a.EvaluationDuration,
|
||||
}
|
||||
}
|
||||
|
||||
func (a *State) GetRuleKey() models.AlertRuleKey {
|
||||
return models.AlertRuleKey{
|
||||
OrgID: a.OrgID,
|
||||
|
Loading…
Reference in New Issue
Block a user