mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Alerting: Trim results at processing time instead of on a ticker (#34248)
* Trim results at processing time instead of on a ticker * Use RWMutex correctly * Remove comment
This commit is contained in:
parent
bbb7bbf891
commit
25485100b0
@ -92,8 +92,8 @@ func (c *cache) set(entry *State) {
|
||||
}
|
||||
|
||||
func (c *cache) get(orgID int64, alertRuleUID, stateId string) (*State, error) {
|
||||
c.mtxStates.Lock()
|
||||
defer c.mtxStates.Unlock()
|
||||
c.mtxStates.RLock()
|
||||
defer c.mtxStates.RUnlock()
|
||||
if state, ok := c.states[orgID][alertRuleUID][stateId]; ok {
|
||||
return state, nil
|
||||
}
|
||||
@ -102,8 +102,8 @@ func (c *cache) get(orgID int64, alertRuleUID, stateId string) (*State, error) {
|
||||
|
||||
func (c *cache) getAll(orgID int64) []*State {
|
||||
var states []*State
|
||||
c.mtxStates.Lock()
|
||||
defer c.mtxStates.Unlock()
|
||||
c.mtxStates.RLock()
|
||||
defer c.mtxStates.RUnlock()
|
||||
for _, v1 := range c.states[orgID] {
|
||||
for _, v2 := range v1 {
|
||||
states = append(states, v2)
|
||||
@ -114,8 +114,8 @@ func (c *cache) getAll(orgID int64) []*State {
|
||||
|
||||
func (c *cache) getStatesForRuleUID(orgID int64, alertRuleUID string) []*State {
|
||||
var ruleStates []*State
|
||||
c.mtxStates.Lock()
|
||||
defer c.mtxStates.Unlock()
|
||||
c.mtxStates.RLock()
|
||||
defer c.mtxStates.RUnlock()
|
||||
for _, state := range c.states[orgID][alertRuleUID] {
|
||||
ruleStates = append(ruleStates, state)
|
||||
}
|
||||
@ -135,9 +135,9 @@ func (c *cache) reset() {
|
||||
c.states = make(map[int64]map[string]map[string]*State)
|
||||
}
|
||||
|
||||
func (c *cache) trim() {
|
||||
c.mtxStates.Lock()
|
||||
defer c.mtxStates.Unlock()
|
||||
func (c *cache) recordMetrics() {
|
||||
c.mtxStates.RLock()
|
||||
defer c.mtxStates.RUnlock()
|
||||
|
||||
// Set default values to zero such that gauges are reset
|
||||
// after all values from a single state disappear.
|
||||
@ -153,13 +153,6 @@ func (c *cache) trim() {
|
||||
c.metrics.GroupRules.WithLabelValues(fmt.Sprint(org)).Set(float64(len(orgMap)))
|
||||
for _, rule := range orgMap {
|
||||
for _, state := range rule {
|
||||
if len(state.Results) > 100 {
|
||||
newResults := make([]Evaluation, 100)
|
||||
// Keep last 100 results
|
||||
copy(newResults, state.Results[len(state.Results)-100:])
|
||||
state.Results = newResults
|
||||
}
|
||||
|
||||
n := ct[state.State]
|
||||
ct[state.State] = n + 1
|
||||
}
|
||||
|
@ -1,7 +1,6 @@
|
||||
package state
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
@ -25,7 +24,7 @@ func NewManager(logger log.Logger, metrics *metrics.Metrics) *Manager {
|
||||
Log: logger,
|
||||
metrics: metrics,
|
||||
}
|
||||
go manager.cleanUp()
|
||||
go manager.recordMetrics()
|
||||
return manager
|
||||
}
|
||||
|
||||
@ -78,6 +77,7 @@ func (st *Manager) setNextState(alertRule *ngModels.AlertRule, result eval.Resul
|
||||
EvaluationState: result.State,
|
||||
EvaluationString: result.EvaluationString,
|
||||
})
|
||||
currentState.TrimResults(alertRule)
|
||||
|
||||
st.Log.Debug("setting alert state", "uid", alertRule.UID)
|
||||
switch result.State {
|
||||
@ -104,19 +104,18 @@ func (st *Manager) GetStatesForRuleUID(orgID int64, alertRuleUID string) []*Stat
|
||||
return st.cache.getStatesForRuleUID(orgID, alertRuleUID)
|
||||
}
|
||||
|
||||
func (st *Manager) cleanUp() {
|
||||
func (st *Manager) recordMetrics() {
|
||||
// TODO: parameterize?
|
||||
// Setting to a reasonable default scrape interval for Prometheus.
|
||||
dur := time.Duration(15) * time.Second
|
||||
ticker := time.NewTicker(dur)
|
||||
st.Log.Debug("starting cleanup process", "dur", fmt.Sprint(dur))
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
st.Log.Info("trimming alert state cache", "now", time.Now())
|
||||
st.cache.trim()
|
||||
st.Log.Info("recording state cache metrics", "now", time.Now())
|
||||
st.cache.recordMetrics()
|
||||
case <-st.quit:
|
||||
st.Log.Debug("stopping cleanup process", "now", time.Now())
|
||||
st.Log.Debug("stopping state cache metrics recording", "now", time.Now())
|
||||
ticker.Stop()
|
||||
return
|
||||
}
|
||||
|
@ -123,3 +123,17 @@ func (a *State) Equals(b *State) bool {
|
||||
a.LastEvaluationTime == b.LastEvaluationTime &&
|
||||
data.Labels(a.Annotations).String() == data.Labels(b.Annotations).String()
|
||||
}
|
||||
|
||||
func (a *State) TrimResults(alertRule *ngModels.AlertRule) {
|
||||
numBuckets := 2 * (int64(alertRule.For.Seconds()) / alertRule.IntervalSeconds)
|
||||
if numBuckets == 0 {
|
||||
numBuckets = 10 // keep at least 10 evaluations in the event For is set to 0
|
||||
}
|
||||
|
||||
if len(a.Results) < int(numBuckets) {
|
||||
return
|
||||
}
|
||||
newResults := make([]Evaluation, numBuckets)
|
||||
copy(newResults, a.Results[len(a.Results)-int(numBuckets):])
|
||||
a.Results = newResults
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user