Alerting: Refactor state manager as a dependency (#36513)
* Alerting: Refactor state manager as a dependency. Within the scheduler, the state manager was being passed around to a number of functions. I've introduced it as a dependency to keep the "service" interfaces as clean and homogeneous as possible. This is relevant because I'm going to introduce live reload of these components in my next PR, and it is better if dependencies are self-contained.
* Remove unused functions
* Fix a few more tests
* Make sure the `stateManager` is declared before the schedule
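For readers skimming the diff: the change is constructor injection. Instead of threading the state manager through every scheduler call, it is handed over once when the scheduler is built and reached through the receiver afterwards. A minimal, self-contained Go sketch of the before/after shape, using simplified stand-in types (schedulerBefore, schedulerAfter and stateManager here are illustrative, not the real types):

package main

import (
	"context"
	"fmt"
)

// stateManager is a stand-in for the real state.Manager.
type stateManager struct{}

// Sync is an illustrative method; the real manager exposes Warm,
// ProcessEvalResults, GetAll and Close.
func (m *stateManager) Sync() { fmt.Println("state synced") }

// Before: every method that needs the manager takes it as a parameter,
// so every caller has to carry it around.
type schedulerBefore struct{}

func (s *schedulerBefore) Ticker(ctx context.Context, m *stateManager) error {
	m.Sync()
	return nil
}

// After: the manager is supplied once, at construction time, and the
// exported methods only need a context.
type schedulerAfter struct {
	stateManager *stateManager
}

func newScheduler(m *stateManager) *schedulerAfter {
	return &schedulerAfter{stateManager: m}
}

func (s *schedulerAfter) Ticker(ctx context.Context) error {
	s.stateManager.Sync()
	return nil
}

func main() {
	m := &stateManager{}

	before := &schedulerBefore{}
	_ = before.Ticker(context.Background(), m) // caller must know about m

	after := newScheduler(m)
	_ = after.Ticker(context.Background()) // dependency is self-contained
}

The same shape shows up throughout the diff below: Ticker, ruleRoutine and the ScheduleService interface all lose their *state.Manager parameters, and the scheduler reads sch.stateManager instead.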
This commit is contained in: parent acf54905ee, commit a86ad1190c
@@ -49,10 +49,10 @@ type AlertNG struct {
     DataProxy *datasourceproxy.DatasourceProxyService `inject:""`
     QuotaService *quota.QuotaService `inject:""`
     Metrics *metrics.Metrics `inject:""`
+    Alertmanager *notifier.Alertmanager
     Log log.Logger
     schedule schedule.ScheduleService
     stateManager *state.Manager
-    Alertmanager *notifier.Alertmanager
 }

 func init() {
@@ -62,7 +62,6 @@ func init() {
 // Init initializes the AlertingService.
 func (ng *AlertNG) Init() error {
     ng.Log = log.New("ngalert")
-    ng.stateManager = state.NewManager(ng.Log, ng.Metrics)
     baseInterval := baseIntervalSeconds * time.Second

     store := &store.DBstore{
@@ -89,7 +88,8 @@ func (ng *AlertNG) Init() error {
         Notifier: ng.Alertmanager,
         Metrics: ng.Metrics,
     }
-    ng.schedule = schedule.NewScheduler(schedCfg, ng.DataService, ng.Cfg.AppURL)
+    ng.stateManager = state.NewManager(ng.Log, ng.Metrics, store, store)
+    ng.schedule = schedule.NewScheduler(schedCfg, ng.DataService, ng.Cfg.AppURL, ng.stateManager)

     api := api.API{
         Cfg: ng.Cfg,
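A note on the `store, store` arguments above: the same `*store.DBstore` value is passed as both the rule store and the instance store, which works because one concrete type can satisfy several narrow interfaces. A self-contained sketch of that pattern with hypothetical interfaces and types (ruleReader, instanceReader and dbStore are illustrative, not the real `store` package):

package main

import "fmt"

// ruleReader and instanceReader are hypothetical narrow interfaces,
// analogous to store.RuleStore and store.InstanceStore.
type ruleReader interface {
	Rules() []string
}

type instanceReader interface {
	Instances() []string
}

// dbStore is one concrete type that implements both, the way
// *store.DBstore is used for both arguments in Init above.
type dbStore struct{}

func (dbStore) Rules() []string     { return []string{"rule-a"} }
func (dbStore) Instances() []string { return []string{"instance-1"} }

// manager takes the two capabilities as separate parameters, so a test
// could substitute either one independently.
type manager struct {
	rules     ruleReader
	instances instanceReader
}

func newManager(r ruleReader, i instanceReader) *manager {
	return &manager{rules: r, instances: i}
}

func main() {
	db := dbStore{}
	m := newManager(db, db) // the same value satisfies both interfaces
	fmt.Println(m.rules.Rules(), m.instances.Instances())
}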
@@ -113,11 +113,11 @@ func (ng *AlertNG) Init() error {
 // Run starts the scheduler.
 func (ng *AlertNG) Run(ctx context.Context) error {
     ng.Log.Debug("ngalert starting")
-    ng.schedule.WarmStateCache(ng.stateManager)
+    ng.stateManager.Warm()

     children, subCtx := errgroup.WithContext(ctx)
     children.Go(func() error {
-        return ng.schedule.Ticker(subCtx, ng.stateManager)
+        return ng.schedule.Ticker(subCtx)
     })
     children.Go(func() error {
         return ng.Alertmanager.Run(subCtx)
@@ -25,10 +25,9 @@ var timeNow = time.Now

 // ScheduleService handles scheduling
 type ScheduleService interface {
-    Ticker(context.Context, *state.Manager) error
+    Ticker(context.Context) error
     Pause() error
     Unpause() error
-    WarmStateCache(*state.Manager)

     // the following are used by tests only used for tests
     evalApplied(models.AlertRuleKey, time.Time)
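One practical effect of dropping WarmStateCache and the *state.Manager parameter is that alternative implementations of ScheduleService, such as test fakes, no longer need to know about the state package at all. A hypothetical sketch against a simplified copy of the exported methods (scheduleService and fakeSchedule here are illustrative, not part of this PR):

package main

import (
	"context"
	"fmt"
)

// scheduleService mirrors the exported part of ScheduleService after
// this change: no state.Manager appears in any signature.
type scheduleService interface {
	Ticker(context.Context) error
	Pause() error
	Unpause() error
}

// fakeSchedule is a hypothetical test double. Because the interface no
// longer references the state manager, the fake needs no state package
// import and no store wiring.
type fakeSchedule struct {
	ticks int
}

func (f *fakeSchedule) Ticker(ctx context.Context) error {
	f.ticks++
	<-ctx.Done() // run until the caller cancels
	return nil
}

func (f *fakeSchedule) Pause() error   { return nil }
func (f *fakeSchedule) Unpause() error { return nil }

func main() {
	var svc scheduleService = &fakeSchedule{}

	ctx, cancel := context.WithCancel(context.Background())
	cancel() // cancel up front so Ticker returns immediately
	_ = svc.Ticker(ctx)

	fmt.Println("fake scheduler ran once")
}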
@@ -36,7 +35,134 @@ type ScheduleService interface {
     overrideCfg(cfg SchedulerCfg)
 }

-func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key models.AlertRuleKey, evalCh <-chan *evalContext, stopCh <-chan struct{}, stateManager *state.Manager) error {
+// Notifier handles the delivery of alert notifications to the end user
+type Notifier interface {
+    PutAlerts(alerts apimodels.PostableAlerts) error
+}
+
+type schedule struct {
+    // base tick rate (fastest possible configured check)
+    baseInterval time.Duration
+
+    // each alert rule gets its own channel and routine
+    registry alertRuleRegistry
+
+    maxAttempts int64
+
+    clock clock.Clock
+
+    heartbeat *alerting.Ticker
+
+    // evalApplied is only used for tests: test code can set it to non-nil
+    // function, and then it'll be called from the event loop whenever the
+    // message from evalApplied is handled.
+    evalAppliedFunc func(models.AlertRuleKey, time.Time)
+
+    // stopApplied is only used for tests: test code can set it to non-nil
+    // function, and then it'll be called from the event loop whenever the
+    // message from stopApplied is handled.
+    stopAppliedFunc func(models.AlertRuleKey)
+
+    log log.Logger
+
+    evaluator eval.Evaluator
+
+    ruleStore store.RuleStore
+
+    instanceStore store.InstanceStore
+
+    dataService *tsdb.Service
+
+    stateManager *state.Manager
+
+    appURL string
+
+    notifier Notifier
+    metrics *metrics.Metrics
+}
+
+// SchedulerCfg is the scheduler configuration.
+type SchedulerCfg struct {
+    C clock.Clock
+    BaseInterval time.Duration
+    Logger log.Logger
+    EvalAppliedFunc func(models.AlertRuleKey, time.Time)
+    MaxAttempts int64
+    StopAppliedFunc func(models.AlertRuleKey)
+    Evaluator eval.Evaluator
+    RuleStore store.RuleStore
+    InstanceStore store.InstanceStore
+    Notifier Notifier
+    Metrics *metrics.Metrics
+}
+
+// NewScheduler returns a new schedule.
+func NewScheduler(cfg SchedulerCfg, dataService *tsdb.Service, appURL string, stateManager *state.Manager) *schedule {
+    ticker := alerting.NewTicker(cfg.C.Now(), time.Second*0, cfg.C, int64(cfg.BaseInterval.Seconds()))
+    sch := schedule{
+        registry: alertRuleRegistry{alertRuleInfo: make(map[models.AlertRuleKey]alertRuleInfo)},
+        maxAttempts: cfg.MaxAttempts,
+        clock: cfg.C,
+        baseInterval: cfg.BaseInterval,
+        log: cfg.Logger,
+        heartbeat: ticker,
+        evalAppliedFunc: cfg.EvalAppliedFunc,
+        stopAppliedFunc: cfg.StopAppliedFunc,
+        evaluator: cfg.Evaluator,
+        ruleStore: cfg.RuleStore,
+        instanceStore: cfg.InstanceStore,
+        dataService: dataService,
+        notifier: cfg.Notifier,
+        metrics: cfg.Metrics,
+        appURL: appURL,
+        stateManager: stateManager,
+    }
+    return &sch
+}
+
+func (sch *schedule) overrideCfg(cfg SchedulerCfg) {
+    sch.clock = cfg.C
+    sch.baseInterval = cfg.BaseInterval
+    sch.heartbeat = alerting.NewTicker(cfg.C.Now(), time.Second*0, cfg.C, int64(cfg.BaseInterval.Seconds()))
+    sch.evalAppliedFunc = cfg.EvalAppliedFunc
+    sch.stopAppliedFunc = cfg.StopAppliedFunc
+}
+
+func (sch *schedule) evalApplied(alertDefKey models.AlertRuleKey, now time.Time) {
+    if sch.evalAppliedFunc == nil {
+        return
+    }
+
+    sch.evalAppliedFunc(alertDefKey, now)
+}
+
+func (sch *schedule) stopApplied(alertDefKey models.AlertRuleKey) {
+    if sch.stopAppliedFunc == nil {
+        return
+    }
+
+    sch.stopAppliedFunc(alertDefKey)
+}
+
+func (sch *schedule) Pause() error {
+    if sch == nil {
+        return fmt.Errorf("scheduler is not initialised")
+    }
+    sch.heartbeat.Pause()
+    sch.log.Info("alert rule scheduler paused", "now", sch.clock.Now())
+    return nil
+}
+
+func (sch *schedule) Unpause() error {
+    if sch == nil {
+        return fmt.Errorf("scheduler is not initialised")
+    }
+    sch.heartbeat.Unpause()
+    sch.log.Info("alert rule scheduler unpaused", "now", sch.clock.Now())
+    return nil
+}
+
+func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key models.AlertRuleKey, evalCh <-chan *evalContext, stopCh <-chan struct{}) error {
     sch.log.Debug("alert rule routine started", "key", key)

     evalRunning := false
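The evalAppliedFunc and stopAppliedFunc fields above are test hooks: production code leaves them nil, and tests wire them in through SchedulerCfg to observe when an evaluation has been applied. A small self-contained sketch of that optional-callback pattern with simplified stand-ins (cfg, sched and ruleKey here are illustrative, not the real scheduler types):

package main

import (
	"fmt"
	"time"
)

// ruleKey, cfg and sched are simplified stand-ins for
// models.AlertRuleKey, SchedulerCfg and the schedule struct.
type ruleKey struct {
	orgID int64
	uid   string
}

type cfg struct {
	// EvalAppliedFunc is optional; tests set it to observe evaluations.
	EvalAppliedFunc func(ruleKey, time.Time)
}

type sched struct {
	evalAppliedFunc func(ruleKey, time.Time)
}

func newSched(c cfg) *sched {
	return &sched{evalAppliedFunc: c.EvalAppliedFunc}
}

// evalApplied mirrors the nil check in the real code: the hook is only
// called when a test wired it in.
func (s *sched) evalApplied(key ruleKey, now time.Time) {
	if s.evalAppliedFunc == nil {
		return
	}
	s.evalAppliedFunc(key, now)
}

func main() {
	applied := make(chan ruleKey, 1)
	s := newSched(cfg{
		EvalAppliedFunc: func(k ruleKey, _ time.Time) { applied <- k },
	})
	s.evalApplied(ruleKey{orgID: 1, uid: "abc"}, time.Now())
	fmt.Println("observed evaluation for", <-applied)
}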
@@ -86,9 +212,9 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key models.AlertRul
             return err
         }

-        processedStates := stateManager.ProcessEvalResults(alertRule, results)
+        processedStates := sch.stateManager.ProcessEvalResults(alertRule, results)
         sch.saveAlertStates(processedStates)
-        alerts := FromAlertStateToPostableAlerts(sch.log, processedStates, stateManager, sch.appURL)
+        alerts := FromAlertStateToPostableAlerts(sch.log, processedStates, sch.stateManager, sch.appURL)
         sch.log.Debug("sending alerts to notifier", "count", len(alerts.PostableAlerts), "alerts", alerts.PostableAlerts)
         err = sch.sendAlerts(alerts)
         if err != nil {
@@ -122,131 +248,7 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key models.AlertRul
     }
 }

-// Notifier handles the delivery of alert notifications to the end user
-type Notifier interface {
-    PutAlerts(alerts apimodels.PostableAlerts) error
-}
-
-type schedule struct {
-    // base tick rate (fastest possible configured check)
-    baseInterval time.Duration
-
-    // each alert rule gets its own channel and routine
-    registry alertRuleRegistry
-
-    maxAttempts int64
-
-    clock clock.Clock
-
-    heartbeat *alerting.Ticker
-
-    // evalApplied is only used for tests: test code can set it to non-nil
-    // function, and then it'll be called from the event loop whenever the
-    // message from evalApplied is handled.
-    evalAppliedFunc func(models.AlertRuleKey, time.Time)
-
-    // stopApplied is only used for tests: test code can set it to non-nil
-    // function, and then it'll be called from the event loop whenever the
-    // message from stopApplied is handled.
-    stopAppliedFunc func(models.AlertRuleKey)
-
-    log log.Logger
-
-    evaluator eval.Evaluator
-
-    ruleStore store.RuleStore
-
-    instanceStore store.InstanceStore
-
-    dataService *tsdb.Service
-
-    appURL string
-
-    notifier Notifier
-    metrics *metrics.Metrics
-}
-
-// SchedulerCfg is the scheduler configuration.
-type SchedulerCfg struct {
-    C clock.Clock
-    BaseInterval time.Duration
-    Logger log.Logger
-    EvalAppliedFunc func(models.AlertRuleKey, time.Time)
-    MaxAttempts int64
-    StopAppliedFunc func(models.AlertRuleKey)
-    Evaluator eval.Evaluator
-    RuleStore store.RuleStore
-    InstanceStore store.InstanceStore
-    Notifier Notifier
-    Metrics *metrics.Metrics
-}
-
-// NewScheduler returns a new schedule.
-func NewScheduler(cfg SchedulerCfg, dataService *tsdb.Service, appURL string) *schedule {
-    ticker := alerting.NewTicker(cfg.C.Now(), time.Second*0, cfg.C, int64(cfg.BaseInterval.Seconds()))
-    sch := schedule{
-        registry: alertRuleRegistry{alertRuleInfo: make(map[models.AlertRuleKey]alertRuleInfo)},
-        maxAttempts: cfg.MaxAttempts,
-        clock: cfg.C,
-        baseInterval: cfg.BaseInterval,
-        log: cfg.Logger,
-        heartbeat: ticker,
-        evalAppliedFunc: cfg.EvalAppliedFunc,
-        stopAppliedFunc: cfg.StopAppliedFunc,
-        evaluator: cfg.Evaluator,
-        ruleStore: cfg.RuleStore,
-        instanceStore: cfg.InstanceStore,
-        dataService: dataService,
-        notifier: cfg.Notifier,
-        metrics: cfg.Metrics,
-        appURL: appURL,
-    }
-    return &sch
-}
-
-func (sch *schedule) overrideCfg(cfg SchedulerCfg) {
-    sch.clock = cfg.C
-    sch.baseInterval = cfg.BaseInterval
-    sch.heartbeat = alerting.NewTicker(cfg.C.Now(), time.Second*0, cfg.C, int64(cfg.BaseInterval.Seconds()))
-    sch.evalAppliedFunc = cfg.EvalAppliedFunc
-    sch.stopAppliedFunc = cfg.StopAppliedFunc
-}
-
-func (sch *schedule) evalApplied(alertDefKey models.AlertRuleKey, now time.Time) {
-    if sch.evalAppliedFunc == nil {
-        return
-    }
-
-    sch.evalAppliedFunc(alertDefKey, now)
-}
-
-func (sch *schedule) stopApplied(alertDefKey models.AlertRuleKey) {
-    if sch.stopAppliedFunc == nil {
-        return
-    }
-
-    sch.stopAppliedFunc(alertDefKey)
-}
-
-func (sch *schedule) Pause() error {
-    if sch == nil {
-        return fmt.Errorf("scheduler is not initialised")
-    }
-    sch.heartbeat.Pause()
-    sch.log.Info("alert rule scheduler paused", "now", sch.clock.Now())
-    return nil
-}
-
-func (sch *schedule) Unpause() error {
-    if sch == nil {
-        return fmt.Errorf("scheduler is not initialised")
-    }
-    sch.heartbeat.Unpause()
-    sch.log.Info("alert rule scheduler unpaused", "now", sch.clock.Now())
-    return nil
-}
-
-func (sch *schedule) Ticker(grafanaCtx context.Context, stateManager *state.Manager) error {
+func (sch *schedule) Ticker(grafanaCtx context.Context) error {
     dispatcherGroup, ctx := errgroup.WithContext(grafanaCtx)
     for {
         select {
@@ -275,7 +277,7 @@ func (sch *schedule) Ticker(grafanaCtx context.Context, stateManager *state.Mana

             if newRoutine && !invalidInterval {
                 dispatcherGroup.Go(func() error {
-                    return sch.ruleRoutine(ctx, key, ruleInfo.evalCh, ruleInfo.stopCh, stateManager)
+                    return sch.ruleRoutine(ctx, key, ruleInfo.evalCh, ruleInfo.stopCh)
                 })
             }

@@ -327,10 +329,10 @@ func (sch *schedule) Ticker(grafanaCtx context.Context, stateManager *state.Mana
             }

             for _, v := range orgIds {
-                sch.saveAlertStates(stateManager.GetAll(v))
+                sch.saveAlertStates(sch.stateManager.GetAll(v))
             }

-            stateManager.Close()
+            sch.stateManager.Close()
             return waitErr
         }
     }
@@ -359,79 +361,6 @@ func (sch *schedule) saveAlertStates(states []*state.State) {
     }
 }

-func (sch *schedule) WarmStateCache(st *state.Manager) {
-    sch.log.Info("warming cache for startup")
-    st.ResetCache()
-
-    orgIds, err := sch.instanceStore.FetchOrgIds()
-    if err != nil {
-        sch.log.Error("unable to fetch orgIds", "msg", err.Error())
-    }
-
-    var states []*state.State
-    for _, orgId := range orgIds {
-        // Get Rules
-        ruleCmd := models.ListAlertRulesQuery{
-            OrgID: orgId,
-        }
-        if err := sch.ruleStore.GetOrgAlertRules(&ruleCmd); err != nil {
-            sch.log.Error("unable to fetch previous state", "msg", err.Error())
-        }
-
-        ruleByUID := make(map[string]*models.AlertRule, len(ruleCmd.Result))
-        for _, rule := range ruleCmd.Result {
-            ruleByUID[rule.UID] = rule
-        }
-
-        // Get Instances
-        cmd := models.ListAlertInstancesQuery{
-            RuleOrgID: orgId,
-        }
-        if err := sch.instanceStore.ListAlertInstances(&cmd); err != nil {
-            sch.log.Error("unable to fetch previous state", "msg", err.Error())
-        }
-
-        for _, entry := range cmd.Result {
-            ruleForEntry, ok := ruleByUID[entry.RuleUID]
-            if !ok {
-                sch.log.Error("rule not found for instance, ignoring", "rule", entry.RuleUID)
-                continue
-            }
-
-            lbs := map[string]string(entry.Labels)
-            cacheId, err := entry.Labels.StringKey()
-            if err != nil {
-                sch.log.Error("error getting cacheId for entry", "msg", err.Error())
-            }
-            stateForEntry := &state.State{
-                AlertRuleUID: entry.RuleUID,
-                OrgID: entry.RuleOrgID,
-                CacheId: cacheId,
-                Labels: lbs,
-                State: translateInstanceState(entry.CurrentState),
-                Results: []state.Evaluation{},
-                StartsAt: entry.CurrentStateSince,
-                EndsAt: entry.CurrentStateEnd,
-                LastEvaluationTime: entry.LastEvalTime,
-                Annotations: ruleForEntry.Annotations,
-            }
-            states = append(states, stateForEntry)
-        }
-    }
-    st.Put(states)
-}
-
-func translateInstanceState(state models.InstanceStateType) eval.State {
-    switch {
-    case state == models.InstanceStateFiring:
-        return eval.Alerting
-    case state == models.InstanceStateNormal:
-        return eval.Normal
-    default:
-        return eval.Error
-    }
-}
-
 type alertRuleRegistry struct {
     mu sync.Mutex
     alertRuleInfo map[models.AlertRuleKey]alertRuleInfo
@@ -107,9 +107,8 @@ func TestWarmStateCache(t *testing.T) {
         InstanceStore: dbstore,
         Metrics: metrics.NewMetrics(prometheus.NewRegistry()),
     }
-    sched := schedule.NewScheduler(schedCfg, nil, "http://localhost")
-    st := state.NewManager(schedCfg.Logger, nilMetrics)
-    sched.WarmStateCache(st)
+    st := state.NewManager(schedCfg.Logger, nilMetrics, dbstore, dbstore)
+    st.Warm()

     t.Run("instance cache has expected entries", func(t *testing.T) {
         for _, entry := range expectedEntries {
@@ -153,13 +152,13 @@ func TestAlertingTicker(t *testing.T) {
         Logger: log.New("ngalert schedule test"),
         Metrics: metrics.NewMetrics(prometheus.NewRegistry()),
     }
-    sched := schedule.NewScheduler(schedCfg, nil, "http://localhost")
+    st := state.NewManager(schedCfg.Logger, nilMetrics, dbstore, dbstore)
+    sched := schedule.NewScheduler(schedCfg, nil, "http://localhost", st)

     ctx := context.Background()

-    st := state.NewManager(schedCfg.Logger, nilMetrics)
     go func() {
-        err := sched.Ticker(ctx, st)
+        err := sched.Ticker(ctx)
         require.NoError(t, err)
     }()
     runtime.Gosched()
@@ -8,23 +8,30 @@ import (
     "github.com/grafana/grafana/pkg/services/ngalert/eval"
     "github.com/grafana/grafana/pkg/services/ngalert/metrics"
     ngModels "github.com/grafana/grafana/pkg/services/ngalert/models"
+    "github.com/grafana/grafana/pkg/services/ngalert/store"
 )

 type Manager struct {
+    log log.Logger
+    metrics *metrics.Metrics
+
     cache *cache
     quit chan struct{}
     ResendDelay time.Duration
-    Log log.Logger
-    metrics *metrics.Metrics
+
+    ruleStore store.RuleStore
+    instanceStore store.InstanceStore
 }

-func NewManager(logger log.Logger, metrics *metrics.Metrics) *Manager {
+func NewManager(logger log.Logger, metrics *metrics.Metrics, ruleStore store.RuleStore, instanceStore store.InstanceStore) *Manager {
     manager := &Manager{
-        cache: newCache(logger, metrics),
-        quit: make(chan struct{}),
-        ResendDelay: 1 * time.Minute, // TODO: make this configurable
-        Log: logger,
-        metrics: metrics,
+        cache: newCache(logger, metrics),
+        quit: make(chan struct{}),
+        ResendDelay: 1 * time.Minute, // TODO: make this configurable
+        log: logger,
+        metrics: metrics,
+        ruleStore: ruleStore,
+        instanceStore: instanceStore,
     }
     go manager.recordMetrics()
     return manager
@@ -34,6 +41,71 @@ func (st *Manager) Close() {
     st.quit <- struct{}{}
 }

+func (st *Manager) Warm() {
+    st.log.Info("warming cache for startup")
+    st.ResetCache()
+
+    orgIds, err := st.instanceStore.FetchOrgIds()
+    if err != nil {
+        st.log.Error("unable to fetch orgIds", "msg", err.Error())
+    }
+
+    var states []*State
+    for _, orgId := range orgIds {
+        // Get Rules
+        ruleCmd := ngModels.ListAlertRulesQuery{
+            OrgID: orgId,
+        }
+        if err := st.ruleStore.GetOrgAlertRules(&ruleCmd); err != nil {
+            st.log.Error("unable to fetch previous state", "msg", err.Error())
+        }
+
+        ruleByUID := make(map[string]*ngModels.AlertRule, len(ruleCmd.Result))
+        for _, rule := range ruleCmd.Result {
+            ruleByUID[rule.UID] = rule
+        }
+
+        // Get Instances
+        cmd := ngModels.ListAlertInstancesQuery{
+            RuleOrgID: orgId,
+        }
+        if err := st.instanceStore.ListAlertInstances(&cmd); err != nil {
+            st.log.Error("unable to fetch previous state", "msg", err.Error())
+        }
+
+        for _, entry := range cmd.Result {
+            ruleForEntry, ok := ruleByUID[entry.RuleUID]
+            if !ok {
+                st.log.Error("rule not found for instance, ignoring", "rule", entry.RuleUID)
+                continue
+            }
+
+            lbs := map[string]string(entry.Labels)
+            cacheId, err := entry.Labels.StringKey()
+            if err != nil {
+                st.log.Error("error getting cacheId for entry", "msg", err.Error())
+            }
+            stateForEntry := &State{
+                AlertRuleUID: entry.RuleUID,
+                OrgID: entry.RuleOrgID,
+                CacheId: cacheId,
+                Labels: lbs,
+                State: translateInstanceState(entry.CurrentState),
+                Results: []Evaluation{},
+                StartsAt: entry.CurrentStateSince,
+                EndsAt: entry.CurrentStateEnd,
+                LastEvaluationTime: entry.LastEvalTime,
+                Annotations: ruleForEntry.Annotations,
+            }
+            states = append(states, stateForEntry)
+        }
+    }
+
+    for _, s := range states {
+        st.set(s)
+    }
+}
+
 func (st *Manager) getOrCreate(alertRule *ngModels.AlertRule, result eval.Result) *State {
     return st.cache.getOrCreate(alertRule, result)
 }
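The join performed by Warm above (index the org's rules by UID, then walk the stored instances and drop any whose rule has disappeared) is the heart of the warm-up. A stripped-down, self-contained sketch of that join with hypothetical local types (rule, instance and cachedState are illustrative, not the real store or State types):

package main

import (
	"fmt"
	"time"
)

// Hypothetical, simplified stand-ins for the rule, instance and state
// values that Manager.Warm works with.
type rule struct {
	UID         string
	Annotations map[string]string
}

type instance struct {
	RuleUID      string
	CurrentState string
	LastEvalTime time.Time
}

type cachedState struct {
	RuleUID     string
	State       string
	Annotations map[string]string
}

// warm joins stored instances against the rules they belong to,
// dropping instances whose rule no longer exists.
func warm(rules []rule, instances []instance) []cachedState {
	ruleByUID := make(map[string]rule, len(rules))
	for _, r := range rules {
		ruleByUID[r.UID] = r
	}

	var states []cachedState
	for _, in := range instances {
		r, ok := ruleByUID[in.RuleUID]
		if !ok {
			continue // orphaned instance: its rule was deleted
		}
		states = append(states, cachedState{
			RuleUID:     in.RuleUID,
			State:       in.CurrentState,
			Annotations: r.Annotations,
		})
	}
	return states
}

func main() {
	rules := []rule{{UID: "r1", Annotations: map[string]string{"summary": "demo"}}}
	instances := []instance{
		{RuleUID: "r1", CurrentState: "Alerting", LastEvalTime: time.Now()},
		{RuleUID: "gone", CurrentState: "Normal"}, // dropped: no matching rule
	}
	fmt.Printf("%+v\n", warm(rules, instances))
}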
@@ -57,13 +129,13 @@ func (st *Manager) RemoveByRuleUID(orgID int64, ruleUID string) {
 }

 func (st *Manager) ProcessEvalResults(alertRule *ngModels.AlertRule, results eval.Results) []*State {
-    st.Log.Debug("state manager processing evaluation results", "uid", alertRule.UID, "resultCount", len(results))
+    st.log.Debug("state manager processing evaluation results", "uid", alertRule.UID, "resultCount", len(results))
     var states []*State
     for _, result := range results {
         s := st.setNextState(alertRule, result)
         states = append(states, s)
     }
-    st.Log.Debug("returning changed states to scheduler", "count", len(states))
+    st.log.Debug("returning changed states to scheduler", "count", len(states))
     return states
 }

@@ -80,7 +152,7 @@ func (st *Manager) setNextState(alertRule *ngModels.AlertRule, result eval.Resul
     })
     currentState.TrimResults(alertRule)

-    st.Log.Debug("setting alert state", "uid", alertRule.UID)
+    st.log.Debug("setting alert state", "uid", alertRule.UID)
     switch result.State {
     case eval.Normal:
         currentState.resultNormal(result)
@@ -113,10 +185,10 @@ func (st *Manager) recordMetrics() {
     for {
         select {
         case <-ticker.C:
-            st.Log.Info("recording state cache metrics", "now", time.Now())
+            st.log.Info("recording state cache metrics", "now", time.Now())
            st.cache.recordMetrics()
         case <-st.quit:
-            st.Log.Debug("stopping state cache metrics recording", "now", time.Now())
+            st.log.Debug("stopping state cache metrics recording", "now", time.Now())
             ticker.Stop()
             return
         }
@@ -128,3 +200,14 @@ func (st *Manager) Put(states []*State) {
         st.set(s)
     }
 }
+
+func translateInstanceState(state ngModels.InstanceStateType) eval.State {
+    switch {
+    case state == ngModels.InstanceStateFiring:
+        return eval.Alerting
+    case state == ngModels.InstanceStateNormal:
+        return eval.Normal
+    default:
+        return eval.Error
+    }
+}
@@ -826,7 +826,7 @@ func TestProcessEvalResults(t *testing.T) {
     }

     for _, tc := range testCases {
-        st := state.NewManager(log.New("test_state_manager"), nilMetrics)
+        st := state.NewManager(log.New("test_state_manager"), nilMetrics, nil, nil)
         t.Run(tc.desc, func(t *testing.T) {
             for _, res := range tc.evalResults {
                 _ = st.ProcessEvalResults(tc.alertRule, res)