Mirror of https://github.com/grafana/grafana.git
* Alerting: Refactor the state manager as a dependency

  Within the scheduler, the state manager was being passed around to a number of functions. I've introduced it as a dependency to keep the "service" interfaces as clean and homogeneous as possible. This matters because I'm going to introduce live reload of these components as part of my next PR, and it is better if dependencies are self-contained.

* Remove unused functions
* Fix a few more tests
* Make sure the `stateManager` is declared before the schedule
214 lines · 5.8 KiB · Go
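For context, here is a minimal sketch of the shape of the refactor described in the commit message above: instead of threading the state manager through each scheduler function, the scheduler holds it as a field injected once at construction time. The `schedule` struct, `NewSchedule` constructor, and `processResults` method below are hypothetical illustrations, not the actual Grafana scheduler code; only `state.Manager` and `ProcessEvalResults` come from the file shown here.

package schedule

import (
	"github.com/grafana/grafana/pkg/services/ngalert/eval"
	ngModels "github.com/grafana/grafana/pkg/services/ngalert/models"
	"github.com/grafana/grafana/pkg/services/ngalert/state"
)

// schedule is a hypothetical stand-in for the real scheduler type.
// The state manager is a self-contained dependency, declared before
// the schedule and injected at construction time.
type schedule struct {
	stateManager *state.Manager
}

// NewSchedule receives the state manager once, so downstream methods
// no longer need it as a parameter and a later live reload only has to
// swap this one field.
func NewSchedule(stateManager *state.Manager) *schedule {
	return &schedule{stateManager: stateManager}
}

// processResults reads the injected dependency instead of taking a
// *state.Manager argument, keeping the method signature clean.
func (sch *schedule) processResults(rule *ngModels.AlertRule, results eval.Results) {
	_ = sch.stateManager.ProcessEvalResults(rule, results)
}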
package state

import (
	"time"

	"github.com/grafana/grafana/pkg/infra/log"

	"github.com/grafana/grafana/pkg/services/ngalert/eval"
	"github.com/grafana/grafana/pkg/services/ngalert/metrics"
	ngModels "github.com/grafana/grafana/pkg/services/ngalert/models"
	"github.com/grafana/grafana/pkg/services/ngalert/store"
)

// Manager holds the in-memory alert state cache together with the rule and
// instance stores used to warm it, and periodically records cache metrics.
type Manager struct {
	log     log.Logger
	metrics *metrics.Metrics

	cache       *cache
	quit        chan struct{}
	ResendDelay time.Duration

	ruleStore     store.RuleStore
	instanceStore store.InstanceStore
}

// NewManager builds a state Manager and starts the background goroutine that
// records state cache metrics until Close is called.
func NewManager(logger log.Logger, metrics *metrics.Metrics, ruleStore store.RuleStore, instanceStore store.InstanceStore) *Manager {
	manager := &Manager{
		cache:         newCache(logger, metrics),
		quit:          make(chan struct{}),
		ResendDelay:   1 * time.Minute, // TODO: make this configurable
		log:           logger,
		metrics:       metrics,
		ruleStore:     ruleStore,
		instanceStore: instanceStore,
	}
	go manager.recordMetrics()
	return manager
}

// Close stops the metrics-recording goroutine.
func (st *Manager) Close() {
	st.quit <- struct{}{}
}

// Warm resets the cache and then seeds it with the alert instances persisted
// in the database, so that state is restored on startup.
func (st *Manager) Warm() {
	st.log.Info("warming cache for startup")
	st.ResetCache()

	orgIds, err := st.instanceStore.FetchOrgIds()
	if err != nil {
		st.log.Error("unable to fetch orgIds", "msg", err.Error())
	}

	var states []*State
	for _, orgId := range orgIds {
		// Get Rules
		ruleCmd := ngModels.ListAlertRulesQuery{
			OrgID: orgId,
		}
		if err := st.ruleStore.GetOrgAlertRules(&ruleCmd); err != nil {
			st.log.Error("unable to fetch previous state", "msg", err.Error())
		}

		ruleByUID := make(map[string]*ngModels.AlertRule, len(ruleCmd.Result))
		for _, rule := range ruleCmd.Result {
			ruleByUID[rule.UID] = rule
		}

		// Get Instances
		cmd := ngModels.ListAlertInstancesQuery{
			RuleOrgID: orgId,
		}
		if err := st.instanceStore.ListAlertInstances(&cmd); err != nil {
			st.log.Error("unable to fetch previous state", "msg", err.Error())
		}

		for _, entry := range cmd.Result {
			ruleForEntry, ok := ruleByUID[entry.RuleUID]
			if !ok {
				st.log.Error("rule not found for instance, ignoring", "rule", entry.RuleUID)
				continue
			}

			lbs := map[string]string(entry.Labels)
			cacheId, err := entry.Labels.StringKey()
			if err != nil {
				st.log.Error("error getting cacheId for entry", "msg", err.Error())
			}
			stateForEntry := &State{
				AlertRuleUID:       entry.RuleUID,
				OrgID:              entry.RuleOrgID,
				CacheId:            cacheId,
				Labels:             lbs,
				State:              translateInstanceState(entry.CurrentState),
				Results:            []Evaluation{},
				StartsAt:           entry.CurrentStateSince,
				EndsAt:             entry.CurrentStateEnd,
				LastEvaluationTime: entry.LastEvalTime,
				Annotations:        ruleForEntry.Annotations,
			}
			states = append(states, stateForEntry)
		}
	}

	for _, s := range states {
		st.set(s)
	}
}

func (st *Manager) getOrCreate(alertRule *ngModels.AlertRule, result eval.Result) *State {
	return st.cache.getOrCreate(alertRule, result)
}

func (st *Manager) set(entry *State) {
	st.cache.set(entry)
}

func (st *Manager) Get(orgID int64, alertRuleUID, stateId string) (*State, error) {
	return st.cache.get(orgID, alertRuleUID, stateId)
}

// ResetCache is used to ensure a clean cache on startup.
func (st *Manager) ResetCache() {
	st.cache.reset()
}

// RemoveByRuleUID deletes all entries in the state manager that match the given rule UID.
func (st *Manager) RemoveByRuleUID(orgID int64, ruleUID string) {
	st.cache.removeByRuleUID(orgID, ruleUID)
}

// ProcessEvalResults updates the cached state for every evaluation result of
// the given rule and returns the resulting states.
func (st *Manager) ProcessEvalResults(alertRule *ngModels.AlertRule, results eval.Results) []*State {
	st.log.Debug("state manager processing evaluation results", "uid", alertRule.UID, "resultCount", len(results))
	var states []*State
	for _, result := range results {
		s := st.setNextState(alertRule, result)
		states = append(states, s)
	}
	st.log.Debug("returning changed states to scheduler", "count", len(states))
	return states
}

// Set the current state based on evaluation results
func (st *Manager) setNextState(alertRule *ngModels.AlertRule, result eval.Result) *State {
	currentState := st.getOrCreate(alertRule, result)

	currentState.LastEvaluationTime = result.EvaluatedAt
	currentState.EvaluationDuration = result.EvaluationDuration
	currentState.Results = append(currentState.Results, Evaluation{
		EvaluationTime:   result.EvaluatedAt,
		EvaluationState:  result.State,
		EvaluationString: result.EvaluationString,
	})
	currentState.TrimResults(alertRule)

	st.log.Debug("setting alert state", "uid", alertRule.UID)
	switch result.State {
	case eval.Normal:
		currentState.resultNormal(result)
	case eval.Alerting:
		currentState.resultAlerting(alertRule, result)
	case eval.Error:
		currentState.resultError(alertRule, result)
	case eval.NoData:
		currentState.resultNoData(alertRule, result)
	case eval.Pending: // we do not emit results with this state
	}

	st.set(currentState)
	return currentState
}

func (st *Manager) GetAll(orgID int64) []*State {
	return st.cache.getAll(orgID)
}

func (st *Manager) GetStatesForRuleUID(orgID int64, alertRuleUID string) []*State {
	return st.cache.getStatesForRuleUID(orgID, alertRuleUID)
}

// recordMetrics periodically records state cache metrics until Close is called.
func (st *Manager) recordMetrics() {
	// TODO: parameterize?
	// Setting to a reasonable default scrape interval for Prometheus.
	dur := time.Duration(15) * time.Second
	ticker := time.NewTicker(dur)
	for {
		select {
		case <-ticker.C:
			st.log.Info("recording state cache metrics", "now", time.Now())
			st.cache.recordMetrics()
		case <-st.quit:
			st.log.Debug("stopping state cache metrics recording", "now", time.Now())
			ticker.Stop()
			return
		}
	}
}

// Put stores the given states in the cache.
func (st *Manager) Put(states []*State) {
	for _, s := range states {
		st.set(s)
	}
}

// translateInstanceState maps a persisted instance state to its evaluation state.
func translateInstanceState(state ngModels.InstanceStateType) eval.State {
	switch {
	case state == ngModels.InstanceStateFiring:
		return eval.Alerting
	case state == ngModels.InstanceStateNormal:
		return eval.Normal
	default:
		return eval.Error
	}
}