Alerting: Enhancements to /rules (#33085)

* set processing time

* merge labels and set on response

* use state cache for adding alerts to rules

* minor cleanup

* pr feedback

* Do not initialize mutex unnecessarily

Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com>

* linter

Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com>
author David Parrott
date 2021-04-21 09:30:03 -07:00
committed by GitHub
parent 7480d9e2be
commit 4be1d84f23
6 changed files with 193 additions and 152 deletions


@@ -5,6 +5,8 @@ import (
"sync"
"time"
+ prometheusModel "github.com/prometheus/common/model"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana-plugin-sdk-go/data"
@@ -13,7 +15,7 @@ import (
)
type AlertState struct {
- UID string
+ AlertRuleUID string
OrgID int64
CacheId string
Labels data.Labels
@@ -22,6 +24,7 @@ type AlertState struct {
StartsAt time.Time
EndsAt time.Time
LastEvaluationTime time.Time
+ EvaluationDuration time.Duration
Annotations map[string]string
}
@@ -31,21 +34,20 @@ type StateEvaluation struct {
}
type cache struct {
- cacheMap map[string]AlertState
- mu sync.Mutex
+ states map[string]AlertState
+ mtxStates sync.Mutex
}
type StateTracker struct {
- stateCache cache
- quit chan struct{}
- Log log.Logger
+ cache cache
+ quit chan struct{}
+ Log log.Logger
}
func NewStateTracker(logger log.Logger) *StateTracker {
tracker := &StateTracker{
- stateCache: cache{
- cacheMap: make(map[string]AlertState),
- mu: sync.Mutex{},
+ cache: cache{
+ states: make(map[string]AlertState),
},
quit: make(chan struct{}),
Log: logger,
@@ -54,135 +56,158 @@ func NewStateTracker(logger log.Logger) *StateTracker {
return tracker
}
- func (st *StateTracker) getOrCreate(alertRule *ngModels.AlertRule, result eval.Result) AlertState {
- st.stateCache.mu.Lock()
- defer st.stateCache.mu.Unlock()
- lbs := data.Labels{}
- if len(result.Instance) > 0 {
- lbs = result.Instance
- }
+ func (st *StateTracker) getOrCreate(alertRule *ngModels.AlertRule, result eval.Result, evaluationDuration time.Duration) AlertState {
+ st.cache.mtxStates.Lock()
+ defer st.cache.mtxStates.Unlock()
+ // if duplicate labels exist, alertRule label will take precedence
+ lbs := mergeLabels(alertRule.Labels, result.Instance)
lbs["__alert_rule_uid__"] = alertRule.UID
lbs["__alert_rule_namespace_uid__"] = alertRule.NamespaceUID
lbs["__alert_rule_title__"] = alertRule.Title
+ lbs[prometheusModel.AlertNameLabel] = alertRule.Title
+ id := fmt.Sprintf("%s", map[string]string(lbs))
+ if state, ok := st.cache.states[id]; ok {
+ return state
+ }
annotations := map[string]string{}
if len(alertRule.Annotations) > 0 {
annotations = alertRule.Annotations
}
- idString := fmt.Sprintf("%s", map[string]string(lbs))
- if state, ok := st.stateCache.cacheMap[idString]; ok {
- return state
+ newResults := []StateEvaluation{
+ {
+ EvaluationTime: result.EvaluatedAt,
+ EvaluationState: result.State,
+ },
+ }
- st.Log.Debug("adding new alert state cache entry", "cacheId", idString, "state", result.State.String(), "evaluatedAt", result.EvaluatedAt.String())
+ st.Log.Debug("adding new alert state cache entry", "cacheId", id, "state", result.State.String(), "evaluatedAt", result.EvaluatedAt.String())
newState := AlertState{
- UID: alertRule.UID,
- OrgID: alertRule.OrgID,
- CacheId: idString,
- Labels: lbs,
- State: result.State,
- Results: []StateEvaluation{},
- Annotations: annotations,
+ AlertRuleUID: alertRule.UID,
+ OrgID: alertRule.OrgID,
+ CacheId: id,
+ Labels: lbs,
+ State: result.State,
+ Results: newResults,
+ Annotations: annotations,
+ EvaluationDuration: evaluationDuration,
}
if result.State == eval.Alerting {
newState.StartsAt = result.EvaluatedAt
}
- st.stateCache.cacheMap[idString] = newState
+ st.cache.states[id] = newState
return newState
}
- func (st *StateTracker) set(stateEntry AlertState) {
- st.stateCache.mu.Lock()
- defer st.stateCache.mu.Unlock()
- st.stateCache.cacheMap[stateEntry.CacheId] = stateEntry
+ func (st *StateTracker) set(entry AlertState) {
+ st.cache.mtxStates.Lock()
+ defer st.cache.mtxStates.Unlock()
+ st.cache.states[entry.CacheId] = entry
}
- func (st *StateTracker) Get(stateId string) AlertState {
- st.stateCache.mu.Lock()
- defer st.stateCache.mu.Unlock()
- return st.stateCache.cacheMap[stateId]
+ func (st *StateTracker) Get(id string) AlertState {
+ st.cache.mtxStates.Lock()
+ defer st.cache.mtxStates.Unlock()
+ return st.cache.states[id]
}
//Used to ensure a clean cache on startup
func (st *StateTracker) ResetCache() {
- st.stateCache.mu.Lock()
- defer st.stateCache.mu.Unlock()
- st.stateCache.cacheMap = make(map[string]AlertState)
+ st.cache.mtxStates.Lock()
+ defer st.cache.mtxStates.Unlock()
+ st.cache.states = make(map[string]AlertState)
}
- func (st *StateTracker) ProcessEvalResults(alertRule *ngModels.AlertRule, results eval.Results) []AlertState {
+ func (st *StateTracker) ProcessEvalResults(alertRule *ngModels.AlertRule, results eval.Results, evaluationDuration time.Duration) []AlertState {
st.Log.Info("state tracker processing evaluation results", "uid", alertRule.UID, "resultCount", len(results))
- var changedStates []AlertState
+ var states []AlertState
for _, result := range results {
- s, _ := st.setNextState(alertRule, result)
- changedStates = append(changedStates, s)
+ s := st.setNextState(alertRule, result, evaluationDuration)
+ states = append(states, s)
}
st.Log.Debug("returning changed states to scheduler", "count", len(changedStates))
return changedStates
st.Log.Debug("returning changed states to scheduler", "count", len(states))
return states
}
//TODO: When calculating if an alert should not be firing anymore, we should take three things into account:
// 1. The re-send the delay if any, we don't want to send every firing alert every time, we should have a fixed delay across all alerts to avoid saturating the notification system
// 2. The evaluation interval defined for this particular alert - we don't support that yet but will eventually allow you to define how often do you want this alert to be evaluted
// 3. The base interval defined by the scheduler - in the case where #2 is not yet an option we can use the base interval at which every alert runs.
//Set the current state based on evaluation results
- //return the state and a bool indicating whether a state transition occurred
- func (st *StateTracker) setNextState(alertRule *ngModels.AlertRule, result eval.Result) (AlertState, bool) {
- currentState := st.getOrCreate(alertRule, result)
+ func (st *StateTracker) setNextState(alertRule *ngModels.AlertRule, result eval.Result, evaluationDuration time.Duration) AlertState {
+ currentState := st.getOrCreate(alertRule, result, evaluationDuration)
st.Log.Debug("setting alert state", "uid", alertRule.UID)
switch {
case currentState.State == result.State:
st.Log.Debug("no state transition", "cacheId", currentState.CacheId, "state", currentState.State.String())
currentState.LastEvaluationTime = result.EvaluatedAt
+ currentState.EvaluationDuration = evaluationDuration
currentState.Results = append(currentState.Results, StateEvaluation{
EvaluationTime: result.EvaluatedAt,
EvaluationState: result.State,
})
if currentState.State == eval.Alerting {
- currentState.EndsAt = result.EvaluatedAt.Add(40 * time.Second)
+ currentState.EndsAt = result.EvaluatedAt.Add(alertRule.For * time.Second)
}
st.set(currentState)
- return currentState, false
+ return currentState
case currentState.State == eval.Normal && result.State == eval.Alerting:
st.Log.Debug("state transition from normal to alerting", "cacheId", currentState.CacheId)
currentState.State = eval.Alerting
currentState.LastEvaluationTime = result.EvaluatedAt
currentState.StartsAt = result.EvaluatedAt
- currentState.EndsAt = result.EvaluatedAt.Add(40 * time.Second)
+ currentState.EndsAt = result.EvaluatedAt.Add(alertRule.For * time.Second)
+ currentState.EvaluationDuration = evaluationDuration
currentState.Results = append(currentState.Results, StateEvaluation{
EvaluationTime: result.EvaluatedAt,
EvaluationState: result.State,
})
currentState.Annotations["alerting"] = result.EvaluatedAt.String()
st.set(currentState)
- return currentState, true
+ return currentState
case currentState.State == eval.Alerting && result.State == eval.Normal:
st.Log.Debug("state transition from alerting to normal", "cacheId", currentState.CacheId)
currentState.State = eval.Normal
currentState.LastEvaluationTime = result.EvaluatedAt
currentState.EndsAt = result.EvaluatedAt
+ currentState.EvaluationDuration = evaluationDuration
currentState.Results = append(currentState.Results, StateEvaluation{
EvaluationTime: result.EvaluatedAt,
EvaluationState: result.State,
})
st.set(currentState)
- return currentState, true
+ return currentState
default:
- return currentState, false
+ return currentState
}
}
func (st *StateTracker) GetAll() []AlertState {
var states []AlertState
- st.stateCache.mu.Lock()
- defer st.stateCache.mu.Unlock()
- for _, v := range st.stateCache.cacheMap {
+ st.cache.mtxStates.Lock()
+ defer st.cache.mtxStates.Unlock()
+ for _, v := range st.cache.states {
states = append(states, v)
}
return states
}
+ func (st *StateTracker) GetStatesByRuleUID() map[string][]AlertState {
+ ruleMap := make(map[string][]AlertState)
+ st.cache.mtxStates.Lock()
+ defer st.cache.mtxStates.Unlock()
+ for _, state := range st.cache.states {
+ if ruleStates, ok := ruleMap[state.AlertRuleUID]; ok {
+ ruleStates = append(ruleStates, state)
+ ruleMap[state.AlertRuleUID] = ruleStates
+ } else {
+ ruleStates := []AlertState{state}
+ ruleMap[state.AlertRuleUID] = ruleStates
+ }
+ }
+ return ruleMap
+ }
func (st *StateTracker) cleanUp() {
ticker := time.NewTicker(time.Duration(60) * time.Minute)
st.Log.Debug("starting cleanup process", "intervalMinutes", 60)
@@ -200,9 +225,9 @@ func (st *StateTracker) cleanUp() {
func (st *StateTracker) trim() {
st.Log.Info("trimming alert state cache", "now", time.Now())
- st.stateCache.mu.Lock()
- defer st.stateCache.mu.Unlock()
- for _, v := range st.stateCache.cacheMap {
+ st.cache.mtxStates.Lock()
+ defer st.cache.mtxStates.Unlock()
+ for _, v := range st.cache.states {
if len(v.Results) > 100 {
st.Log.Debug("trimming result set", "cacheId", v.CacheId, "count", len(v.Results)-100)
newResults := make([]StateEvaluation, 100)
@@ -214,7 +239,7 @@ func (st *StateTracker) trim() {
}
func (a AlertState) Equals(b AlertState) bool {
- return a.UID == b.UID &&
+ return a.AlertRuleUID == b.AlertRuleUID &&
a.OrgID == b.OrgID &&
a.CacheId == b.CacheId &&
a.Labels.String() == b.Labels.String() &&
@@ -229,3 +254,17 @@ func (st *StateTracker) Put(states []AlertState) {
st.set(s)
}
}
+ // if duplicate labels exist, keep the value from the first set
+ func mergeLabels(a, b data.Labels) data.Labels {
+ newLbs := data.Labels{}
+ for k, v := range a {
+ newLbs[k] = v
+ }
+ for k, v := range b {
+ if _, ok := newLbs[k]; !ok {
+ newLbs[k] = v
+ }
+ }
+ return newLbs
+ }
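
For reference, a minimal, self-contained sketch (not the Grafana source) of the merge behavior the new mergeLabels helper gives getOrCreate: rule labels are passed as the first argument, so on a duplicate key the rule's value wins over the instance label, and the merged set is what the cache ID is derived from. Here `labels` is a stand-in for the SDK's data.Labels, and the label values are made up for illustration.

package main

import "fmt"

// labels stands in for data.Labels (a map[string]string) from the plugin SDK.
type labels map[string]string

// mergeLabels keeps the value from the first set when a key exists in both,
// mirroring the helper added in this commit.
func mergeLabels(a, b labels) labels {
	merged := labels{}
	for k, v := range a {
		merged[k] = v
	}
	for k, v := range b {
		if _, ok := merged[k]; !ok {
			merged[k] = v
		}
	}
	return merged
}

func main() {
	ruleLabels := labels{"team": "backend", "severity": "critical"}       // hypothetical alertRule.Labels
	instanceLabels := labels{"severity": "warning", "instance": "web-01"} // hypothetical result.Instance

	// Rule labels come first, so "severity" resolves to "critical".
	merged := mergeLabels(ruleLabels, instanceLabels)
	fmt.Println(merged) // map[instance:web-01 severity:critical team:backend]

	// getOrCreate then derives the state cache key from the merged label set.
	id := fmt.Sprintf("%s", map[string]string(merged))
	fmt.Println(id)
}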