feat(alerting): refactoring

Torkel Ödegaard 2016-07-27 16:29:28 +02:00
parent 6aaf4c97a2
commit 717cce014b
24 changed files with 214 additions and 523 deletions

View File

@@ -84,7 +84,7 @@ func AlertTest(c *middleware.Context, dto dtos.AlertTestCommand) Response {
}
if err := bus.Dispatch(&backendCmd); err != nil {
- if validationErr, ok := err.(alerting.AlertValidationError); ok {
+ if validationErr, ok := err.(alerting.ValidationError); ok {
return ApiError(422, validationErr.Error(), nil)
}
return ApiError(500, "Failed to test rule", err)
@@ -139,42 +139,6 @@ func DelAlert(c *middleware.Context) Response {
return Json(200, resp)
}
// // GET /api/alerts/events/:id
// func GetAlertStates(c *middleware.Context) Response {
// alertId := c.ParamsInt64(":alertId")
//
// query := models.GetAlertsStateQuery{
// AlertId: alertId,
// }
//
// if err := bus.Dispatch(&query); err != nil {
// return ApiError(500, "Failed get alert state log", err)
// }
//
// return Json(200, query.Result)
// }
//
// // PUT /api/alerts/events/:id
// func PutAlertState(c *middleware.Context, cmd models.UpdateAlertStateCommand) Response {
// cmd.AlertId = c.ParamsInt64(":alertId")
// cmd.OrgId = c.OrgId
//
// query := models.GetAlertByIdQuery{Id: cmd.AlertId}
// if err := bus.Dispatch(&query); err != nil {
// return ApiError(500, "Failed to get alertstate", err)
// }
//
// if query.Result.OrgId != 0 && query.Result.OrgId != c.OrgId {
// return ApiError(500, "Alert not found", nil)
// }
//
// if err := bus.Dispatch(&cmd); err != nil {
// return ApiError(500, "Failed to set new state", err)
// }
//
// return Json(200, cmd.Result)
// }
func GetAlertNotifications(c *middleware.Context) Response {
query := &models.GetAlertNotificationsQuery{OrgId: c.OrgId}

View File

@@ -33,17 +33,17 @@ func NewDefaultAlertEvaluator(model *simplejson.Json) (*DefaultAlertEvaluator, e
evaluator.Type = model.Get("type").MustString()
if evaluator.Type == "" {
- return nil, alerting.AlertValidationError{Reason: "Evaluator missing type property"}
+ return nil, alerting.ValidationError{Reason: "Evaluator missing type property"}
}
params := model.Get("params").MustArray()
if len(params) == 0 {
- return nil, alerting.AlertValidationError{Reason: "Evaluator missing threshold parameter"}
+ return nil, alerting.ValidationError{Reason: "Evaluator missing threshold parameter"}
}
threshold, ok := params[0].(json.Number)
if !ok {
- return nil, alerting.AlertValidationError{Reason: "Evaluator has invalid threshold parameter"}
+ return nil, alerting.ValidationError{Reason: "Evaluator has invalid threshold parameter"}
}
evaluator.Threshold, _ = threshold.Float64()

View File

@@ -11,7 +11,7 @@ import (
)
func init() {
- alerting.RegisterCondition("query", func(model *simplejson.Json, index int) (alerting.AlertCondition, error) {
+ alerting.RegisterCondition("query", func(model *simplejson.Json, index int) (alerting.Condition, error) {
return NewQueryCondition(model, index)
})
}
@@ -31,7 +31,7 @@ type AlertQuery struct {
To string
}
- func (c *QueryCondition) Eval(context *alerting.AlertResultContext) {
+ func (c *QueryCondition) Eval(context *alerting.EvalContext) {
seriesList, err := c.executeQuery(context)
if err != nil {
context.Error = err
@@ -43,13 +43,13 @@ func (c *QueryCondition) Eval(context *alerting.AlertResultContext) {
pass := c.Evaluator.Eval(series, reducedValue)
if context.IsTestRun {
- context.Logs = append(context.Logs, &alerting.AlertResultLogEntry{
+ context.Logs = append(context.Logs, &alerting.ResultLogEntry{
Message: fmt.Sprintf("Condition[%d]: Eval: %v, Metric: %s, Value: %1.3f", c.Index, pass, series.Name, reducedValue),
})
}
if pass {
- context.Events = append(context.Events, &alerting.AlertEvent{
+ context.Events = append(context.Events, &alerting.Event{
Metric: series.Name,
Value: reducedValue,
})
@@ -59,7 +59,7 @@ func (c *QueryCondition) Eval(context *alerting.AlertResultContext) {
}
}
- func (c *QueryCondition) executeQuery(context *alerting.AlertResultContext) (tsdb.TimeSeriesSlice, error) {
+ func (c *QueryCondition) executeQuery(context *alerting.EvalContext) (tsdb.TimeSeriesSlice, error) {
getDsInfo := &m.GetDataSourceByIdQuery{
Id: c.Query.DatasourceId,
OrgId: context.Rule.OrgId,
@@ -85,7 +85,7 @@ func (c *QueryCondition) executeQuery(context *alerting.AlertResultContext) (tsd
result = append(result, v.Series...)
if context.IsTestRun {
- context.Logs = append(context.Logs, &alerting.AlertResultLogEntry{
+ context.Logs = append(context.Logs, &alerting.ResultLogEntry{
Message: fmt.Sprintf("Condition[%d]: Query Result", c.Index),
Data: v.Series,
})

View File

@@ -63,7 +63,7 @@ type queryConditionTestContext struct {
reducer string
evaluator string
series tsdb.TimeSeriesSlice
- result *alerting.AlertResultContext
+ result *alerting.EvalContext
condition *QueryCondition
}
@@ -107,8 +107,8 @@ func queryConditionScenario(desc string, fn queryConditionScenarioFunc) {
})
ctx := &queryConditionTestContext{}
- ctx.result = &alerting.AlertResultContext{
+ ctx.result = &alerting.EvalContext{
- Rule: &alerting.AlertRule{},
+ Rule: &alerting.Rule{},
}
fn(ctx)

View File

@@ -8,27 +8,27 @@ import (
)
type Engine struct {
- execQueue chan *AlertJob
+ execQueue chan *Job
- resultQueue chan *AlertResultContext
+ resultQueue chan *EvalContext
clock clock.Clock
ticker *Ticker
scheduler Scheduler
- handler AlertHandler
+ evalHandler EvalHandler
ruleReader RuleReader
log log.Logger
- responseHandler ResultHandler
+ resultHandler ResultHandler
}
func NewEngine() *Engine {
e := &Engine{
ticker: NewTicker(time.Now(), time.Second*0, clock.New()),
- execQueue: make(chan *AlertJob, 1000),
+ execQueue: make(chan *Job, 1000),
- resultQueue: make(chan *AlertResultContext, 1000),
+ resultQueue: make(chan *EvalContext, 1000),
scheduler: NewScheduler(),
- handler: NewHandler(),
+ evalHandler: NewEvalHandler(),
ruleReader: NewRuleReader(),
log: log.New("alerting.engine"),
- responseHandler: NewResultHandler(),
+ resultHandler: NewResultHandler(),
}
return e
@@ -39,7 +39,7 @@ func (e *Engine) Start() {
go e.alertingTicker()
go e.execDispatch()
- go e.resultHandler()
+ go e.resultDispatch()
}
func (e *Engine) Stop() {
@@ -77,7 +77,7 @@ func (e *Engine) execDispatch() {
}
}
- func (e *Engine) executeJob(job *AlertJob) {
+ func (e *Engine) executeJob(job *Job) {
defer func() {
if err := recover(); err != nil {
e.log.Error("Execute Alert Panic", "error", err, "stack", log.Stack(1))
@@ -85,14 +85,14 @@ func (e *Engine) executeJob(job *AlertJob) {
}()
job.Running = true
- context := NewAlertResultContext(job.Rule)
+ context := NewEvalContext(job.Rule)
- e.handler.Execute(context)
+ e.evalHandler.Eval(context)
job.Running = false
e.resultQueue <- context
}
- func (e *Engine) resultHandler() {
+ func (e *Engine) resultDispatch() {
defer func() {
if err := recover(); err != nil {
e.log.Error("Engine Panic, stopping resultHandler", "error", err, "stack", log.Stack(1))
@@ -105,7 +105,7 @@ func (e *Engine) resultHandler() {
if result.Error != nil {
e.log.Error("Alert Rule Result Error", "ruleId", result.Rule.Id, "error", result.Error, "retry")
} else {
- e.responseHandler.Handle(result)
+ e.resultHandler.Handle(result)
}
}
}

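The engine now moves work through two buffered channels: the scheduler pushes *Job values onto execQueue, executeJob wraps each rule in an *EvalContext, and resultDispatch drains resultQueue into the ResultHandler. A self-contained toy sketch of that two-queue shape, using plain strings instead of the Grafana types so it compiles on its own:

package main

import (
	"fmt"
	"time"
)

func main() {
	execQueue := make(chan string, 1000)   // stands in for chan *Job
	resultQueue := make(chan string, 1000) // stands in for chan *EvalContext

	// stands in for execDispatch -> executeJob
	go func() {
		for job := range execQueue {
			resultQueue <- "evaluated " + job
		}
	}()

	// stands in for resultDispatch -> resultHandler.Handle
	go func() {
		for result := range resultQueue {
			fmt.Println(result)
		}
	}()

	execQueue <- "rule 1"
	time.Sleep(100 * time.Millisecond) // crude wait, good enough for a toy example
}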
View File

@@ -0,0 +1,59 @@
package alerting
import (
"fmt"
"time"
"github.com/grafana/grafana/pkg/log"
)
var (
descriptionFmt = "Actual value: %1.2f for %s. "
)
type DefaultEvalHandler struct {
log log.Logger
alertJobTimeout time.Duration
}
func NewEvalHandler() *DefaultEvalHandler {
return &DefaultEvalHandler{
log: log.New("alerting.handler"),
alertJobTimeout: time.Second * 5,
}
}
func (e *DefaultEvalHandler) Eval(context *EvalContext) {
go e.eval(context)
select {
case <-time.After(e.alertJobTimeout):
context.Error = fmt.Errorf("Timeout")
context.EndTime = time.Now()
e.log.Debug("Job Execution timeout", "alertId", context.Rule.Id)
case <-context.DoneChan:
e.log.Debug("Job Execution done", "timeMs", context.GetDurationMs(), "alertId", context.Rule.Id, "firing", context.Firing)
}
}
func (e *DefaultEvalHandler) eval(context *EvalContext) {
for _, condition := range context.Rule.Conditions {
condition.Eval(context)
// break if condition could not be evaluated
if context.Error != nil {
break
}
// break if result has not triggered yet
if context.Firing == false {
break
}
}
context.EndTime = time.Now()
context.DoneChan <- true
}

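Two details of the new handler worth noting: conditions are effectively AND-ed, since the loop stops at the first condition that errors or is not firing, and the whole evaluation races a five-second timeout through select. A self-contained sketch of that timeout pattern, with generic names rather than the Grafana types:

package main

import (
	"fmt"
	"time"
)

// evalWithTimeout runs work in a goroutine and gives up after timeout,
// mirroring the select over time.After and DoneChan in Eval above.
func evalWithTimeout(work func(), timeout time.Duration) error {
	done := make(chan bool, 1)

	go func() {
		work()
		done <- true // mirrors context.DoneChan <- true at the end of eval()
	}()

	select {
	case <-time.After(timeout):
		return fmt.Errorf("Timeout")
	case <-done:
		return nil
	}
}

func main() {
	err := evalWithTimeout(func() { time.Sleep(10 * time.Millisecond) }, 5*time.Second)
	fmt.Println("eval finished, err =", err)
}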
View File

@@ -0,0 +1,45 @@
package alerting
import (
"testing"
. "github.com/smartystreets/goconvey/convey"
)
type conditionStub struct {
firing bool
}
func (c *conditionStub) Eval(context *EvalContext) {
context.Firing = c.firing
}
func TestAlertingExecutor(t *testing.T) {
Convey("Test alert execution", t, func() {
handler := NewEvalHandler()
Convey("Show return triggered with single passing condition", func() {
context := NewEvalContext(&Rule{
Conditions: []Condition{&conditionStub{
firing: true,
}},
})
handler.eval(context)
So(context.Firing, ShouldEqual, true)
})
Convey("Show return false with not passing condition", func() {
context := NewEvalContext(&Rule{
Conditions: []Condition{
&conditionStub{firing: true},
&conditionStub{firing: false},
},
})
handler.eval(context)
So(context.Firing, ShouldEqual, false)
})
})
}

View File

@@ -94,7 +94,7 @@ func (e *DashAlertExtractor) GetAlerts() ([]*m.Alert, error) {
}
if !alert.Severity.IsValid() {
- return nil, AlertValidationError{Reason: "Invalid alert Severity"}
+ return nil, ValidationError{Reason: "Invalid alert Severity"}
}
for _, condition := range jsonAlert.Get("conditions").MustArray() {
@@ -105,7 +105,7 @@ func (e *DashAlertExtractor) GetAlerts() ([]*m.Alert, error) {
panelQuery := findPanelQueryByRefId(panel, queryRefId)
if panelQuery == nil {
- return nil, AlertValidationError{Reason: "Alert refes to query that cannot be found"}
+ return nil, ValidationError{Reason: "Alert refes to query that cannot be found"}
}
dsName := ""
@@ -127,7 +127,7 @@ func (e *DashAlertExtractor) GetAlerts() ([]*m.Alert, error) {
alert.Settings = jsonAlert
// validate
- _, err := NewAlertRuleFromDBModel(alert)
+ _, err := NewRuleFromDBAlert(alert)
if err == nil && alert.ValidToSave() {
alerts = append(alerts, alert)
} else {

View File

@@ -13,7 +13,7 @@ func TestAlertRuleExtraction(t *testing.T) {
Convey("Parsing alert rules from dashboard json", t, func() {
- RegisterCondition("query", func(model *simplejson.Json, index int) (AlertCondition, error) {
+ RegisterCondition("query", func(model *simplejson.Json, index int) (Condition, error) {
return &FakeCondition{}, nil
})

View File

@@ -1,159 +0,0 @@
package alerting
import (
"fmt"
"time"
"github.com/grafana/grafana/pkg/log"
)
var (
descriptionFmt = "Actual value: %1.2f for %s. "
)
type HandlerImpl struct {
log log.Logger
alertJobTimeout time.Duration
}
func NewHandler() *HandlerImpl {
return &HandlerImpl{
log: log.New("alerting.handler"),
alertJobTimeout: time.Second * 5,
}
}
func (e *HandlerImpl) Execute(context *AlertResultContext) {
go e.eval(context)
select {
case <-time.After(e.alertJobTimeout):
context.Error = fmt.Errorf("Timeout")
context.EndTime = time.Now()
e.log.Debug("Job Execution timeout", "alertId", context.Rule.Id)
case <-context.DoneChan:
e.log.Debug("Job Execution done", "timeMs", context.GetDurationMs(), "alertId", context.Rule.Id, "firing", context.Firing)
}
}
func (e *HandlerImpl) eval(context *AlertResultContext) {
for _, condition := range context.Rule.Conditions {
condition.Eval(context)
// break if condition could not be evaluated
if context.Error != nil {
break
}
// break if result has not triggered yet
if context.Firing == false {
break
}
}
context.EndTime = time.Now()
context.DoneChan <- true
}
// func (e *HandlerImpl) executeQuery(job *AlertJob) (tsdb.TimeSeriesSlice, error) {
// getDsInfo := &m.GetDataSourceByIdQuery{
// Id: job.Rule.Query.DatasourceId,
// OrgId: job.Rule.OrgId,
// }
//
// if err := bus.Dispatch(getDsInfo); err != nil {
// return nil, fmt.Errorf("Could not find datasource")
// }
//
// req := e.GetRequestForAlertRule(job.Rule, getDsInfo.Result)
// result := make(tsdb.TimeSeriesSlice, 0)
//
// resp, err := tsdb.HandleRequest(req)
// if err != nil {
// return nil, fmt.Errorf("Alerting: GetSeries() tsdb.HandleRequest() error %v", err)
// }
//
// for _, v := range resp.Results {
// if v.Error != nil {
// return nil, fmt.Errorf("Alerting: GetSeries() tsdb.HandleRequest() response error %v", v)
// }
//
// result = append(result, v.Series...)
// }
//
// return result, nil
// }
//
// func (e *HandlerImpl) GetRequestForAlertRule(rule *AlertRule, datasource *m.DataSource) *tsdb.Request {
// e.log.Debug("GetRequest", "query", rule.Query.Query, "from", rule.Query.From, "datasourceId", datasource.Id)
// req := &tsdb.Request{
// TimeRange: tsdb.TimeRange{
// From: "-" + rule.Query.From,
// To: rule.Query.To,
// },
// Queries: []*tsdb.Query{
// {
// RefId: "A",
// Query: rule.Query.Query,
// DataSource: &tsdb.DataSourceInfo{
// Id: datasource.Id,
// Name: datasource.Name,
// PluginId: datasource.Type,
// Url: datasource.Url,
// },
// },
// },
// }
//
// return req
// }
//
// func (e *HandlerImpl) evaluateRule(rule *AlertRule, series tsdb.TimeSeriesSlice) *AlertResult {
// e.log.Debug("Evaluating Alerting Rule", "seriesCount", len(series), "ruleName", rule.Name)
//
// triggeredAlert := make([]*TriggeredAlert, 0)
//
// for _, serie := range series {
// e.log.Debug("Evaluating series", "series", serie.Name)
// transformedValue, _ := rule.Transformer.Transform(serie)
//
// critResult := evalCondition(rule.Critical, transformedValue)
// condition2 := fmt.Sprintf("%v %s %v ", transformedValue, rule.Critical.Operator, rule.Critical.Value)
// e.log.Debug("Alert execution Crit", "name", serie.Name, "condition", condition2, "result", critResult)
// if critResult {
// triggeredAlert = append(triggeredAlert, &TriggeredAlert{
// State: alertstates.Critical,
// Value: transformedValue,
// Metric: serie.Name,
// })
// continue
// }
//
// warnResult := evalCondition(rule.Warning, transformedValue)
// condition := fmt.Sprintf("%v %s %v ", transformedValue, rule.Warning.Operator, rule.Warning.Value)
// e.log.Debug("Alert execution Warn", "name", serie.Name, "condition", condition, "result", warnResult)
// if warnResult {
// triggeredAlert = append(triggeredAlert, &TriggeredAlert{
// State: alertstates.Warn,
// Value: transformedValue,
// Metric: serie.Name,
// })
// }
// }
//
// executionState := alertstates.Ok
// for _, raised := range triggeredAlert {
// if raised.State == alertstates.Critical {
// executionState = alertstates.Critical
// }
//
// if executionState != alertstates.Critical && raised.State == alertstates.Warn {
// executionState = alertstates.Warn
// }
// }
//
// return &AlertResult{State: executionState, TriggeredAlerts: triggeredAlert}
// }

View File

@@ -1,164 +0,0 @@
package alerting
import (
"testing"
. "github.com/smartystreets/goconvey/convey"
)
type conditionStub struct {
firing bool
}
func (c *conditionStub) Eval(context *AlertResultContext) {
context.Firing = c.firing
}
func TestAlertingExecutor(t *testing.T) {
Convey("Test alert execution", t, func() {
handler := NewHandler()
Convey("Show return triggered with single passing condition", func() {
context := NewAlertResultContext(&AlertRule{
Conditions: []AlertCondition{&conditionStub{
firing: true,
}},
})
handler.eval(context)
So(context.Firing, ShouldEqual, true)
})
Convey("Show return false with not passing condition", func() {
context := NewAlertResultContext(&AlertRule{
Conditions: []AlertCondition{
&conditionStub{firing: true},
&conditionStub{firing: false},
},
})
handler.eval(context)
So(context.Firing, ShouldEqual, false)
})
// Convey("Show return critical since below 2", func() {
// rule := &AlertRule{
// Critical: Level{Value: 10, Operator: "<"},
// Transformer: transformers.NewAggregationTransformer("avg"),
// }
//
// timeSeries := []*tsdb.TimeSeries{
// tsdb.NewTimeSeries("test1", [][2]float64{{2, 0}}),
// }
//
// result := executor.evaluateRule(rule, timeSeries)
// So(result.State, ShouldEqual, alertstates.Critical)
// })
//
// Convey("Show return critical since sum is above 10", func() {
// rule := &AlertRule{
// Critical: Level{Value: 10, Operator: ">"},
// Transformer: transformers.NewAggregationTransformer("sum"),
// }
//
// timeSeries := []*tsdb.TimeSeries{
// tsdb.NewTimeSeries("test1", [][2]float64{{9, 0}, {9, 0}}),
// }
//
// result := executor.evaluateRule(rule, timeSeries)
// So(result.State, ShouldEqual, alertstates.Critical)
// })
//
// Convey("Show return ok since avg is below 10", func() {
// rule := &AlertRule{
// Critical: Level{Value: 10, Operator: ">"},
// Transformer: transformers.NewAggregationTransformer("avg"),
// }
//
// timeSeries := []*tsdb.TimeSeries{
// tsdb.NewTimeSeries("test1", [][2]float64{{9, 0}, {9, 0}}),
// }
//
// result := executor.evaluateRule(rule, timeSeries)
// So(result.State, ShouldEqual, alertstates.Ok)
// })
//
// Convey("Show return ok since min is below 10", func() {
// rule := &AlertRule{
// Critical: Level{Value: 10, Operator: ">"},
// Transformer: transformers.NewAggregationTransformer("avg"),
// }
//
// timeSeries := []*tsdb.TimeSeries{
// tsdb.NewTimeSeries("test1", [][2]float64{{11, 0}, {9, 0}}),
// }
//
// result := executor.evaluateRule(rule, timeSeries)
// So(result.State, ShouldEqual, alertstates.Ok)
// })
//
// Convey("Show return ok since max is above 10", func() {
// rule := &AlertRule{
// Critical: Level{Value: 10, Operator: ">"},
// Transformer: transformers.NewAggregationTransformer("max"),
// }
//
// timeSeries := []*tsdb.TimeSeries{
// tsdb.NewTimeSeries("test1", [][2]float64{{6, 0}, {11, 0}}),
// }
//
// result := executor.evaluateRule(rule, timeSeries)
// So(result.State, ShouldEqual, alertstates.Critical)
// })
//
// })
//
// Convey("muliple time series", func() {
// Convey("both are ok", func() {
// rule := &AlertRule{
// Critical: Level{Value: 10, Operator: ">"},
// Transformer: transformers.NewAggregationTransformer("avg"),
// }
//
// timeSeries := []*tsdb.TimeSeries{
// tsdb.NewTimeSeries("test1", [][2]float64{{2, 0}}),
// tsdb.NewTimeSeries("test1", [][2]float64{{2, 0}}),
// }
//
// result := executor.evaluateRule(rule, timeSeries)
// So(result.State, ShouldEqual, alertstates.Ok)
// })
//
// Convey("first serie is good, second is critical", func() {
// rule := &AlertRule{
// Critical: Level{Value: 10, Operator: ">"},
// Transformer: transformers.NewAggregationTransformer("avg"),
// }
//
// timeSeries := []*tsdb.TimeSeries{
// tsdb.NewTimeSeries("test1", [][2]float64{{2, 0}}),
// tsdb.NewTimeSeries("test1", [][2]float64{{11, 0}}),
// }
//
// result := executor.evaluateRule(rule, timeSeries)
// So(result.State, ShouldEqual, alertstates.Critical)
// })
//
// Convey("first serie is warn, second is critical", func() {
// rule := &AlertRule{
// Critical: Level{Value: 10, Operator: ">"},
// Warning: Level{Value: 5, Operator: ">"},
// Transformer: transformers.NewAggregationTransformer("avg"),
// }
//
// timeSeries := []*tsdb.TimeSeries{
// tsdb.NewTimeSeries("test1", [][2]float64{{6, 0}}),
// tsdb.NewTimeSeries("test1", [][2]float64{{11, 0}}),
// }
//
// result := executor.evaluateRule(rule, timeSeries)
// So(result.State, ShouldEqual, alertstates.Critical)
// })
// })
})
}

View File

@@ -2,20 +2,20 @@ package alerting
import "time"
- type AlertHandler interface {
+ type EvalHandler interface {
- Execute(context *AlertResultContext)
+ Eval(context *EvalContext)
}
type Scheduler interface {
- Tick(time time.Time, execQueue chan *AlertJob)
+ Tick(time time.Time, execQueue chan *Job)
- Update(rules []*AlertRule)
+ Update(rules []*Rule)
}
type Notifier interface {
- Notify(alertResult *AlertResultContext)
+ Notify(alertResult *EvalContext)
GetType() string
}
- type AlertCondition interface {
+ type Condition interface {
- Eval(result *AlertResultContext)
+ Eval(result *EvalContext)
}

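These four interfaces are now the package's extension points. A cheap way to document which concrete types in this commit satisfy them is a set of compile-time interface assertions; the following sketch should compile inside the alerting package given the types defined elsewhere in this diff, but it is not part of the commit itself:

// Compile-time checks (illustrative only).
var (
	_ EvalHandler = (*DefaultEvalHandler)(nil) // eval handler above
	_ Scheduler   = (*SchedulerImpl)(nil)      // scheduler below
	_ Notifier    = (*RootNotifier)(nil)       // root notifier below
)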
View File

@@ -6,50 +6,50 @@ import (
"github.com/grafana/grafana/pkg/log"
)
- type AlertJob struct {
+ type Job struct {
Offset int64
Delay bool
Running bool
- Rule *AlertRule
+ Rule *Rule
}
- type AlertResultContext struct {
+ type EvalContext struct {
Firing bool
IsTestRun bool
- Events []*AlertEvent
+ Events []*Event
- Logs []*AlertResultLogEntry
+ Logs []*ResultLogEntry
Error error
Description string
StartTime time.Time
EndTime time.Time
- Rule *AlertRule
+ Rule *Rule
DoneChan chan bool
CancelChan chan bool
log log.Logger
}
- func (a *AlertResultContext) GetDurationMs() float64 {
+ func (a *EvalContext) GetDurationMs() float64 {
return float64(a.EndTime.Nanosecond()-a.StartTime.Nanosecond()) / float64(1000000)
}
- func NewAlertResultContext(rule *AlertRule) *AlertResultContext {
+ func NewEvalContext(rule *Rule) *EvalContext {
- return &AlertResultContext{
+ return &EvalContext{
StartTime: time.Now(),
Rule: rule,
- Logs: make([]*AlertResultLogEntry, 0),
+ Logs: make([]*ResultLogEntry, 0),
- Events: make([]*AlertEvent, 0),
+ Events: make([]*Event, 0),
DoneChan: make(chan bool, 1),
CancelChan: make(chan bool, 1),
log: log.New("alerting.engine"),
}
}
- type AlertResultLogEntry struct {
+ type ResultLogEntry struct {
Message string
Data interface{}
}
- type AlertEvent struct {
+ type Event struct {
Value float64
Metric string
State string

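For reference, this is roughly how the engine code earlier in the commit consumes the renamed context: executeJob builds an EvalContext per Job, hands it to the EvalHandler, and pushes it onto the result queue. A condensed sketch, written as if inside the alerting package; the helper name sketchRunRule is illustrative, not part of the commit:

// Illustrative only: mirrors Engine.executeJob using the types above.
func sketchRunRule(rule *Rule, handler EvalHandler, results chan<- *EvalContext) {
	context := NewEvalContext(rule) // sets StartTime, Logs, Events and the done/cancel channels
	handler.Eval(context)           // returns once DoneChan fires or the timeout hits
	results <- context              // resultDispatch hands this to the ResultHandler
}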
View File

@@ -22,7 +22,7 @@ func (n *RootNotifier) GetType() string {
return "root"
}
- func (n *RootNotifier) Notify(context *AlertResultContext) {
+ func (n *RootNotifier) Notify(context *EvalContext) {
n.log.Info("Sending notifications for", "ruleId", context.Rule.Id)
notifiers, err := n.getNotifiers(context.Rule.OrgId, context.Rule.Notifications)
@@ -63,62 +63,8 @@ func (n *RootNotifier) getNotifierFor(model *m.AlertNotification) (Notifier, err
}
return factory(model)
// if model.Type == "email" {
// addressesString := model.Settings.Get("addresses").MustString()
//
// if addressesString == "" {
// return nil, fmt.Errorf("Could not find addresses in settings")
// }
//
// NotifierBase: NotifierBase{
// Name: model.Name,
// Type: model.Type,
// },
// Addresses: strings.Split(addressesString, "\n"),
// log: log.New("alerting.notification.email"),
// }, nil
// }
// url := settings.Get("url").MustString()
// if url == "" {
// return nil, fmt.Errorf("Could not find url propertie in settings")
// }
//
// return &WebhookNotifier{
// Url: url,
// User: settings.Get("user").MustString(),
// Password: settings.Get("password").MustString(),
// log: log.New("alerting.notification.webhook"),
// }, nil
} }
// type WebhookNotifier struct {
// Url string
// User string
// Password string
// log log.Logger
// }
//
// func (this *WebhookNotifier) Dispatch(context *AlertResultContext) {
// this.log.Info("Sending webhook")
//
// bodyJSON := simplejson.New()
// bodyJSON.Set("name", context.AlertJob.Rule.Name)
// bodyJSON.Set("state", context.State)
// bodyJSON.Set("trigged", context.TriggeredAlerts)
//
// body, _ := bodyJSON.MarshalJSON()
//
// cmd := &m.SendWebhook{
// Url: this.Url,
// User: this.User,
// Password: this.Password,
// Body: string(body),
// }
//
// bus.Dispatch(cmd)
// }
type NotifierFactory func(notification *m.AlertNotification) (Notifier, error)
var notifierFactories map[string]NotifierFactory = make(map[string]NotifierFactory)

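getNotifierFor looks a notification's type up in notifierFactories and calls the matching NotifierFactory. As an illustration of the shape a factory for a new notifier type would take, here is a hedged sketch written as if it lived in the alerting package; the "log" notifier type is hypothetical, and the call that registers the factory into notifierFactories is not visible in this hunk, so it is omitted:

// Hypothetical "log" notifier, used only to illustrate the factory shape.
type logOnlyNotifier struct {
	name string
	log  log.Logger
}

func (n *logOnlyNotifier) GetType() string { return "log" }

func (n *logOnlyNotifier) Notify(context *EvalContext) {
	n.log.Info("Alert notification (log only)", "ruleId", context.Rule.Id)
}

func newLogOnlyNotifier(model *m.AlertNotification) (Notifier, error) {
	if model.Name == "" {
		return nil, ValidationError{Reason: "Notification is missing a name"}
	}
	return &logOnlyNotifier{name: model.Name, log: log.New("alerting.notification.log")}, nil
}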
View File

@@ -9,7 +9,7 @@ import (
"github.com/grafana/grafana/pkg/setting"
)
- func getRuleLink(rule *alerting.AlertRule) (string, error) {
+ func getRuleLink(rule *alerting.Rule) (string, error) {
slugQuery := &m.GetDashboardSlugByIdQuery{Id: rule.DashboardId}
if err := bus.Dispatch(slugQuery); err != nil {
return "", err

View File

@@ -23,7 +23,7 @@ func NewEmailNotifier(model *m.AlertNotification) (alerting.Notifier, error) {
addressesString := model.Settings.Get("addresses").MustString()
if addressesString == "" {
- return nil, alerting.AlertValidationError{Reason: "Could not find addresses in settings"}
+ return nil, alerting.ValidationError{Reason: "Could not find addresses in settings"}
}
return &EmailNotifier{
@@ -36,7 +36,7 @@ func NewEmailNotifier(model *m.AlertNotification) (alerting.Notifier, error) {
}, nil
}
- func (this *EmailNotifier) Notify(context *alerting.AlertResultContext) {
+ func (this *EmailNotifier) Notify(context *alerting.EvalContext) {
this.log.Info("Sending alert notification to", "addresses", this.Addresses)
ruleLink, err := getRuleLink(context.Rule)

View File

@@ -17,7 +17,7 @@ func init() {
func NewSlackNotifier(model *m.AlertNotification) (alerting.Notifier, error) {
url := model.Settings.Get("url").MustString()
if url == "" {
- return nil, alerting.AlertValidationError{Reason: "Could not find url property in settings"}
+ return nil, alerting.ValidationError{Reason: "Could not find url property in settings"}
}
return &SlackNotifier{
@@ -36,7 +36,7 @@ type SlackNotifier struct {
log log.Logger
}
- func (this *SlackNotifier) Notify(context *alerting.AlertResultContext) {
+ func (this *SlackNotifier) Notify(context *alerting.EvalContext) {
this.log.Info("Executing slack notification", "ruleId", context.Rule.Id, "notification", this.Name)
rule := context.Rule

View File

@@ -15,7 +15,7 @@ func init() {
func NewWebHookNotifier(model *m.AlertNotification) (alerting.Notifier, error) {
url := model.Settings.Get("url").MustString()
if url == "" {
- return nil, alerting.AlertValidationError{Reason: "Could not find url property in settings"}
+ return nil, alerting.ValidationError{Reason: "Could not find url property in settings"}
}
return &WebhookNotifier{
@@ -38,7 +38,7 @@ type WebhookNotifier struct {
log log.Logger
}
- func (this *WebhookNotifier) Notify(context *alerting.AlertResultContext) {
+ func (this *WebhookNotifier) Notify(context *alerting.EvalContext) {
this.log.Info("Sending webhook")
bodyJSON := simplejson.New()

View File

@@ -10,10 +10,10 @@ import (
)
type RuleReader interface {
- Fetch() []*AlertRule
+ Fetch() []*Rule
}
- type AlertRuleReader struct {
+ type DefaultRuleReader struct {
sync.RWMutex
serverID string
serverPosition int
@@ -21,8 +21,8 @@ type AlertRuleReader struct {
log log.Logger
}
- func NewRuleReader() *AlertRuleReader {
+ func NewRuleReader() *DefaultRuleReader {
- ruleReader := &AlertRuleReader{
+ ruleReader := &DefaultRuleReader{
log: log.New("alerting.ruleReader"),
}
@@ -30,7 +30,7 @@ func NewRuleReader() *AlertRuleReader {
return ruleReader
}
- func (arr *AlertRuleReader) initReader() {
+ func (arr *DefaultRuleReader) initReader() {
heartbeat := time.NewTicker(time.Second * 10)
for {
@@ -41,17 +41,17 @@ func (arr *AlertRuleReader) initReader() {
}
}
- func (arr *AlertRuleReader) Fetch() []*AlertRule {
+ func (arr *DefaultRuleReader) Fetch() []*Rule {
cmd := &m.GetAllAlertsQuery{}
if err := bus.Dispatch(cmd); err != nil {
arr.log.Error("Could not load alerts", "error", err)
- return []*AlertRule{}
+ return []*Rule{}
}
- res := make([]*AlertRule, 0)
+ res := make([]*Rule, 0)
for _, ruleDef := range cmd.Result {
- if model, err := NewAlertRuleFromDBModel(ruleDef); err != nil {
+ if model, err := NewRuleFromDBAlert(ruleDef); err != nil {
arr.log.Error("Could not build alert model for rule", "ruleId", ruleDef.Id, "error", err)
} else {
res = append(res, model)
@@ -61,7 +61,7 @@ func (arr *AlertRuleReader) Fetch() []*AlertRule {
return res
}
- func (arr *AlertRuleReader) heartbeat() {
+ func (arr *DefaultRuleReader) heartbeat() {
//Lets cheat on this until we focus on clustering
//log.Info("Heartbeat: Sending heartbeat from " + this.serverId)

View File

@@ -7,22 +7,22 @@ import (
)
type ResultHandler interface {
- Handle(result *AlertResultContext)
+ Handle(result *EvalContext)
}
- type ResultHandlerImpl struct {
+ type DefaultResultHandler struct {
notifier Notifier
log log.Logger
}
- func NewResultHandler() *ResultHandlerImpl {
+ func NewResultHandler() *DefaultResultHandler {
- return &ResultHandlerImpl{
+ return &DefaultResultHandler{
log: log.New("alerting.resultHandler"),
notifier: NewRootNotifier(),
}
}
- func (handler *ResultHandlerImpl) Handle(result *AlertResultContext) {
+ func (handler *DefaultResultHandler) Handle(result *EvalContext) {
var newState m.AlertStateType
if result.Error != nil {

View File

@@ -10,7 +10,7 @@ import (
m "github.com/grafana/grafana/pkg/models"
)
- type AlertRule struct {
+ type Rule struct {
Id int64
OrgId int64
DashboardId int64
@@ -20,15 +20,15 @@ type AlertRule struct {
Description string
State m.AlertStateType
Severity m.AlertSeverityType
- Conditions []AlertCondition
+ Conditions []Condition
Notifications []int64
}
- type AlertValidationError struct {
+ type ValidationError struct {
Reason string
}
- func (e AlertValidationError) Error() string {
+ func (e ValidationError) Error() string {
return e.Reason
}
@@ -56,8 +56,8 @@ func getTimeDurationStringToSeconds(str string) int64 {
return int64(value * multiplier)
}
- func NewAlertRuleFromDBModel(ruleDef *m.Alert) (*AlertRule, error) {
+ func NewRuleFromDBAlert(ruleDef *m.Alert) (*Rule, error) {
- model := &AlertRule{}
+ model := &Rule{}
model.Id = ruleDef.Id
model.OrgId = ruleDef.OrgId
model.DashboardId = ruleDef.DashboardId
@@ -71,7 +71,7 @@ func NewAlertRuleFromDBModel(ruleDef *m.Alert) (*AlertRule, error) {
for _, v := range ruleDef.Settings.Get("notifications").MustArray() {
jsonModel := simplejson.NewFromAny(v)
if id, err := jsonModel.Get("id").Int64(); err != nil {
- return nil, AlertValidationError{Reason: "Invalid notification schema"}
+ return nil, ValidationError{Reason: "Invalid notification schema"}
} else {
model.Notifications = append(model.Notifications, id)
}
@@ -81,7 +81,7 @@ func NewAlertRuleFromDBModel(ruleDef *m.Alert) (*AlertRule, error) {
conditionModel := simplejson.NewFromAny(condition)
conditionType := conditionModel.Get("type").MustString()
if factory, exist := conditionFactories[conditionType]; !exist {
- return nil, AlertValidationError{Reason: "Unknown alert condition: " + conditionType}
+ return nil, ValidationError{Reason: "Unknown alert condition: " + conditionType}
} else {
if queryCondition, err := factory(conditionModel, index); err != nil {
return nil, err
@@ -98,7 +98,7 @@ func NewAlertRuleFromDBModel(ruleDef *m.Alert) (*AlertRule, error) {
return model, nil
}
- type ConditionFactory func(model *simplejson.Json, index int) (AlertCondition, error)
+ type ConditionFactory func(model *simplejson.Json, index int) (Condition, error)
var conditionFactories map[string]ConditionFactory = make(map[string]ConditionFactory)

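NewRuleFromDBAlert resolves each condition's "type" through conditionFactories, which is populated via RegisterCondition, exactly as the query condition's init() and the test files in this commit do. A hedged sketch of registering a hypothetical "static" condition type from inside the package:

// Illustrative only: a condition type that always reports firing,
// registered the same way the real query condition registers itself.
type staticCondition struct{}

func (c *staticCondition) Eval(context *EvalContext) {
	context.Firing = true
}

func init() {
	RegisterCondition("static", func(model *simplejson.Json, index int) (Condition, error) {
		return &staticCondition{}, nil
	})
}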
View File

@@ -10,12 +10,12 @@ import (
type FakeCondition struct{}
- func (f *FakeCondition) Eval(context *AlertResultContext) {}
+ func (f *FakeCondition) Eval(context *EvalContext) {}
func TestAlertRuleModel(t *testing.T) {
Convey("Testing alert rule", t, func() {
- RegisterCondition("test", func(model *simplejson.Json, index int) (AlertCondition, error) {
+ RegisterCondition("test", func(model *simplejson.Json, index int) (Condition, error) {
return &FakeCondition{}, nil
})
@@ -72,7 +72,7 @@ func TestAlertRuleModel(t *testing.T) {
Settings: alertJSON,
}
- alertRule, err := NewAlertRuleFromDBModel(alert)
+ alertRule, err := NewRuleFromDBAlert(alert)
So(err, ShouldBeNil)
So(alertRule.Conditions, ShouldHaveLength, 1)

View File

@@ -7,28 +7,28 @@ import (
)
type SchedulerImpl struct {
- jobs map[int64]*AlertJob
+ jobs map[int64]*Job
log log.Logger
}
func NewScheduler() Scheduler {
return &SchedulerImpl{
- jobs: make(map[int64]*AlertJob, 0),
+ jobs: make(map[int64]*Job, 0),
log: log.New("alerting.scheduler"),
}
}
- func (s *SchedulerImpl) Update(alerts []*AlertRule) {
+ func (s *SchedulerImpl) Update(rules []*Rule) {
- s.log.Debug("Scheduling update", "alerts.count", len(alerts))
+ s.log.Debug("Scheduling update", "rules.count", len(rules))
- jobs := make(map[int64]*AlertJob, 0)
+ jobs := make(map[int64]*Job, 0)
- for i, rule := range alerts {
+ for i, rule := range rules {
- var job *AlertJob
+ var job *Job
if s.jobs[rule.Id] != nil {
job = s.jobs[rule.Id]
} else {
- job = &AlertJob{
+ job = &Job{
Running: false,
}
}
@@ -42,7 +42,7 @@ func (s *SchedulerImpl) Update(alerts []*AlertRule) {
s.jobs = jobs
}
- func (s *SchedulerImpl) Tick(tickTime time.Time, execQueue chan *AlertJob) {
+ func (s *SchedulerImpl) Tick(tickTime time.Time, execQueue chan *Job) {
now := tickTime.Unix()
for _, job := range s.jobs {

View File

@@ -13,7 +13,7 @@ type AlertTestCommand struct {
PanelId int64
OrgId int64
- Result *AlertResultContext
+ Result *EvalContext
}
func init() {
@@ -32,7 +32,7 @@ func handleAlertTestCommand(cmd *AlertTestCommand) error {
for _, alert := range alerts {
if alert.PanelId == cmd.PanelId {
- rule, err := NewAlertRuleFromDBModel(alert)
+ rule, err := NewRuleFromDBAlert(alert)
if err != nil {
return err
}
@@ -45,13 +45,13 @@ func handleAlertTestCommand(cmd *AlertTestCommand) error {
return fmt.Errorf("Could not find alert with panel id %d", cmd.PanelId)
}
- func testAlertRule(rule *AlertRule) *AlertResultContext {
+ func testAlertRule(rule *Rule) *EvalContext {
- handler := NewHandler()
+ handler := NewEvalHandler()
- context := NewAlertResultContext(rule)
+ context := NewEvalContext(rule)
context.IsTestRun = true
- handler.Execute(context)
+ handler.Eval(context)
return context
}