diff --git a/pkg/services/ngalert/api/api_prometheus.go b/pkg/services/ngalert/api/api_prometheus.go index 4a0d5f2955d..178ae3de788 100644 --- a/pkg/services/ngalert/api/api_prometheus.go +++ b/pkg/services/ngalert/api/api_prometheus.go @@ -34,7 +34,7 @@ func (srv PrometheusSrv) RouteGetAlertStatuses(c *models.ReqContext) response.Re Alerts: []*apimodels.Alert{}, }, } - for _, alertState := range srv.manager.GetAll() { + for _, alertState := range srv.manager.GetAll(c.OrgId) { startsAt := alertState.StartsAt alertResponse.Data.Alerts = append(alertResponse.Data.Alerts, &apimodels.Alert{ Labels: map[string]string(alertState.Labels), @@ -89,7 +89,6 @@ func (srv PrometheusSrv) RouteGetRuleStatuses(c *models.ReqContext) response.Res EvaluationTime: 0, // TODO: see if we are able to pass this along with evaluation results } - stateMap := srv.manager.GetStatesByRuleUID() for _, rule := range alertRuleQuery.Result { var queryStr string encodedQuery, err := json.Marshal(rule.Data) @@ -114,7 +113,7 @@ func (srv PrometheusSrv) RouteGetRuleStatuses(c *models.ReqContext) response.Res LastEvaluation: time.Time{}, } - for _, alertState := range stateMap[rule.UID] { + for _, alertState := range srv.manager.GetStatesForRuleUID(c.OrgId, rule.UID) { activeAt := alertState.StartsAt alert := &apimodels.Alert{ Labels: map[string]string(alertState.Labels), diff --git a/pkg/services/ngalert/schedule/schedule.go b/pkg/services/ngalert/schedule/schedule.go index fcb6541b133..bc87c48e7c4 100644 --- a/pkg/services/ngalert/schedule/schedule.go +++ b/pkg/services/ngalert/schedule/schedule.go @@ -306,7 +306,13 @@ func (sch *schedule) Ticker(grafanaCtx context.Context, stateManager *state.Mana } case <-grafanaCtx.Done(): err := dispatcherGroup.Wait() - sch.saveAlertStates(stateManager.GetAll()) + orgIdsCmd := models.FetchUniqueOrgIdsQuery{} + if err := sch.instanceStore.FetchOrgIds(&orgIdsCmd); err != nil { + sch.log.Error("unable to fetch orgIds", "msg", err.Error()) + } + for _, v := range orgIdsCmd.Result { + sch.saveAlertStates(stateManager.GetAll(v.DefinitionOrgID)) + } stateManager.Close() return err } diff --git a/pkg/services/ngalert/state/cache.go b/pkg/services/ngalert/state/cache.go index 038289810e2..25294dab7e8 100644 --- a/pkg/services/ngalert/state/cache.go +++ b/pkg/services/ngalert/state/cache.go @@ -15,7 +15,7 @@ import ( ) type cache struct { - states map[string]*State + states map[int64]map[string]map[string]*State // orgID > alertRuleUID > stateID > state mtxStates sync.RWMutex log log.Logger metrics *metrics.Metrics @@ -23,7 +23,7 @@ type cache struct { func newCache(logger log.Logger, metrics *metrics.Metrics) *cache { return &cache{ - states: make(map[string]*State), + states: make(map[int64]map[string]map[string]*State), log: logger, metrics: metrics, } @@ -45,7 +45,12 @@ func (c *cache) getOrCreate(alertRule *ngModels.AlertRule, result eval.Result) * c.log.Error("error getting cacheId for entry", "msg", err.Error()) } - if state, ok := c.states[id]; ok { + if _, ok := c.states[alertRule.OrgID]; !ok { + c.states[alertRule.OrgID] = make(map[string]map[string]*State) + c.states[alertRule.OrgID][alertRule.UID] = make(map[string]*State) + } + + if state, ok := c.states[alertRule.OrgID][alertRule.UID][id]; ok { return state } @@ -68,66 +73,64 @@ func (c *cache) getOrCreate(alertRule *ngModels.AlertRule, result eval.Result) * if result.State == eval.Alerting { newState.StartsAt = result.EvaluatedAt } - c.states[id] = newState + c.states[alertRule.OrgID][alertRule.UID][id] = newState return newState } func (c *cache) set(entry *State) { c.mtxStates.Lock() defer c.mtxStates.Unlock() - c.states[entry.CacheId] = entry + if _, ok := c.states[entry.OrgID]; !ok { + c.states[entry.OrgID] = make(map[string]map[string]*State) + } + if _, ok := c.states[entry.OrgID][entry.AlertRuleUID]; !ok { + c.states[entry.OrgID][entry.AlertRuleUID] = make(map[string]*State) + } + c.states[entry.OrgID][entry.AlertRuleUID][entry.CacheId] = entry } -func (c *cache) get(id string) (*State, error) { +func (c *cache) get(orgID int64, alertRuleUID, stateId string) (*State, error) { c.mtxStates.Lock() defer c.mtxStates.Unlock() - if state, ok := c.states[id]; ok { + if state, ok := c.states[orgID][alertRuleUID][stateId]; ok { return state, nil } - return nil, fmt.Errorf("no entry for id: %s", id) + return nil, fmt.Errorf("no entry for %s:%s was found", alertRuleUID, stateId) } -func (c *cache) getAll() []*State { +func (c *cache) getAll(orgID int64) []*State { var states []*State c.mtxStates.Lock() defer c.mtxStates.Unlock() - for _, v := range c.states { - states = append(states, v) + for _, v1 := range c.states[orgID] { + for _, v2 := range v1 { + states = append(states, v2) + } } return states } -func (c *cache) getStatesByRuleUID() map[string][]*State { - ruleMap := make(map[string][]*State) +func (c *cache) getStatesForRuleUID(orgID int64, alertRuleUID string) []*State { + var ruleStates []*State c.mtxStates.Lock() defer c.mtxStates.Unlock() - for _, state := range c.states { - if ruleStates, ok := ruleMap[state.AlertRuleUID]; ok { - ruleStates = append(ruleStates, state) - ruleMap[state.AlertRuleUID] = ruleStates - } else { - ruleStates := []*State{state} - ruleMap[state.AlertRuleUID] = ruleStates - } + for _, state := range c.states[orgID][alertRuleUID] { + ruleStates = append(ruleStates, state) } - return ruleMap + return ruleStates } // removeByRuleUID deletes all entries in the state cache that match the given UID. func (c *cache) removeByRuleUID(orgID int64, uid string) { c.mtxStates.Lock() defer c.mtxStates.Unlock() - for k, state := range c.states { - if state.AlertRuleUID == uid && state.OrgID == orgID { - delete(c.states, k) - } - } + delete(c.states[orgID], uid) } func (c *cache) reset() { c.mtxStates.Lock() defer c.mtxStates.Unlock() - c.states = make(map[string]*State) + c.states = make(map[int64]map[string]map[string]*State) } func (c *cache) trim() { @@ -144,16 +147,20 @@ func (c *cache) trim() { eval.Error: 0, } - for _, v := range c.states { - if len(v.Results) > 100 { - newResults := make([]Evaluation, 100) - // Keep last 100 results - copy(newResults, v.Results[len(v.Results)-100:]) - v.Results = newResults - } + for _, org := range c.states { + for _, rule := range org { + for _, state := range rule { + if len(state.Results) > 100 { + newResults := make([]Evaluation, 100) + // Keep last 100 results + copy(newResults, state.Results[len(state.Results)-100:]) + state.Results = newResults + } - n := ct[v.State] - ct[v.State] = n + 1 + n := ct[state.State] + ct[state.State] = n + 1 + } + } } for k, n := range ct { diff --git a/pkg/services/ngalert/state/manager.go b/pkg/services/ngalert/state/manager.go index a0ffb19e3a9..2be9017723a 100644 --- a/pkg/services/ngalert/state/manager.go +++ b/pkg/services/ngalert/state/manager.go @@ -41,8 +41,8 @@ func (st *Manager) set(entry *State) { st.cache.set(entry) } -func (st *Manager) Get(id string) (*State, error) { - return st.cache.get(id) +func (st *Manager) Get(orgID int64, alertRuleUID, stateId string) (*State, error) { + return st.cache.get(orgID, alertRuleUID, stateId) } // ResetCache is used to ensure a clean cache on startup. @@ -95,12 +95,12 @@ func (st *Manager) setNextState(alertRule *ngModels.AlertRule, result eval.Resul return currentState } -func (st *Manager) GetAll() []*State { - return st.cache.getAll() +func (st *Manager) GetAll(orgID int64) []*State { + return st.cache.getAll(orgID) } -func (st *Manager) GetStatesByRuleUID() map[string][]*State { - return st.cache.getStatesByRuleUID() +func (st *Manager) GetStatesForRuleUID(orgID int64, alertRuleUID string) []*State { + return st.cache.getStatesForRuleUID(orgID, alertRuleUID) } func (st *Manager) cleanUp() { diff --git a/pkg/services/ngalert/tests/manager_test.go b/pkg/services/ngalert/tests/manager_test.go index 393ba7fe7f5..5a7341790cf 100644 --- a/pkg/services/ngalert/tests/manager_test.go +++ b/pkg/services/ngalert/tests/manager_test.go @@ -783,8 +783,8 @@ func TestProcessEvalResults(t *testing.T) { for _, res := range tc.evalResults { _ = st.ProcessEvalResults(tc.alertRule, res) } - for id, s := range tc.expectedStates { - cachedState, err := st.Get(id) + for _, s := range tc.expectedStates { + cachedState, err := st.Get(s.OrgID, s.AlertRuleUID, s.CacheId) require.NoError(t, err) assert.Equal(t, s, cachedState) } diff --git a/pkg/services/ngalert/tests/schedule_test.go b/pkg/services/ngalert/tests/schedule_test.go index 16400aca870..963870a4e7c 100644 --- a/pkg/services/ngalert/tests/schedule_test.go +++ b/pkg/services/ngalert/tests/schedule_test.go @@ -107,7 +107,7 @@ func TestWarmStateCache(t *testing.T) { t.Run("instance cache has expected entries", func(t *testing.T) { for _, entry := range expectedEntries { - cacheEntry, err := st.Get(entry.CacheId) + cacheEntry, err := st.Get(entry.OrgID, entry.AlertRuleUID, entry.CacheId) require.NoError(t, err) if diff := cmp.Diff(entry, cacheEntry, cmpopts.IgnoreFields(state.State{}, "Results")); diff != "" {