grafana/pkg/services/alerting/engine.go

147 lines
3.1 KiB
Go
Raw Normal View History

package alerting
import (
"context"
"time"
"github.com/benbjohnson/clock"
"github.com/grafana/grafana/pkg/log"
"golang.org/x/sync/errgroup"
)
type Engine struct {
2016-07-27 09:29:28 -05:00
execQueue chan *Job
resultQueue chan *EvalContext
clock clock.Clock
ticker *Ticker
scheduler Scheduler
evalHandler EvalHandler
ruleReader RuleReader
log log.Logger
resultHandler ResultHandler
}
func NewEngine() *Engine {
e := &Engine{
2016-07-27 09:29:28 -05:00
ticker: NewTicker(time.Now(), time.Second*0, clock.New()),
execQueue: make(chan *Job, 1000),
resultQueue: make(chan *EvalContext, 1000),
scheduler: NewScheduler(),
evalHandler: NewEvalHandler(),
ruleReader: NewRuleReader(),
log: log.New("alerting.engine"),
resultHandler: NewResultHandler(),
}
return e
}
// Start runs the engine's three loops (alertingTicker, execDispatcher and
// resultDispatcher) in an errgroup derived from grafanaCtx and blocks until
// all of them have returned. It returns the first non-nil error reported by
// any loop, as provided by errgroup's Wait.
func (e *Engine) Start(grafanaCtx context.Context) error {
	e.log.Info("Starting Alerting Engine")

	group, ctx := errgroup.WithContext(grafanaCtx)

	// The loops are launched in this order, each with the group's context.
	loops := []func(context.Context) error{
		e.alertingTicker,
		e.execDispatcher,
		e.resultDispatcher,
	}
	for _, loop := range loops {
		loop := loop // per-iteration copy for toolchains before Go 1.22
		group.Go(func() error { return loop(ctx) })
	}

	return group.Wait()
}
// Stop closes the job queue and then the result queue.
//
// NOTE(review): execDispatcher also closes the result queue when its context
// is cancelled. Closing an already-closed channel panics, so Stop and a
// context-driven shutdown should not both run; confirm how callers use this.
func (e *Engine) Stop() {
	jobs, results := e.execQueue, e.resultQueue

	close(jobs)
	close(results)
}
func (e *Engine) alertingTicker(grafanaCtx context.Context) error {
defer func() {
if err := recover(); err != nil {
2016-06-22 06:43:11 -05:00
e.log.Error("Scheduler Panic: stopping alertingTicker", "error", err, "stack", log.Stack(1))
}
}()
tickIndex := 0
for {
select {
case <-grafanaCtx.Done():
return grafanaCtx.Err()
case tick := <-e.ticker.C:
// TEMP SOLUTION update rules ever tenth tick
if tickIndex%10 == 0 {
e.scheduler.Update(e.ruleReader.Fetch())
}
e.scheduler.Tick(tick, e.execQueue)
2016-06-06 07:24:14 -05:00
tickIndex++
}
}
}
// execDispatcher takes jobs off execQueue and runs each one in its own
// goroutine via executeJob.
//
// When grafanaCtx is cancelled it closes resultQueue, which lets
// resultDispatcher finish draining, and returns grafanaCtx.Err().
//
// If execQueue itself has been closed (Stop does this) the dispatcher returns
// instead of spinning on the closed channel and spawning a worker for every
// nil job it would otherwise receive. resultQueue is deliberately not closed
// on that path: Stop closes it as well, and closing it twice would panic.
func (e *Engine) execDispatcher(grafanaCtx context.Context) error {
	for {
		select {
		case <-grafanaCtx.Done():
			close(e.resultQueue)
			return grafanaCtx.Err()
		case job, ok := <-e.execQueue:
			if !ok {
				// Queue closed: no more jobs will ever arrive.
				return grafanaCtx.Err()
			}
			go e.executeJob(grafanaCtx, job)
		}
	}
}
// executeJob evaluates job's rule in a separate goroutine and forwards the
// resulting EvalContext to resultQueue.
//
// It returns grafanaCtx.Err() if the context is cancelled before the
// evaluation finishes, and nil otherwise. The evaluation goroutine is not
// interrupted by cancellation; its result is simply no longer awaited.
//
// Panics are recovered and logged in both goroutines. recover only works in
// the goroutine that panicked, so the evaluation goroutine needs its own
// handler; the outer one covers the send to resultQueue, which panics if
// execDispatcher has already closed that queue during shutdown.
func (e *Engine) executeJob(grafanaCtx context.Context, job *Job) error {
	defer func() {
		if err := recover(); err != nil {
			e.log.Error("Execute Alert Panic", "error", err, "stack", log.Stack(1))
		}
	}()

	done := make(chan *EvalContext, 1)
	go func() {
		// Runs last: always release the waiter below, even after a panic.
		defer close(done)
		defer func() {
			if err := recover(); err != nil {
				e.log.Error("Execute Alert Panic", "error", err, "stack", log.Stack(1))
				// Do not leave the job marked as running forever.
				if job != nil {
					job.Running = false
				}
			}
		}()

		job.Running = true
		evalCtx := NewEvalContext(job.Rule)
		e.evalHandler.Eval(evalCtx)
		job.Running = false
		done <- evalCtx
	}()

	select {
	case <-grafanaCtx.Done():
		return grafanaCtx.Err()
	case evalCtx, ok := <-done:
		if !ok {
			// done was closed without a value: the evaluation panicked and
			// there is no result to forward.
			return nil
		}
		e.resultQueue <- evalCtx
	}
	return nil
}
// resultDispatcher passes every EvalContext arriving on resultQueue to
// handleResponse.
//
// When grafanaCtx is cancelled it keeps handling results until resultQueue is
// closed (execDispatcher closes it on cancellation) and then returns
// grafanaCtx.Err().
//
// If resultQueue is found closed and empty on the normal receive path, the
// dispatcher returns grafanaCtx.Err() right away. Without that check a closed
// queue yields an endless stream of nil results, each of which would panic
// inside handleResponse.
func (e *Engine) resultDispatcher(grafanaCtx context.Context) error {
	for {
		select {
		case <-grafanaCtx.Done():
			// Handle all responses before shutting down.
			for result := range e.resultQueue {
				e.handleResponse(result)
			}
			return grafanaCtx.Err()
		case result, ok := <-e.resultQueue:
			if !ok {
				// Closed and fully drained: nothing left to handle.
				return grafanaCtx.Err()
			}
			e.handleResponse(result)
		}
	}
}
func (e *Engine) handleResponse(result *EvalContext) {
2016-06-22 06:43:11 -05:00
defer func() {
if err := recover(); err != nil {
e.log.Error("Panic in resultDispatcher", "error", err, "stack", log.Stack(1))
2016-06-22 06:43:11 -05:00
}
}()
e.log.Debug("Alert Rule Result", "ruleId", result.Rule.Id, "firing", result.Firing)
e.resultHandler.Handle(result)
}