From 305123df91cd3011ed0e7185610a435d4ec0fcfd Mon Sep 17 00:00:00 2001 From: Alexander Akhmetov Date: Mon, 4 Nov 2024 18:26:52 +0100 Subject: [PATCH] Alerting: Keep state manager cache during cache warm-up (#95727) * Alerting: Keep state manager cache during cache warm-up Instead of overwriting the state manager cache during warm-up, we update the data in the cache if it is not there yet. If the cache already contains a state entry with the same key, we do not overwrite it. --- pkg/services/ngalert/state/cache.go | 29 ++++++------ pkg/services/ngalert/state/cache_test.go | 59 ++++++++++++++++++++++++ pkg/services/ngalert/state/manager.go | 14 +----- 3 files changed, 77 insertions(+), 25 deletions(-) diff --git a/pkg/services/ngalert/state/cache.go b/pkg/services/ngalert/state/cache.go index 0da09784d38..f9048ea41d3 100644 --- a/pkg/services/ngalert/state/cache.go +++ b/pkg/services/ngalert/state/cache.go @@ -76,22 +76,31 @@ func (c *cache) getOrCreate(ctx context.Context, log log.Logger, alertRule *ngMo // Instead of just calculating ID we create an entire state - a candidate. If rule states already hold a state with this ID, this candidate will be discarded and the existing one will be returned. // Otherwise, this candidate will be added to the rule states and returned. stateCandidate := calculateState(ctx, log, alertRule, result, extraLabels, externalURL) + return c.getOrAdd(stateCandidate, log) +} +// getOrAdd retrieves an existing State from the cache if it exists, +// or adds the provided State if it is not present. +func (c *cache) getOrAdd(state State, log log.Logger) *State { c.mtxStates.Lock() defer c.mtxStates.Unlock() + // Retrieve or initialize the org-level map for storing rule states var orgStates map[string]*ruleStates var ok bool - if orgStates, ok = c.states[stateCandidate.OrgID]; !ok { + if orgStates, ok = c.states[state.OrgID]; !ok { orgStates = make(map[string]*ruleStates) - c.states[stateCandidate.OrgID] = orgStates + c.states[state.OrgID] = orgStates } - var states *ruleStates - if states, ok = orgStates[stateCandidate.AlertRuleUID]; !ok { - states = &ruleStates{states: make(map[data.Fingerprint]*State)} - c.states[stateCandidate.OrgID][stateCandidate.AlertRuleUID] = states + + // Retrieve or initialize the rule-level states map + var rs *ruleStates + if rs, ok = orgStates[state.AlertRuleUID]; !ok { + rs = &ruleStates{states: make(map[data.Fingerprint]*State)} + c.states[state.OrgID][state.AlertRuleUID] = rs } - return states.getOrAdd(stateCandidate, log) + + return rs.getOrAdd(state, log) } func (rs *ruleStates) getOrAdd(stateCandidate State, log log.Logger) *State { @@ -255,12 +264,6 @@ func (c *cache) deleteRuleStates(ruleKey ngModels.AlertRuleKey, predicate func(s return nil } -func (c *cache) setAllStates(newStates map[int64]map[string]*ruleStates) { - c.mtxStates.Lock() - defer c.mtxStates.Unlock() - c.states = newStates -} - func (c *cache) set(entry *State) { c.mtxStates.Lock() defer c.mtxStates.Unlock() diff --git a/pkg/services/ngalert/state/cache_test.go b/pkg/services/ngalert/state/cache_test.go index fc57ced138c..4a10226ef09 100644 --- a/pkg/services/ngalert/state/cache_test.go +++ b/pkg/services/ngalert/state/cache_test.go @@ -294,6 +294,65 @@ func Test_getOrCreate(t *testing.T) { }) } +func Test_getOrAdd(t *testing.T) { + logger := log.NewNopLogger() + + type testCase struct { + name string + initialState *State + newState *State + } + + orgID := int64(1) + alertRuleUID := "rule-uid" + cacheID := data.Fingerprint(12345) + + cases := []testCase{ + { + name: "add new state", + initialState: nil, + newState: &State{ + OrgID: orgID, + AlertRuleUID: alertRuleUID, + CacheID: cacheID, + Labels: data.Labels{"label1": "value1"}, + }, + }, + { + name: "retrieve existing state", + initialState: &State{ + OrgID: orgID, + AlertRuleUID: alertRuleUID, + CacheID: cacheID, + Labels: data.Labels{"label1": "value1"}, + }, + newState: &State{ + OrgID: orgID, + AlertRuleUID: alertRuleUID, + CacheID: cacheID, + Labels: data.Labels{"label2": "value2"}, + }, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + c := newCache() + if tc.initialState != nil { + c.getOrAdd(*tc.initialState, logger) + } + + result := c.getOrAdd(*tc.newState, logger) + + if tc.initialState == nil { + require.Equal(t, tc.newState, result, "expected newState to be added") + } else { + require.Equal(t, tc.initialState, result, "expected to retrieve existing state") + } + }) + } +} + func Test_mergeLabels(t *testing.T) { t.Run("merges two maps", func(t *testing.T) { a := models.GenerateAlertLabels(5, "set1-") diff --git a/pkg/services/ngalert/state/manager.go b/pkg/services/ngalert/state/manager.go index 666fd65de50..a2eb863fddd 100644 --- a/pkg/services/ngalert/state/manager.go +++ b/pkg/services/ngalert/state/manager.go @@ -137,7 +137,6 @@ func (st *Manager) Warm(ctx context.Context, rulesReader RuleReader, instanceRea } statesCount := 0 - states := make(map[int64]map[string]*ruleStates, len(orgIds)) for _, orgId := range orgIds { // Get Rules ruleCmd := ngModels.ListAlertRulesQuery{ @@ -168,9 +167,6 @@ func (st *Manager) Warm(ctx context.Context, rulesReader RuleReader, instanceRea } } - orgStates := make(map[string]*ruleStates, len(ruleByUID)) - states[orgId] = orgStates - // Get Instances cmd := ngModels.ListAlertInstancesQuery{ RuleOrgID: orgId, @@ -193,12 +189,6 @@ func (st *Manager) Warm(ctx context.Context, rulesReader RuleReader, instanceRea annotations = make(map[string]string) } - rulesStates, ok := orgStates[entry.RuleUID] - if !ok { - rulesStates = &ruleStates{states: make(map[data.Fingerprint]*State)} - orgStates[entry.RuleUID] = rulesStates - } - lbs := map[string]string(entry.Labels) cacheID := entry.Labels.Fingerprint() var resultFp data.Fingerprint @@ -209,7 +199,7 @@ func (st *Manager) Warm(ctx context.Context, rulesReader RuleReader, instanceRea } resultFp = data.Fingerprint(fp) } - rulesStates.states[cacheID] = &State{ + state := State{ AlertRuleUID: entry.RuleUID, OrgID: entry.RuleOrgID, CacheID: cacheID, @@ -225,11 +215,11 @@ func (st *Manager) Warm(ctx context.Context, rulesReader RuleReader, instanceRea ResolvedAt: entry.ResolvedAt, LastSentAt: entry.LastSentAt, } + st.cache.getOrAdd(state, st.log) statesCount++ } } - st.cache.setAllStates(states) st.log.Info("State cache has been initialized", "states", statesCount, "duration", time.Since(startTime)) }