From 0ce55600bb69aa5e490bc0c6d7f8ef11ee3ee2f8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Torkel=20=C3=96degaard?= Date: Wed, 20 Jul 2016 14:28:02 +0200 Subject: [PATCH] feat(alerting): progress on alerting backend --- pkg/services/alerting/conditions.go | 105 +++++++++++++++++++++-- pkg/services/alerting/conditions_test.go | 26 +++++- pkg/services/alerting/interfaces.go | 10 ++- pkg/services/alerting/models.go | 8 +- pkg/tsdb/graphite/graphite.go | 4 +- pkg/tsdb/request.go | 2 + 6 files changed, 138 insertions(+), 17 deletions(-) diff --git a/pkg/services/alerting/conditions.go b/pkg/services/alerting/conditions.go index 82b27de0da0..e4818508e6c 100644 --- a/pkg/services/alerting/conditions.go +++ b/pkg/services/alerting/conditions.go @@ -3,25 +3,97 @@ package alerting import ( "encoding/json" "errors" + "fmt" + "github.com/grafana/grafana/pkg/bus" "github.com/grafana/grafana/pkg/components/simplejson" + m "github.com/grafana/grafana/pkg/models" + "github.com/grafana/grafana/pkg/tsdb" ) type QueryCondition struct { - Query AlertQuery - Reducer QueryReducer - Evaluator AlertEvaluator + Query AlertQuery + Reducer QueryReducer + Evaluator AlertEvaluator + HandleRequest tsdb.HandleRequestFunc } func (c *QueryCondition) Eval(context *AlertResultContext) { + seriesList, err := c.executeQuery(context) + if err != nil { + context.Error = err + return + } + + for _, series := range seriesList { + reducedValue := c.Reducer.Reduce(series) + pass := c.Evaluator.Eval(series, reducedValue) + if pass { + context.Triggered = true + break + } + } +} + +func (c *QueryCondition) executeQuery(context *AlertResultContext) (tsdb.TimeSeriesSlice, error) { + getDsInfo := &m.GetDataSourceByIdQuery{ + Id: c.Query.DatasourceId, + OrgId: context.Rule.OrgId, + } + + if err := bus.Dispatch(getDsInfo); err != nil { + return nil, fmt.Errorf("Could not find datasource") + } + + req := c.getRequestForAlertRule(getDsInfo.Result) + result := make(tsdb.TimeSeriesSlice, 0) + + resp, err := c.HandleRequest(req) + if err != nil { + return nil, fmt.Errorf("Alerting: GetSeries() tsdb.HandleRequest() error %v", err) + } + + for _, v := range resp.Results { + if v.Error != nil { + return nil, fmt.Errorf("Alerting: GetSeries() tsdb.HandleRequest() response error %v", v) + } + + result = append(result, v.Series...) + } + + return result, nil +} + +func (c *QueryCondition) getRequestForAlertRule(datasource *m.DataSource) *tsdb.Request { + req := &tsdb.Request{ + TimeRange: tsdb.TimeRange{ + From: c.Query.From, + To: c.Query.To, + }, + Queries: []*tsdb.Query{ + { + RefId: "A", + Query: c.Query.Model.Get("target").MustString(), + DataSource: &tsdb.DataSourceInfo{ + Id: datasource.Id, + Name: datasource.Name, + PluginId: datasource.Type, + Url: datasource.Url, + }, + }, + }, + } + + return req } func NewQueryCondition(model *simplejson.Json) (*QueryCondition, error) { condition := QueryCondition{} + condition.HandleRequest = tsdb.HandleRequest queryJson := model.Get("query") - condition.Query.Query = queryJson.Get("query").MustString() + condition.Query.Model = queryJson.Get("model") condition.Query.From = queryJson.Get("params").MustArray()[1].(string) condition.Query.To = queryJson.Get("params").MustArray()[2].(string) condition.Query.DatasourceId = queryJson.Get("datasourceId").MustInt64() @@ -43,8 +115,18 @@ type SimpleReducer struct { Type string } -func (s *SimpleReducer) Reduce() float64 { - return 0 +func (s *SimpleReducer) Reduce(series *tsdb.TimeSeries) float64 { + var value float64 = 0 + + switch s.Type { + case "avg": + for _, point := range series.Points { + value += point[0] + } + value = value / float64(len(series.Points)) + } + + return value } func NewSimpleReducer(typ string) *SimpleReducer { @@ -56,8 +138,15 @@ type DefaultAlertEvaluator struct { Threshold float64 } -func (e *DefaultAlertEvaluator) Eval() bool { - return true +func (e *DefaultAlertEvaluator) Eval(series *tsdb.TimeSeries, reducedValue float64) bool { + switch e.Type { + case ">": + return reducedValue > e.Threshold + case "<": + return reducedValue < e.Threshold + } + + return false } func NewDefaultAlertEvaluator(model *simplejson.Json) (*DefaultAlertEvaluator, error) { diff --git a/pkg/services/alerting/conditions_test.go b/pkg/services/alerting/conditions_test.go index 90645270881..bbeb16597c6 100644 --- a/pkg/services/alerting/conditions_test.go +++ b/pkg/services/alerting/conditions_test.go @@ -3,7 +3,10 @@ package alerting import ( "testing" + "github.com/grafana/grafana/pkg/bus" "github.com/grafana/grafana/pkg/components/simplejson" + m "github.com/grafana/grafana/pkg/models" + "github.com/grafana/grafana/pkg/tsdb" . "github.com/smartystreets/goconvey/convey" ) @@ -11,6 +14,11 @@ func TestQueryCondition(t *testing.T) { Convey("when evaluating query condition", t, func() { + bus.AddHandler("test", func(query *m.GetDataSourceByIdQuery) error { + query.Result = &m.DataSource{Id: 1, Type: "graphite"} + return nil + }) + Convey("Given avg() and > 100", func() { jsonModel, err := simplejson.NewJson([]byte(`{ @@ -29,9 +37,25 @@ func TestQueryCondition(t *testing.T) { So(err, ShouldBeNil) Convey("Should set result to triggered when avg is above 100", func() { - context := &AlertResultContext{} + context := &AlertResultContext{ + Rule: &AlertRule{}, + } + + condition.HandleRequest = func(req *tsdb.Request) (*tsdb.Response, error) { + return &tsdb.Response{ + Results: map[string]*tsdb.QueryResult{ + "A": &tsdb.QueryResult{ + Series: tsdb.TimeSeriesSlice{ + tsdb.NewTimeSeries("test1", [][2]float64{{120, 0}}), + }, + }, + }, + }, nil + } + condition.Eval(context) + So(context.Error, ShouldBeNil) So(context.Triggered, ShouldBeTrue) }) }) diff --git a/pkg/services/alerting/interfaces.go b/pkg/services/alerting/interfaces.go index 9fd374675e8..bb86a766427 100644 --- a/pkg/services/alerting/interfaces.go +++ b/pkg/services/alerting/interfaces.go @@ -1,6 +1,10 @@ package alerting -import "time" +import ( + "time" + + "github.com/grafana/grafana/pkg/tsdb" +) type AlertHandler interface { Execute(rule *AlertRule, resultChan chan *AlertResultContext) @@ -20,9 +24,9 @@ type AlertCondition interface { } type QueryReducer interface { - Reduce() float64 + Reduce(timeSeries *tsdb.TimeSeries) float64 } type AlertEvaluator interface { - Eval() bool + Eval(timeSeries *tsdb.TimeSeries, reducedValue float64) bool } diff --git a/pkg/services/alerting/models.go b/pkg/services/alerting/models.go index 80976d88475..58c025ffeb4 100644 --- a/pkg/services/alerting/models.go +++ b/pkg/services/alerting/models.go @@ -1,6 +1,10 @@ package alerting -import "time" +import ( + "time" + + "github.com/grafana/grafana/pkg/components/simplejson" +) type AlertJob struct { Offset int64 @@ -45,7 +49,7 @@ type Level struct { } type AlertQuery struct { - Query string + Model *simplejson.Json DatasourceId int64 From string To string diff --git a/pkg/tsdb/graphite/graphite.go b/pkg/tsdb/graphite/graphite.go index 98f4c648048..b8f3e16a362 100644 --- a/pkg/tsdb/graphite/graphite.go +++ b/pkg/tsdb/graphite/graphite.go @@ -38,9 +38,7 @@ func (e *GraphiteExecutor) Execute(queries tsdb.QuerySlice, context *tsdb.QueryC } for _, query := range queries { - params["target"] = []string{ - query.Query, - } + params["target"] = []string{query.Query} } client := http.Client{Timeout: time.Duration(10 * time.Second)} diff --git a/pkg/tsdb/request.go b/pkg/tsdb/request.go index 2c96a3ff3ce..2e5e5eec25a 100644 --- a/pkg/tsdb/request.go +++ b/pkg/tsdb/request.go @@ -1,5 +1,7 @@ package tsdb +type HandleRequestFunc func(req *Request) (*Response, error) + func HandleRequest(req *Request) (*Response, error) { context := NewQueryContext(req.Queries, req.TimeRange)