// grafana/pkg/services/alerting/engine.go

package alerting
import (
"time"
"github.com/benbjohnson/clock"
"github.com/grafana/grafana/pkg/log"
)
// Engine drives alert evaluation. It holds the queues that connect the
// goroutines launched by Start: the scheduling loop feeds execQueue, the
// executor drains it, and the result loop drains resultQueue.
type Engine struct {
	// Work queues.
	execQueue   chan *AlertJob
	resultQueue chan *AlertResultContext

	// Timing.
	clock  clock.Clock // NOTE(review): NewEngine never sets this field; confirm whether anything reads it.
	ticker *Ticker

	// Collaborators.
	scheduler  Scheduler
	handler    AlertHandler
	ruleReader RuleReader
	log        log.Logger

	responseHandler ResultHandler
}
// NewEngine builds an Engine wired to its default collaborators. The exec and
// result queues are buffered to 1000 entries each. The ticker is created from
// the current time, a zero duration and a real clock. (That the duration is an
// offset is an assumption; confirm against NewTicker.)
// NewEngine itself starts no goroutines; call Start for that.
func NewEngine() *Engine {
	// Keyed elements are kept in the original order so the constructor calls
	// run in the same sequence as before.
	return &Engine{
		ticker:          NewTicker(time.Now(), time.Second*0, clock.New()),
		execQueue:       make(chan *AlertJob, 1000),
		resultQueue:     make(chan *AlertResultContext, 1000),
		scheduler:       NewScheduler(),
		handler:         NewHandler(),
		ruleReader:      NewRuleReader(),
		log:             log.New("alerting.engine"),
		responseHandler: NewResultHandler(),
	}
}
func (e *Engine) Start() {
e.log.Info("Starting Alerting Engine")
2016-06-06 07:24:14 -05:00
go e.alertingTicker()
go e.execDispatch()
go e.resultHandler()
}
// Stop closes the engine's two work queues. The range loops in execDispatch
// and resultHandler drain any buffered items and then return.
//
// NOTE(review): alertingTicker is not stopped here and keeps passing execQueue
// to scheduler.Tick. If Tick sends on the now-closed channel, that send panics.
// The panic is recovered and logged by alertingTicker. Calling Stop twice
// panics on the second close. A done channel plus sync.Once would make
// shutdown clean.
func (e *Engine) Stop() {
// execQueue is closed first, matching the order the pipeline is fed.
close(e.execQueue)
close(e.resultQueue)
}
2016-06-06 07:24:14 -05:00
func (e *Engine) alertingTicker() {
defer func() {
if err := recover(); err != nil {
2016-06-22 06:43:11 -05:00
e.log.Error("Scheduler Panic: stopping alertingTicker", "error", err, "stack", log.Stack(1))
}
}()
tickIndex := 0
for {
select {
case tick := <-e.ticker.C:
// TEMP SOLUTION update rules ever tenth tick
if tickIndex%10 == 0 {
e.scheduler.Update(e.ruleReader.Fetch())
}
e.scheduler.Tick(tick, e.execQueue)
2016-06-06 07:24:14 -05:00
tickIndex++
}
}
}
func (e *Engine) execDispatch() {
2016-06-22 06:43:11 -05:00
defer func() {
if err := recover(); err != nil {
e.log.Error("Scheduler Panic: stopping executor", "error", err, "stack", log.Stack(1))
}
}()
for job := range e.execQueue {
log.Trace("Alerting: engine:execDispatch() starting job %s", job.Rule.Name)
e.executeJob(job)
}
}
// executeJob runs one alert job synchronously. It builds a fresh result
// context for the job's rule and passes it to the alert handler.
//
// job.Running is true for the duration of the call. The reset is deferred so
// the flag is cleared even if building the context or executing the handler
// panics. Otherwise a recovered panic would leave the job marked as running.
//
// NOTE(review): the result context is not forwarded to resultQueue here.
// Confirm whether results reach resultHandler by some other path.
func (e *Engine) executeJob(job *AlertJob) {
	job.Running = true
	defer func() { job.Running = false }()

	resultCtx := NewAlertResultContext(job.Rule)
	e.handler.Execute(resultCtx)
}
func (e *Engine) resultHandler() {
2016-06-22 06:43:11 -05:00
defer func() {
if err := recover(); err != nil {
e.log.Error("Engine Panic, stopping resultHandler", "error", err, "stack", log.Stack(1))
}
}()
for result := range e.resultQueue {
e.log.Debug("Alert Rule Result", "ruleId", result.Rule.Id, "triggered", result.Triggered)
if result.Error != nil {
e.log.Error("Alert Rule Result Error", "ruleId", result.Rule.Id, "error", result.Error, "retry")
} else {
e.responseHandler.Handle(result)
}
}
}