mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Alerting: Keep state manager cache during cache warm-up (#95727)
* Alerting: Keep state manager cache during cache warm-up Instead of overwriting the state manager cache during warm-up, we update the data in the cache if it is not there yet. If the cache already contains a state entry with the same key, we do not overwrite it.
This commit is contained in:
parent
ede0b7e9ed
commit
305123df91
@ -76,22 +76,31 @@ func (c *cache) getOrCreate(ctx context.Context, log log.Logger, alertRule *ngMo
|
|||||||
// Instead of just calculating ID we create an entire state - a candidate. If rule states already hold a state with this ID, this candidate will be discarded and the existing one will be returned.
|
// Instead of just calculating ID we create an entire state - a candidate. If rule states already hold a state with this ID, this candidate will be discarded and the existing one will be returned.
|
||||||
// Otherwise, this candidate will be added to the rule states and returned.
|
// Otherwise, this candidate will be added to the rule states and returned.
|
||||||
stateCandidate := calculateState(ctx, log, alertRule, result, extraLabels, externalURL)
|
stateCandidate := calculateState(ctx, log, alertRule, result, extraLabels, externalURL)
|
||||||
|
return c.getOrAdd(stateCandidate, log)
|
||||||
|
}
|
||||||
|
|
||||||
|
// getOrAdd retrieves an existing State from the cache if it exists,
|
||||||
|
// or adds the provided State if it is not present.
|
||||||
|
func (c *cache) getOrAdd(state State, log log.Logger) *State {
|
||||||
c.mtxStates.Lock()
|
c.mtxStates.Lock()
|
||||||
defer c.mtxStates.Unlock()
|
defer c.mtxStates.Unlock()
|
||||||
|
|
||||||
|
// Retrieve or initialize the org-level map for storing rule states
|
||||||
var orgStates map[string]*ruleStates
|
var orgStates map[string]*ruleStates
|
||||||
var ok bool
|
var ok bool
|
||||||
if orgStates, ok = c.states[stateCandidate.OrgID]; !ok {
|
if orgStates, ok = c.states[state.OrgID]; !ok {
|
||||||
orgStates = make(map[string]*ruleStates)
|
orgStates = make(map[string]*ruleStates)
|
||||||
c.states[stateCandidate.OrgID] = orgStates
|
c.states[state.OrgID] = orgStates
|
||||||
}
|
}
|
||||||
var states *ruleStates
|
|
||||||
if states, ok = orgStates[stateCandidate.AlertRuleUID]; !ok {
|
// Retrieve or initialize the rule-level states map
|
||||||
states = &ruleStates{states: make(map[data.Fingerprint]*State)}
|
var rs *ruleStates
|
||||||
c.states[stateCandidate.OrgID][stateCandidate.AlertRuleUID] = states
|
if rs, ok = orgStates[state.AlertRuleUID]; !ok {
|
||||||
|
rs = &ruleStates{states: make(map[data.Fingerprint]*State)}
|
||||||
|
c.states[state.OrgID][state.AlertRuleUID] = rs
|
||||||
}
|
}
|
||||||
return states.getOrAdd(stateCandidate, log)
|
|
||||||
|
return rs.getOrAdd(state, log)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rs *ruleStates) getOrAdd(stateCandidate State, log log.Logger) *State {
|
func (rs *ruleStates) getOrAdd(stateCandidate State, log log.Logger) *State {
|
||||||
@ -255,12 +264,6 @@ func (c *cache) deleteRuleStates(ruleKey ngModels.AlertRuleKey, predicate func(s
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cache) setAllStates(newStates map[int64]map[string]*ruleStates) {
|
|
||||||
c.mtxStates.Lock()
|
|
||||||
defer c.mtxStates.Unlock()
|
|
||||||
c.states = newStates
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *cache) set(entry *State) {
|
func (c *cache) set(entry *State) {
|
||||||
c.mtxStates.Lock()
|
c.mtxStates.Lock()
|
||||||
defer c.mtxStates.Unlock()
|
defer c.mtxStates.Unlock()
|
||||||
|
@ -294,6 +294,65 @@ func Test_getOrCreate(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func Test_getOrAdd(t *testing.T) {
|
||||||
|
logger := log.NewNopLogger()
|
||||||
|
|
||||||
|
type testCase struct {
|
||||||
|
name string
|
||||||
|
initialState *State
|
||||||
|
newState *State
|
||||||
|
}
|
||||||
|
|
||||||
|
orgID := int64(1)
|
||||||
|
alertRuleUID := "rule-uid"
|
||||||
|
cacheID := data.Fingerprint(12345)
|
||||||
|
|
||||||
|
cases := []testCase{
|
||||||
|
{
|
||||||
|
name: "add new state",
|
||||||
|
initialState: nil,
|
||||||
|
newState: &State{
|
||||||
|
OrgID: orgID,
|
||||||
|
AlertRuleUID: alertRuleUID,
|
||||||
|
CacheID: cacheID,
|
||||||
|
Labels: data.Labels{"label1": "value1"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "retrieve existing state",
|
||||||
|
initialState: &State{
|
||||||
|
OrgID: orgID,
|
||||||
|
AlertRuleUID: alertRuleUID,
|
||||||
|
CacheID: cacheID,
|
||||||
|
Labels: data.Labels{"label1": "value1"},
|
||||||
|
},
|
||||||
|
newState: &State{
|
||||||
|
OrgID: orgID,
|
||||||
|
AlertRuleUID: alertRuleUID,
|
||||||
|
CacheID: cacheID,
|
||||||
|
Labels: data.Labels{"label2": "value2"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range cases {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
c := newCache()
|
||||||
|
if tc.initialState != nil {
|
||||||
|
c.getOrAdd(*tc.initialState, logger)
|
||||||
|
}
|
||||||
|
|
||||||
|
result := c.getOrAdd(*tc.newState, logger)
|
||||||
|
|
||||||
|
if tc.initialState == nil {
|
||||||
|
require.Equal(t, tc.newState, result, "expected newState to be added")
|
||||||
|
} else {
|
||||||
|
require.Equal(t, tc.initialState, result, "expected to retrieve existing state")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func Test_mergeLabels(t *testing.T) {
|
func Test_mergeLabels(t *testing.T) {
|
||||||
t.Run("merges two maps", func(t *testing.T) {
|
t.Run("merges two maps", func(t *testing.T) {
|
||||||
a := models.GenerateAlertLabels(5, "set1-")
|
a := models.GenerateAlertLabels(5, "set1-")
|
||||||
|
@ -137,7 +137,6 @@ func (st *Manager) Warm(ctx context.Context, rulesReader RuleReader, instanceRea
|
|||||||
}
|
}
|
||||||
|
|
||||||
statesCount := 0
|
statesCount := 0
|
||||||
states := make(map[int64]map[string]*ruleStates, len(orgIds))
|
|
||||||
for _, orgId := range orgIds {
|
for _, orgId := range orgIds {
|
||||||
// Get Rules
|
// Get Rules
|
||||||
ruleCmd := ngModels.ListAlertRulesQuery{
|
ruleCmd := ngModels.ListAlertRulesQuery{
|
||||||
@ -168,9 +167,6 @@ func (st *Manager) Warm(ctx context.Context, rulesReader RuleReader, instanceRea
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
orgStates := make(map[string]*ruleStates, len(ruleByUID))
|
|
||||||
states[orgId] = orgStates
|
|
||||||
|
|
||||||
// Get Instances
|
// Get Instances
|
||||||
cmd := ngModels.ListAlertInstancesQuery{
|
cmd := ngModels.ListAlertInstancesQuery{
|
||||||
RuleOrgID: orgId,
|
RuleOrgID: orgId,
|
||||||
@ -193,12 +189,6 @@ func (st *Manager) Warm(ctx context.Context, rulesReader RuleReader, instanceRea
|
|||||||
annotations = make(map[string]string)
|
annotations = make(map[string]string)
|
||||||
}
|
}
|
||||||
|
|
||||||
rulesStates, ok := orgStates[entry.RuleUID]
|
|
||||||
if !ok {
|
|
||||||
rulesStates = &ruleStates{states: make(map[data.Fingerprint]*State)}
|
|
||||||
orgStates[entry.RuleUID] = rulesStates
|
|
||||||
}
|
|
||||||
|
|
||||||
lbs := map[string]string(entry.Labels)
|
lbs := map[string]string(entry.Labels)
|
||||||
cacheID := entry.Labels.Fingerprint()
|
cacheID := entry.Labels.Fingerprint()
|
||||||
var resultFp data.Fingerprint
|
var resultFp data.Fingerprint
|
||||||
@ -209,7 +199,7 @@ func (st *Manager) Warm(ctx context.Context, rulesReader RuleReader, instanceRea
|
|||||||
}
|
}
|
||||||
resultFp = data.Fingerprint(fp)
|
resultFp = data.Fingerprint(fp)
|
||||||
}
|
}
|
||||||
rulesStates.states[cacheID] = &State{
|
state := State{
|
||||||
AlertRuleUID: entry.RuleUID,
|
AlertRuleUID: entry.RuleUID,
|
||||||
OrgID: entry.RuleOrgID,
|
OrgID: entry.RuleOrgID,
|
||||||
CacheID: cacheID,
|
CacheID: cacheID,
|
||||||
@ -225,11 +215,11 @@ func (st *Manager) Warm(ctx context.Context, rulesReader RuleReader, instanceRea
|
|||||||
ResolvedAt: entry.ResolvedAt,
|
ResolvedAt: entry.ResolvedAt,
|
||||||
LastSentAt: entry.LastSentAt,
|
LastSentAt: entry.LastSentAt,
|
||||||
}
|
}
|
||||||
|
st.cache.getOrAdd(state, st.log)
|
||||||
statesCount++
|
statesCount++
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
st.cache.setAllStates(states)
|
|
||||||
st.log.Info("State cache has been initialized", "states", statesCount, "duration", time.Since(startTime))
|
st.log.Info("State cache has been initialized", "states", statesCount, "duration", time.Since(startTime))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user