Alerting: Refactor state manager's cache (#56197)

* remove ResetAllStates because it's not used
* refactor cache to accept logger, metrics and external URL as method args
* update manager Warm method to set the entire state at once
* remove unused cache reset method
* introduce ruleStates
* change getOrCreate to belong to ruleStates
* update Get to not return error
This commit is contained in:
Yuriy Tseretyan 2022-10-06 15:30:12 -04:00 committed by GitHub
parent 15d2653b89
commit 7b6437402a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 125 additions and 101 deletions

View File

@ -125,8 +125,7 @@ func TestWarmStateCache(t *testing.T) {
t.Run("instance cache has expected entries", func(t *testing.T) {
for _, entry := range expectedEntries {
cacheEntry, err := st.Get(entry.OrgID, entry.AlertRuleUID, entry.CacheId)
require.NoError(t, err)
cacheEntry := st.Get(entry.OrgID, entry.AlertRuleUID, entry.CacheId)
if diff := cmp.Diff(entry, cacheEntry, cmpopts.IgnoreFields(state.State{}, "Results")); diff != "" {
t.Errorf("Result mismatch (-want +got):\n%s", diff)

View File

@ -15,28 +15,40 @@ import (
ngModels "github.com/grafana/grafana/pkg/services/ngalert/models"
)
type cache struct {
states map[int64]map[string]map[string]*State // orgID > alertRuleUID > stateID > state
mtxStates sync.RWMutex
log log.Logger
metrics *metrics.State
externalURL *url.URL
// ruleStates groups the states that belong to a single alert rule.
// It is the innermost level of the cache hierarchy (orgID > alertRuleUID > stateID).
type ruleStates struct {
// states maps a state ID (the State's CacheId) to the state itself.
states map[string]*State
}
func newCache(logger log.Logger, metrics *metrics.State, externalURL *url.URL) *cache {
// cache is the in-memory store of alert states, organised per organization
// and per alert rule. Its methods take mtxStates before touching states.
type cache struct {
states map[int64]map[string]*ruleStates // orgID > alertRuleUID > stateID > state
// mtxStates guards states; readers use RLock, writers use Lock.
mtxStates sync.RWMutex
}
func newCache() *cache {
return &cache{
states: make(map[int64]map[string]map[string]*State),
log: logger,
metrics: metrics,
externalURL: externalURL,
states: make(map[int64]map[string]*ruleStates),
}
}
func (c *cache) getOrCreate(ctx context.Context, alertRule *ngModels.AlertRule, result eval.Result, extraLabels data.Labels) *State {
// getOrCreate returns the state for the given rule and evaluation result,
// delegating the lookup/creation to the rule's ruleStates container.
// The per-organization map and the per-rule container are created on demand
// when they do not exist yet. The logger and external URL are passed through
// to the per-rule getOrCreate.
//
// The write lock is held for the whole call, including the delegated
// ruleStates.getOrCreate.
func (c *cache) getOrCreate(ctx context.Context, log log.Logger, alertRule *ngModels.AlertRule, result eval.Result, extraLabels data.Labels, externalURL *url.URL) *State {
c.mtxStates.Lock()
defer c.mtxStates.Unlock()
var orgStates map[string]*ruleStates
var ok bool
// Lazily create the map of rules for this organization.
if orgStates, ok = c.states[alertRule.OrgID]; !ok {
orgStates = make(map[string]*ruleStates)
c.states[alertRule.OrgID] = orgStates
}
var states *ruleStates
// Lazily create the container of states for this rule.
if states, ok = orgStates[alertRule.UID]; !ok {
states = &ruleStates{states: make(map[string]*State)}
c.states[alertRule.OrgID][alertRule.UID] = states
}
return states.getOrCreate(ctx, log, alertRule, result, extraLabels, externalURL)
}
ruleLabels, annotations := c.expandRuleLabelsAndAnnotations(ctx, alertRule, result, extraLabels)
func (rs *ruleStates) getOrCreate(ctx context.Context, log log.Logger, alertRule *ngModels.AlertRule, result eval.Result, extraLabels data.Labels, externalURL *url.URL) *State {
ruleLabels, annotations := rs.expandRuleLabelsAndAnnotations(ctx, log, alertRule, result, extraLabels, externalURL)
lbs := make(data.Labels, len(extraLabels)+len(ruleLabels)+len(result.Instance))
dupes := make(data.Labels)
@ -53,7 +65,7 @@ func (c *cache) getOrCreate(ctx context.Context, alertRule *ngModels.AlertRule,
}
}
if len(dupes) > 0 {
c.log.Warn("rule declares one or many reserved labels. Those rules labels will be ignored", "labels", dupes)
log.Warn("rule declares one or many reserved labels. Those rules labels will be ignored", "labels", dupes)
}
dupes = make(data.Labels)
for key, val := range result.Instance {
@ -66,23 +78,16 @@ func (c *cache) getOrCreate(ctx context.Context, alertRule *ngModels.AlertRule,
}
}
if len(dupes) > 0 {
c.log.Warn("evaluation result contains either reserved labels or labels declared in the rules. Those labels from the result will be ignored", "labels", dupes)
log.Warn("evaluation result contains either reserved labels or labels declared in the rules. Those labels from the result will be ignored", "labels", dupes)
}
il := ngModels.InstanceLabels(lbs)
id, err := il.StringKey()
if err != nil {
c.log.Error("error getting cacheId for entry", "err", err.Error())
log.Error("error getting cacheId for entry", "err", err.Error())
}
if _, ok := c.states[alertRule.OrgID]; !ok {
c.states[alertRule.OrgID] = make(map[string]map[string]*State)
}
if _, ok := c.states[alertRule.OrgID][alertRule.UID]; !ok {
c.states[alertRule.OrgID][alertRule.UID] = make(map[string]*State)
}
if state, ok := c.states[alertRule.OrgID][alertRule.UID][id]; ok {
if state, ok := rs.states[id]; ok {
// Annotations can change over time, however we also want to maintain
// certain annotations across evaluations
for k, v := range state.Annotations {
@ -95,7 +100,7 @@ func (c *cache) getOrCreate(ctx context.Context, alertRule *ngModels.AlertRule,
}
}
state.Annotations = annotations
c.states[alertRule.OrgID][alertRule.UID][id] = state
rs.states[id] = state
return state
}
@ -112,21 +117,21 @@ func (c *cache) getOrCreate(ctx context.Context, alertRule *ngModels.AlertRule,
if result.State == eval.Alerting {
newState.StartsAt = result.EvaluatedAt
}
c.states[alertRule.OrgID][alertRule.UID][id] = newState
rs.states[id] = newState
return newState
}
func (c *cache) expandRuleLabelsAndAnnotations(ctx context.Context, alertRule *ngModels.AlertRule, alertInstance eval.Result, extraLabels data.Labels) (data.Labels, data.Labels) {
func (rs *ruleStates) expandRuleLabelsAndAnnotations(ctx context.Context, log log.Logger, alertRule *ngModels.AlertRule, alertInstance eval.Result, extraLabels data.Labels, externalURL *url.URL) (data.Labels, data.Labels) {
// use labels from the result and extra labels to expand the labels and annotations declared by the rule
templateLabels := mergeLabels(extraLabels, alertInstance.Instance)
expand := func(original map[string]string) map[string]string {
expanded := make(map[string]string, len(original))
for k, v := range original {
ev, err := expandTemplate(ctx, alertRule.Title, v, templateLabels, alertInstance, c.externalURL)
ev, err := expandTemplate(ctx, alertRule.Title, v, templateLabels, alertInstance, externalURL)
expanded[k] = ev
if err != nil {
c.log.Error("error in expanding template", "name", k, "value", v, "err", err.Error())
log.Error("error in expanding template", "name", k, "value", v, "err", err.Error())
// Store the original template on error.
expanded[k] = v
}
@ -137,25 +142,36 @@ func (c *cache) expandRuleLabelsAndAnnotations(ctx context.Context, alertRule *n
return expand(alertRule.Labels), expand(alertRule.Annotations)
}
// setAllStates replaces the entire content of the cache with newStates while
// holding the write lock. The map is stored as-is (it is not copied), so the
// cache shares it with the caller after this call.
func (c *cache) setAllStates(newStates map[int64]map[string]*ruleStates) {
c.mtxStates.Lock()
defer c.mtxStates.Unlock()
c.states = newStates
}
func (c *cache) set(entry *State) {
c.mtxStates.Lock()
defer c.mtxStates.Unlock()
if _, ok := c.states[entry.OrgID]; !ok {
c.states[entry.OrgID] = make(map[string]map[string]*State)
c.states[entry.OrgID] = make(map[string]*ruleStates)
}
if _, ok := c.states[entry.OrgID][entry.AlertRuleUID]; !ok {
c.states[entry.OrgID][entry.AlertRuleUID] = make(map[string]*State)
c.states[entry.OrgID][entry.AlertRuleUID] = &ruleStates{states: make(map[string]*State)}
}
c.states[entry.OrgID][entry.AlertRuleUID][entry.CacheId] = entry
c.states[entry.OrgID][entry.AlertRuleUID].states[entry.CacheId] = entry
}
func (c *cache) get(orgID int64, alertRuleUID, stateId string) (*State, error) {
func (c *cache) get(orgID int64, alertRuleUID, stateId string) *State {
c.mtxStates.RLock()
defer c.mtxStates.RUnlock()
if state, ok := c.states[orgID][alertRuleUID][stateId]; ok {
return state, nil
ruleStates, ok := c.states[orgID][alertRuleUID]
if ok {
var state *State
state, ok = ruleStates.states[stateId]
if ok {
return state
}
}
return nil, fmt.Errorf("no entry for %s:%s was found", alertRuleUID, stateId)
return nil
}
func (c *cache) getAll(orgID int64) []*State {
@ -163,7 +179,7 @@ func (c *cache) getAll(orgID int64) []*State {
c.mtxStates.RLock()
defer c.mtxStates.RUnlock()
for _, v1 := range c.states[orgID] {
for _, v2 := range v1 {
for _, v2 := range v1.states {
states = append(states, v2)
}
}
@ -171,38 +187,47 @@ func (c *cache) getAll(orgID int64) []*State {
}
func (c *cache) getStatesForRuleUID(orgID int64, alertRuleUID string) []*State {
var ruleStates []*State
var result []*State
c.mtxStates.RLock()
defer c.mtxStates.RUnlock()
for _, state := range c.states[orgID][alertRuleUID] {
ruleStates = append(ruleStates, state)
orgRules, ok := c.states[orgID]
if !ok {
return nil
}
return ruleStates
rs, ok := orgRules[alertRuleUID]
if !ok {
return nil
}
for _, state := range rs.states {
result = append(result, state)
}
return result
}
// removeByRuleUID deletes all entries in the state cache that match the given UID. Returns removed states
func (c *cache) removeByRuleUID(orgID int64, uid string) []*State {
c.mtxStates.Lock()
defer c.mtxStates.Unlock()
statesMap := c.states[orgID][uid]
delete(c.states[orgID], uid)
if statesMap == nil {
orgStates, ok := c.states[orgID]
if !ok {
return nil
}
states := make([]*State, 0, len(statesMap))
for _, state := range statesMap {
rs, ok := orgStates[uid]
if !ok {
return nil
}
delete(c.states[orgID], uid)
if len(rs.states) == 0 {
return nil
}
states := make([]*State, 0, len(rs.states))
for _, state := range rs.states {
states = append(states, state)
}
return states
}
func (c *cache) reset() {
c.mtxStates.Lock()
defer c.mtxStates.Unlock()
c.states = make(map[int64]map[string]map[string]*State)
}
func (c *cache) recordMetrics() {
func (c *cache) recordMetrics(metrics *metrics.State) {
c.mtxStates.RLock()
defer c.mtxStates.RUnlock()
@ -217,9 +242,9 @@ func (c *cache) recordMetrics() {
}
for org, orgMap := range c.states {
c.metrics.GroupRules.WithLabelValues(fmt.Sprint(org)).Set(float64(len(orgMap)))
metrics.GroupRules.WithLabelValues(fmt.Sprint(org)).Set(float64(len(orgMap)))
for _, rule := range orgMap {
for _, state := range rule {
for _, state := range rule.states {
n := ct[state.State]
ct[state.State] = n + 1
}
@ -227,7 +252,7 @@ func (c *cache) recordMetrics() {
}
for k, n := range ct {
c.metrics.AlertState.WithLabelValues(strings.ToLower(k.String())).Set(float64(n))
metrics.AlertState.WithLabelValues(strings.ToLower(k.String())).Set(float64(n))
}
}
@ -248,5 +273,9 @@ func mergeLabels(a, b data.Labels) data.Labels {
func (c *cache) deleteEntry(orgID int64, alertRuleUID, cacheID string) {
c.mtxStates.Lock()
defer c.mtxStates.Unlock()
delete(c.states[orgID][alertRuleUID], cacheID)
ruleStates, ok := c.states[orgID][alertRuleUID]
if !ok {
return
}
delete(ruleStates.states, cacheID)
}

View File

@ -12,17 +12,18 @@ import (
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/ngalert/eval"
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
"github.com/grafana/grafana/pkg/services/ngalert/models"
"github.com/grafana/grafana/pkg/util"
)
func Test_getOrCreate(t *testing.T) {
c := newCache(log.New("test"), &metrics.State{}, &url.URL{
url := &url.URL{
Scheme: "http",
Host: "localhost:3000",
Path: "/test",
})
}
l := log.New("test")
c := newCache()
generateRule := models.AlertRuleGen(models.WithNotEmptyLabels(5, "rule-"))
@ -33,7 +34,7 @@ func Test_getOrCreate(t *testing.T) {
result := eval.Result{
Instance: models.GenerateAlertLabels(5, "result-"),
}
state := c.getOrCreate(context.Background(), rule, result, extraLabels)
state := c.getOrCreate(context.Background(), l, rule, result, extraLabels, url)
for key, expected := range extraLabels {
require.Equal(t, expected, state.Labels[key])
}
@ -61,7 +62,7 @@ func Test_getOrCreate(t *testing.T) {
result.Instance[key] = "result-" + util.GenerateShortUID()
}
state := c.getOrCreate(context.Background(), rule, result, extraLabels)
state := c.getOrCreate(context.Background(), l, rule, result, extraLabels, url)
for key, expected := range extraLabels {
require.Equal(t, expected, state.Labels[key])
}
@ -77,7 +78,7 @@ func Test_getOrCreate(t *testing.T) {
for key := range rule.Labels {
result.Instance[key] = "result-" + util.GenerateShortUID()
}
state := c.getOrCreate(context.Background(), rule, result, extraLabels)
state := c.getOrCreate(context.Background(), l, rule, result, extraLabels, url)
for key, expected := range rule.Labels {
require.Equal(t, expected, state.Labels[key])
}
@ -99,7 +100,7 @@ func Test_getOrCreate(t *testing.T) {
}
rule.Labels = labelTemplates
state := c.getOrCreate(context.Background(), rule, result, extraLabels)
state := c.getOrCreate(context.Background(), l, rule, result, extraLabels, url)
for key, expected := range extraLabels {
assert.Equal(t, expected, state.Labels["rule-"+key])
}
@ -125,7 +126,7 @@ func Test_getOrCreate(t *testing.T) {
}
rule.Annotations = annotationTemplates
state := c.getOrCreate(context.Background(), rule, result, extraLabels)
state := c.getOrCreate(context.Background(), l, rule, result, extraLabels, url)
for key, expected := range extraLabels {
assert.Equal(t, expected, state.Annotations["rule-"+key])
}

View File

@ -40,12 +40,13 @@ type Manager struct {
instanceStore InstanceStore
imageService image.ImageService
historian Historian
externalURL *url.URL
}
func NewManager(logger log.Logger, metrics *metrics.State, externalURL *url.URL,
ruleStore RuleReader, instanceStore InstanceStore, imageService image.ImageService, clock clock.Clock, historian Historian) *Manager {
manager := &Manager{
cache: newCache(logger, metrics, externalURL),
cache: newCache(),
quit: make(chan struct{}),
ResendDelay: ResendDelay, // TODO: make this configurable
log: logger,
@ -55,6 +56,7 @@ func NewManager(logger log.Logger, metrics *metrics.State, externalURL *url.URL,
imageService: imageService,
historian: historian,
clock: clock,
externalURL: externalURL,
}
go manager.recordMetrics()
return manager
@ -65,15 +67,16 @@ func (st *Manager) Close() {
}
func (st *Manager) Warm(ctx context.Context) {
st.log.Info("warming cache for startup")
st.ResetAllStates()
startTime := time.Now()
st.log.Info("Warming state cache for startup")
orgIds, err := st.instanceStore.FetchOrgIds(ctx)
if err != nil {
st.log.Error("unable to fetch orgIds", "msg", err.Error())
st.log.Error("unable to fetch orgIds", "err", err.Error())
}
var states []*State
statesCount := 0
states := make(map[int64]map[string]*ruleStates, len(orgIds))
for _, orgId := range orgIds {
// Get Rules
ruleCmd := ngModels.ListAlertRulesQuery{
@ -88,6 +91,9 @@ func (st *Manager) Warm(ctx context.Context) {
ruleByUID[rule.UID] = rule
}
orgStates := make(map[string]*ruleStates, len(ruleByUID))
states[orgId] = orgStates
// Get Instances
cmd := ngModels.ListAlertInstancesQuery{
RuleOrgID: orgId,
@ -99,16 +105,22 @@ func (st *Manager) Warm(ctx context.Context) {
for _, entry := range cmd.Result {
ruleForEntry, ok := ruleByUID[entry.RuleUID]
if !ok {
st.log.Error("rule not found for instance, ignoring", "rule", entry.RuleUID)
// TODO Should we delete the orphaned state from the db?
continue
}
rulesStates, ok := orgStates[entry.RuleUID]
if !ok {
rulesStates = &ruleStates{states: make(map[string]*State)}
orgStates[entry.RuleUID] = rulesStates
}
lbs := map[string]string(entry.Labels)
cacheId, err := entry.Labels.StringKey()
if err != nil {
st.log.Error("error getting cacheId for entry", "msg", err.Error())
}
stateForEntry := &State{
rulesStates.states[cacheId] = &State{
AlertRuleUID: entry.RuleUID,
OrgID: entry.RuleOrgID,
CacheId: cacheId,
@ -121,32 +133,17 @@ func (st *Manager) Warm(ctx context.Context) {
LastEvaluationTime: entry.LastEvalTime,
Annotations: ruleForEntry.Annotations,
}
states = append(states, stateForEntry)
statesCount++
}
}
for _, s := range states {
st.set(s)
}
st.cache.setAllStates(states)
st.log.Info("State cache has been initialized", "loaded_states", statesCount, "duration", time.Since(startTime))
}
func (st *Manager) getOrCreate(ctx context.Context, alertRule *ngModels.AlertRule, result eval.Result, extraLabels data.Labels) *State {
return st.cache.getOrCreate(ctx, alertRule, result, extraLabels)
}
func (st *Manager) set(entry *State) {
st.cache.set(entry)
}
func (st *Manager) Get(orgID int64, alertRuleUID, stateId string) (*State, error) {
func (st *Manager) Get(orgID int64, alertRuleUID, stateId string) *State {
return st.cache.get(orgID, alertRuleUID, stateId)
}
// ResetAllStates is used to ensure a clean cache on startup.
func (st *Manager) ResetAllStates() {
st.cache.reset()
}
// ResetStateByRuleUID deletes all entries in the state manager that match the given rule UID.
func (st *Manager) ResetStateByRuleUID(ctx context.Context, ruleKey ngModels.AlertRuleKey) []*State {
logger := st.log.New(ruleKey.LogContext()...)
@ -215,7 +212,7 @@ func (st *Manager) maybeTakeScreenshot(
// Set the current state based on evaluation results
func (st *Manager) setNextState(ctx context.Context, alertRule *ngModels.AlertRule, result eval.Result, extraLabels data.Labels) *State {
currentState := st.getOrCreate(ctx, alertRule, result, extraLabels)
currentState := st.cache.getOrCreate(ctx, st.log, alertRule, result, extraLabels, st.externalURL)
currentState.LastEvaluationTime = result.EvaluatedAt
currentState.EvaluationDuration = result.EvaluationDuration
@ -265,7 +262,7 @@ func (st *Manager) setNextState(ctx context.Context, alertRule *ngModels.AlertRu
"err", err)
}
st.set(currentState)
st.cache.set(currentState)
shouldUpdateAnnotation := oldState != currentState.State || oldReason != currentState.StateReason
if shouldUpdateAnnotation {
@ -291,7 +288,7 @@ func (st *Manager) recordMetrics() {
select {
case <-ticker.C:
st.log.Debug("recording state cache metrics", "now", st.clock.Now())
st.cache.recordMetrics()
st.cache.recordMetrics(st.metrics)
case <-st.quit:
st.log.Debug("stopping state cache metrics recording", "now", st.clock.Now())
ticker.Stop()
@ -302,7 +299,7 @@ func (st *Manager) recordMetrics() {
func (st *Manager) Put(states []*State) {
for _, s := range states {
st.set(s)
st.cache.set(s)
}
}

View File

@ -1997,8 +1997,7 @@ func TestProcessEvalResults(t *testing.T) {
assert.Len(t, states, len(tc.expectedStates))
for _, s := range tc.expectedStates {
cachedState, err := st.Get(s.OrgID, s.AlertRuleUID, s.CacheId)
require.NoError(t, err)
cachedState := st.Get(s.OrgID, s.AlertRuleUID, s.CacheId)
assert.Equal(t, s, cachedState)
}
@ -2158,8 +2157,7 @@ func TestStaleResultsHandler(t *testing.T) {
"__alert_rule_uid__": rule.UID,
})
for _, s := range tc.expectedStates {
cachedState, err := st.Get(s.OrgID, s.AlertRuleUID, s.CacheId)
require.NoError(t, err)
cachedState := st.Get(s.OrgID, s.AlertRuleUID, s.CacheId)
assert.Equal(t, s, cachedState)
}
}