Alerting nested state cache (#33666)

* nest cache by orgID, ruleUID, stateID

* update accessors to use new cache structure

* test and linter fixup

* fix panic

Co-authored-by: Kyle Brandt <kyle@grafana.com>

* add comment to identify what's going on with nested maps in cache

Co-authored-by: Kyle Brandt <kyle@grafana.com>
This commit is contained in:
David Parrott 2021-05-04 09:57:50 -07:00 committed by GitHub
parent 5072fefc22
commit 39099bf3c0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 62 additions and 50 deletions

View File

@ -34,7 +34,7 @@ func (srv PrometheusSrv) RouteGetAlertStatuses(c *models.ReqContext) response.Re
Alerts: []*apimodels.Alert{}, Alerts: []*apimodels.Alert{},
}, },
} }
for _, alertState := range srv.manager.GetAll() { for _, alertState := range srv.manager.GetAll(c.OrgId) {
startsAt := alertState.StartsAt startsAt := alertState.StartsAt
alertResponse.Data.Alerts = append(alertResponse.Data.Alerts, &apimodels.Alert{ alertResponse.Data.Alerts = append(alertResponse.Data.Alerts, &apimodels.Alert{
Labels: map[string]string(alertState.Labels), Labels: map[string]string(alertState.Labels),
@ -89,7 +89,6 @@ func (srv PrometheusSrv) RouteGetRuleStatuses(c *models.ReqContext) response.Res
EvaluationTime: 0, // TODO: see if we are able to pass this along with evaluation results EvaluationTime: 0, // TODO: see if we are able to pass this along with evaluation results
} }
stateMap := srv.manager.GetStatesByRuleUID()
for _, rule := range alertRuleQuery.Result { for _, rule := range alertRuleQuery.Result {
var queryStr string var queryStr string
encodedQuery, err := json.Marshal(rule.Data) encodedQuery, err := json.Marshal(rule.Data)
@ -114,7 +113,7 @@ func (srv PrometheusSrv) RouteGetRuleStatuses(c *models.ReqContext) response.Res
LastEvaluation: time.Time{}, LastEvaluation: time.Time{},
} }
for _, alertState := range stateMap[rule.UID] { for _, alertState := range srv.manager.GetStatesForRuleUID(c.OrgId, rule.UID) {
activeAt := alertState.StartsAt activeAt := alertState.StartsAt
alert := &apimodels.Alert{ alert := &apimodels.Alert{
Labels: map[string]string(alertState.Labels), Labels: map[string]string(alertState.Labels),

View File

@ -306,7 +306,13 @@ func (sch *schedule) Ticker(grafanaCtx context.Context, stateManager *state.Mana
} }
case <-grafanaCtx.Done(): case <-grafanaCtx.Done():
err := dispatcherGroup.Wait() err := dispatcherGroup.Wait()
sch.saveAlertStates(stateManager.GetAll()) orgIdsCmd := models.FetchUniqueOrgIdsQuery{}
if err := sch.instanceStore.FetchOrgIds(&orgIdsCmd); err != nil {
sch.log.Error("unable to fetch orgIds", "msg", err.Error())
}
for _, v := range orgIdsCmd.Result {
sch.saveAlertStates(stateManager.GetAll(v.DefinitionOrgID))
}
stateManager.Close() stateManager.Close()
return err return err
} }

View File

@ -15,7 +15,7 @@ import (
) )
type cache struct { type cache struct {
states map[string]*State states map[int64]map[string]map[string]*State // orgID > alertRuleUID > stateID > state
mtxStates sync.RWMutex mtxStates sync.RWMutex
log log.Logger log log.Logger
metrics *metrics.Metrics metrics *metrics.Metrics
@ -23,7 +23,7 @@ type cache struct {
func newCache(logger log.Logger, metrics *metrics.Metrics) *cache { func newCache(logger log.Logger, metrics *metrics.Metrics) *cache {
return &cache{ return &cache{
states: make(map[string]*State), states: make(map[int64]map[string]map[string]*State),
log: logger, log: logger,
metrics: metrics, metrics: metrics,
} }
@ -45,7 +45,12 @@ func (c *cache) getOrCreate(alertRule *ngModels.AlertRule, result eval.Result) *
c.log.Error("error getting cacheId for entry", "msg", err.Error()) c.log.Error("error getting cacheId for entry", "msg", err.Error())
} }
if state, ok := c.states[id]; ok { if _, ok := c.states[alertRule.OrgID]; !ok {
c.states[alertRule.OrgID] = make(map[string]map[string]*State)
c.states[alertRule.OrgID][alertRule.UID] = make(map[string]*State)
}
if state, ok := c.states[alertRule.OrgID][alertRule.UID][id]; ok {
return state return state
} }
@ -68,66 +73,64 @@ func (c *cache) getOrCreate(alertRule *ngModels.AlertRule, result eval.Result) *
if result.State == eval.Alerting { if result.State == eval.Alerting {
newState.StartsAt = result.EvaluatedAt newState.StartsAt = result.EvaluatedAt
} }
c.states[id] = newState c.states[alertRule.OrgID][alertRule.UID][id] = newState
return newState return newState
} }
func (c *cache) set(entry *State) { func (c *cache) set(entry *State) {
c.mtxStates.Lock() c.mtxStates.Lock()
defer c.mtxStates.Unlock() defer c.mtxStates.Unlock()
c.states[entry.CacheId] = entry if _, ok := c.states[entry.OrgID]; !ok {
c.states[entry.OrgID] = make(map[string]map[string]*State)
}
if _, ok := c.states[entry.OrgID][entry.AlertRuleUID]; !ok {
c.states[entry.OrgID][entry.AlertRuleUID] = make(map[string]*State)
}
c.states[entry.OrgID][entry.AlertRuleUID][entry.CacheId] = entry
} }
func (c *cache) get(id string) (*State, error) { func (c *cache) get(orgID int64, alertRuleUID, stateId string) (*State, error) {
c.mtxStates.Lock() c.mtxStates.Lock()
defer c.mtxStates.Unlock() defer c.mtxStates.Unlock()
if state, ok := c.states[id]; ok { if state, ok := c.states[orgID][alertRuleUID][stateId]; ok {
return state, nil return state, nil
} }
return nil, fmt.Errorf("no entry for id: %s", id) return nil, fmt.Errorf("no entry for %s:%s was found", alertRuleUID, stateId)
} }
func (c *cache) getAll() []*State { func (c *cache) getAll(orgID int64) []*State {
var states []*State var states []*State
c.mtxStates.Lock() c.mtxStates.Lock()
defer c.mtxStates.Unlock() defer c.mtxStates.Unlock()
for _, v := range c.states { for _, v1 := range c.states[orgID] {
states = append(states, v) for _, v2 := range v1 {
states = append(states, v2)
}
} }
return states return states
} }
func (c *cache) getStatesByRuleUID() map[string][]*State { func (c *cache) getStatesForRuleUID(orgID int64, alertRuleUID string) []*State {
ruleMap := make(map[string][]*State) var ruleStates []*State
c.mtxStates.Lock() c.mtxStates.Lock()
defer c.mtxStates.Unlock() defer c.mtxStates.Unlock()
for _, state := range c.states { for _, state := range c.states[orgID][alertRuleUID] {
if ruleStates, ok := ruleMap[state.AlertRuleUID]; ok { ruleStates = append(ruleStates, state)
ruleStates = append(ruleStates, state)
ruleMap[state.AlertRuleUID] = ruleStates
} else {
ruleStates := []*State{state}
ruleMap[state.AlertRuleUID] = ruleStates
}
} }
return ruleMap return ruleStates
} }
// removeByRuleUID deletes all entries in the state cache that match the given UID. // removeByRuleUID deletes all entries in the state cache that match the given UID.
func (c *cache) removeByRuleUID(orgID int64, uid string) { func (c *cache) removeByRuleUID(orgID int64, uid string) {
c.mtxStates.Lock() c.mtxStates.Lock()
defer c.mtxStates.Unlock() defer c.mtxStates.Unlock()
for k, state := range c.states { delete(c.states[orgID], uid)
if state.AlertRuleUID == uid && state.OrgID == orgID {
delete(c.states, k)
}
}
} }
func (c *cache) reset() { func (c *cache) reset() {
c.mtxStates.Lock() c.mtxStates.Lock()
defer c.mtxStates.Unlock() defer c.mtxStates.Unlock()
c.states = make(map[string]*State) c.states = make(map[int64]map[string]map[string]*State)
} }
func (c *cache) trim() { func (c *cache) trim() {
@ -144,16 +147,20 @@ func (c *cache) trim() {
eval.Error: 0, eval.Error: 0,
} }
for _, v := range c.states { for _, org := range c.states {
if len(v.Results) > 100 { for _, rule := range org {
newResults := make([]Evaluation, 100) for _, state := range rule {
// Keep last 100 results if len(state.Results) > 100 {
copy(newResults, v.Results[len(v.Results)-100:]) newResults := make([]Evaluation, 100)
v.Results = newResults // Keep last 100 results
} copy(newResults, state.Results[len(state.Results)-100:])
state.Results = newResults
}
n := ct[v.State] n := ct[state.State]
ct[v.State] = n + 1 ct[state.State] = n + 1
}
}
} }
for k, n := range ct { for k, n := range ct {

View File

@ -41,8 +41,8 @@ func (st *Manager) set(entry *State) {
st.cache.set(entry) st.cache.set(entry)
} }
func (st *Manager) Get(id string) (*State, error) { func (st *Manager) Get(orgID int64, alertRuleUID, stateId string) (*State, error) {
return st.cache.get(id) return st.cache.get(orgID, alertRuleUID, stateId)
} }
// ResetCache is used to ensure a clean cache on startup. // ResetCache is used to ensure a clean cache on startup.
@ -95,12 +95,12 @@ func (st *Manager) setNextState(alertRule *ngModels.AlertRule, result eval.Resul
return currentState return currentState
} }
func (st *Manager) GetAll() []*State { func (st *Manager) GetAll(orgID int64) []*State {
return st.cache.getAll() return st.cache.getAll(orgID)
} }
func (st *Manager) GetStatesByRuleUID() map[string][]*State { func (st *Manager) GetStatesForRuleUID(orgID int64, alertRuleUID string) []*State {
return st.cache.getStatesByRuleUID() return st.cache.getStatesForRuleUID(orgID, alertRuleUID)
} }
func (st *Manager) cleanUp() { func (st *Manager) cleanUp() {

View File

@ -783,8 +783,8 @@ func TestProcessEvalResults(t *testing.T) {
for _, res := range tc.evalResults { for _, res := range tc.evalResults {
_ = st.ProcessEvalResults(tc.alertRule, res) _ = st.ProcessEvalResults(tc.alertRule, res)
} }
for id, s := range tc.expectedStates { for _, s := range tc.expectedStates {
cachedState, err := st.Get(id) cachedState, err := st.Get(s.OrgID, s.AlertRuleUID, s.CacheId)
require.NoError(t, err) require.NoError(t, err)
assert.Equal(t, s, cachedState) assert.Equal(t, s, cachedState)
} }

View File

@ -107,7 +107,7 @@ func TestWarmStateCache(t *testing.T) {
t.Run("instance cache has expected entries", func(t *testing.T) { t.Run("instance cache has expected entries", func(t *testing.T) {
for _, entry := range expectedEntries { for _, entry := range expectedEntries {
cacheEntry, err := st.Get(entry.CacheId) cacheEntry, err := st.Get(entry.OrgID, entry.AlertRuleUID, entry.CacheId)
require.NoError(t, err) require.NoError(t, err)
if diff := cmp.Diff(entry, cacheEntry, cmpopts.IgnoreFields(state.State{}, "Results")); diff != "" { if diff := cmp.Diff(entry, cacheEntry, cmpopts.IgnoreFields(state.State{}, "Results")); diff != "" {