tech(goroutines): sync state between different go routines

This commit is contained in:
bergquist
2016-09-27 16:38:19 +02:00
parent 8e89173095
commit 34b31aeef8
9 changed files with 221 additions and 41 deletions

View File

@@ -1,10 +1,12 @@
package alerting
import (
"context"
"time"
"github.com/benbjohnson/clock"
"github.com/grafana/grafana/pkg/log"
"golang.org/x/sync/errgroup"
)
type Engine struct {
@@ -34,12 +36,16 @@ func NewEngine() *Engine {
return e
}
func (e *Engine) Start() {
func (e *Engine) Start(grafanaCtx context.Context) error {
e.log.Info("Starting Alerting Engine")
go e.alertingTicker()
go e.execDispatcher()
go e.resultDispatcher()
g, _ := errgroup.WithContext(grafanaCtx)
g.Go(func() error { return e.alertingTicker(grafanaCtx) })
g.Go(func() error { return e.execDispatcher(grafanaCtx) })
g.Go(func() error { return e.resultDispatcher(grafanaCtx) })
return g.Wait()
}
func (e *Engine) Stop() {
@@ -47,7 +53,7 @@ func (e *Engine) Stop() {
close(e.resultQueue)
}
func (e *Engine) alertingTicker() {
func (e *Engine) alertingTicker(grafanaCtx context.Context) error {
defer func() {
if err := recover(); err != nil {
e.log.Error("Scheduler Panic: stopping alertingTicker", "error", err, "stack", log.Stack(1))
@@ -58,6 +64,8 @@ func (e *Engine) alertingTicker() {
for {
select {
case <-grafanaCtx.Done():
return grafanaCtx.Err()
case tick := <-e.ticker.C:
// TEMP SOLUTION update rules ever tenth tick
if tickIndex%10 == 0 {
@@ -70,31 +78,56 @@ func (e *Engine) alertingTicker() {
}
}
func (e *Engine) execDispatcher() {
for job := range e.execQueue {
e.log.Debug("Starting executing alert rule", "alert id", job.Rule.Id)
go e.executeJob(job)
func (e *Engine) execDispatcher(grafanaCtx context.Context) error {
for {
select {
case <-grafanaCtx.Done():
close(e.resultQueue)
return grafanaCtx.Err()
case job := <-e.execQueue:
go e.executeJob(grafanaCtx, job)
}
}
}
func (e *Engine) executeJob(job *Job) {
func (e *Engine) executeJob(grafanaCtx context.Context, job *Job) {
defer func() {
if err := recover(); err != nil {
e.log.Error("Execute Alert Panic", "error", err, "stack", log.Stack(1))
}
}()
job.Running = true
context := NewEvalContext(job.Rule)
e.evalHandler.Eval(context)
job.Running = false
done := make(chan *EvalContext, 1)
go func() {
job.Running = true
context := NewEvalContext(job.Rule)
e.evalHandler.Eval(context)
job.Running = false
done <- context
close(done)
}()
e.resultQueue <- context
select {
case evalContext := <-done:
e.resultQueue <- evalContext
case <-grafanaCtx.Done():
}
}
func (e *Engine) resultDispatcher() {
for result := range e.resultQueue {
go e.handleResponse(result)
func (e *Engine) resultDispatcher(grafanaCtx context.Context) error {
for {
select {
case <-grafanaCtx.Done():
//handle all responses before shutting down.
for result := range e.resultQueue {
e.handleResponse(result)
}
return grafanaCtx.Err()
case result := <-e.resultQueue:
e.handleResponse(result)
}
}
}

View File

@@ -1,6 +1,8 @@
package init
import (
"context"
"github.com/grafana/grafana/pkg/services/alerting"
_ "github.com/grafana/grafana/pkg/services/alerting/conditions"
_ "github.com/grafana/grafana/pkg/services/alerting/notifiers"
@@ -11,11 +13,11 @@ import (
var engine *alerting.Engine
func Init() {
func Init(ctx context.Context) error {
if !setting.AlertingEnabled {
return
return nil
}
engine = alerting.NewEngine()
engine.Start()
return engine.Start(ctx)
}