diff --git a/pkg/services/alerting/dashboard_parser.go b/pkg/services/alerting/dashboard_parser.go index 88001435581..c3c9b9b643b 100644 --- a/pkg/services/alerting/dashboard_parser.go +++ b/pkg/services/alerting/dashboard_parser.go @@ -108,6 +108,12 @@ func ConvetAlertModelToAlertRule(ruleDef *m.AlertRuleModel) (*AlertRule, error) model.Transform = ruleDef.Expression.Get("transform").Get("type").MustString() model.TransformParams = *ruleDef.Expression.Get("transform") + if model.Transform == "aggregation" { + model.Transformer = &AggregationTransformer{ + Method: ruleDef.Expression.Get("transform").Get("method").MustString(), + } + } + query := ruleDef.Expression.Get("query") model.Query = AlertQuery{ Query: query.Get("query").MustString(), diff --git a/pkg/services/alerting/evaluator.go b/pkg/services/alerting/evaluator.go new file mode 100644 index 00000000000..efa7231b435 --- /dev/null +++ b/pkg/services/alerting/evaluator.go @@ -0,0 +1,15 @@ +package alerting + +type compareFn func(float64, float64) bool + +func evalCondition(level Level, result float64) bool { + return operators[level.Operator](result, level.Level) +} + +var operators = map[string]compareFn{ + ">": func(num1, num2 float64) bool { return num1 > num2 }, + ">=": func(num1, num2 float64) bool { return num1 >= num2 }, + "<": func(num1, num2 float64) bool { return num1 < num2 }, + "<=": func(num1, num2 float64) bool { return num1 <= num2 }, + "": func(num1, num2 float64) bool { return false }, +} diff --git a/pkg/services/alerting/executor.go b/pkg/services/alerting/executor.go index cfdc18edb2c..1520bc494c0 100644 --- a/pkg/services/alerting/executor.go +++ b/pkg/services/alerting/executor.go @@ -3,8 +3,6 @@ package alerting import ( "fmt" - "math" - "github.com/grafana/grafana/pkg/bus" "github.com/grafana/grafana/pkg/log" m "github.com/grafana/grafana/pkg/models" @@ -26,63 +24,6 @@ func NewExecutor() *ExecutorImpl { } } -type compareFn func(float64, float64) bool -type aggregationFn func(*tsdb.TimeSeries) float64 - -var operators = map[string]compareFn{ - ">": func(num1, num2 float64) bool { return num1 > num2 }, - ">=": func(num1, num2 float64) bool { return num1 >= num2 }, - "<": func(num1, num2 float64) bool { return num1 < num2 }, - "<=": func(num1, num2 float64) bool { return num1 <= num2 }, - "": func(num1, num2 float64) bool { return false }, -} -var aggregator = map[string]aggregationFn{ - "avg": func(series *tsdb.TimeSeries) float64 { - sum := float64(0) - - for _, v := range series.Points { - sum += v[0] - } - - return sum / float64(len(series.Points)) - }, - "sum": func(series *tsdb.TimeSeries) float64 { - sum := float64(0) - - for _, v := range series.Points { - sum += v[0] - } - - return sum - }, - "min": func(series *tsdb.TimeSeries) float64 { - min := series.Points[0][0] - - for _, v := range series.Points { - if v[0] < min { - min = v[0] - } - } - - return min - }, - "max": func(series *tsdb.TimeSeries) float64 { - max := series.Points[0][0] - - for _, v := range series.Points { - if v[0] > max { - max = v[0] - } - } - - return max - }, - "mean": func(series *tsdb.TimeSeries) float64 { - midPosition := int64(math.Floor(float64(len(series.Points)) / float64(2))) - return series.Points[midPosition][0] - }, -} - func (e *ExecutorImpl) Execute(job *AlertJob, resultQueue chan *AlertResult) { timeSeries, err := e.executeQuery(job) if err != nil { @@ -156,32 +97,25 @@ func (e *ExecutorImpl) evaluateRule(rule *AlertRule, series tsdb.TimeSeriesSlice for _, serie := range series { e.log.Debug("Evaluating series", "series", serie.Name) + transformedValue, _ := rule.Transformer.Transform(serie) - if aggregator["avg"] == nil { - continue - } - - var aggValue = aggregator["avg"](serie) - var critOperartor = operators[rule.Critical.Operator] - var critResult = critOperartor(aggValue, rule.Critical.Level) - - e.log.Debug("Alert execution Crit", "name", serie.Name, "aggValue", aggValue, "operator", rule.Critical.Operator, "level", rule.Critical.Level, "result", critResult) + critResult := evalCondition(rule.Critical, transformedValue) + e.log.Debug("Alert execution Crit", "name", serie.Name, "transformedValue", transformedValue, "operator", rule.Critical.Operator, "level", rule.Critical.Level, "result", critResult) if critResult { return &AlertResult{ State: alertstates.Critical, - ActualValue: aggValue, - Description: fmt.Sprintf(descriptionFmt, aggValue, serie.Name), + ActualValue: transformedValue, + Description: fmt.Sprintf(descriptionFmt, transformedValue, serie.Name), } } - var warnOperartor = operators[rule.Warning.Operator] - var warnResult = warnOperartor(aggValue, rule.Warning.Level) - e.log.Debug("Alert execution Warn", "name", serie.Name, "aggValue", aggValue, "operator", rule.Warning.Operator, "level", rule.Warning.Level, "result", warnResult) + warnResult := evalCondition(rule.Warning, transformedValue) + e.log.Debug("Alert execution Warn", "name", serie.Name, "transformedValue", transformedValue, "operator", rule.Warning.Operator, "level", rule.Warning.Level, "result", warnResult) if warnResult { return &AlertResult{ State: alertstates.Warn, - Description: fmt.Sprintf(descriptionFmt, aggValue, serie.Name), - ActualValue: aggValue, + Description: fmt.Sprintf(descriptionFmt, transformedValue, serie.Name), + ActualValue: transformedValue, } } } diff --git a/pkg/services/alerting/executor_test.go b/pkg/services/alerting/executor_test.go index 4da75bdcc16..70074bbcf60 100644 --- a/pkg/services/alerting/executor_test.go +++ b/pkg/services/alerting/executor_test.go @@ -14,7 +14,10 @@ func TestAlertingExecutor(t *testing.T) { Convey("single time serie", func() { Convey("Show return ok since avg is above 2", func() { - rule := &AlertRule{Critical: Level{Level: 10, Operator: ">"}} + rule := &AlertRule{ + Critical: Level{Level: 10, Operator: ">"}, + Transformer: &AggregationTransformer{Method: "avg"}, + } timeSeries := []*tsdb.TimeSeries{ tsdb.NewTimeSeries("test1", [][2]float64{{2, 0}}), @@ -25,7 +28,10 @@ func TestAlertingExecutor(t *testing.T) { }) Convey("Show return critical since below 2", func() { - rule := &AlertRule{Critical: Level{Level: 10, Operator: "<"}} + rule := &AlertRule{ + Critical: Level{Level: 10, Operator: "<"}, + Transformer: &AggregationTransformer{Method: "avg"}, + } timeSeries := []*tsdb.TimeSeries{ tsdb.NewTimeSeries("test1", [][2]float64{{2, 0}}), @@ -49,7 +55,10 @@ func TestAlertingExecutor(t *testing.T) { */ Convey("Show return ok since avg is below 10", func() { - rule := &AlertRule{Critical: Level{Level: 10, Operator: ">"}} + rule := &AlertRule{ + Critical: Level{Level: 10, Operator: ">"}, + Transformer: &AggregationTransformer{Method: "avg"}, + } timeSeries := []*tsdb.TimeSeries{ tsdb.NewTimeSeries("test1", [][2]float64{{9, 0}, {9, 0}}), @@ -60,7 +69,10 @@ func TestAlertingExecutor(t *testing.T) { }) Convey("Show return ok since min is below 10", func() { - rule := &AlertRule{Critical: Level{Level: 10, Operator: ">"}} + rule := &AlertRule{ + Critical: Level{Level: 10, Operator: ">"}, + Transformer: &AggregationTransformer{Method: "avg"}, + } timeSeries := []*tsdb.TimeSeries{ tsdb.NewTimeSeries("test1", [][2]float64{{11, 0}, {9, 0}}), @@ -85,7 +97,10 @@ func TestAlertingExecutor(t *testing.T) { Convey("muliple time series", func() { Convey("both are ok", func() { - rule := &AlertRule{Critical: Level{Level: 10, Operator: ">"}} + rule := &AlertRule{ + Critical: Level{Level: 10, Operator: ">"}, + Transformer: &AggregationTransformer{Method: "avg"}, + } timeSeries := []*tsdb.TimeSeries{ tsdb.NewTimeSeries("test1", [][2]float64{{2, 0}}), @@ -97,7 +112,10 @@ func TestAlertingExecutor(t *testing.T) { }) Convey("first serie is good, second is critical", func() { - rule := &AlertRule{Critical: Level{Level: 10, Operator: ">"}} + rule := &AlertRule{ + Critical: Level{Level: 10, Operator: ">"}, + Transformer: &AggregationTransformer{Method: "avg"}, + } timeSeries := []*tsdb.TimeSeries{ tsdb.NewTimeSeries("test1", [][2]float64{{2, 0}}), diff --git a/pkg/services/alerting/models.go b/pkg/services/alerting/models.go index 4f3796d69ec..c13669ea3d7 100644 --- a/pkg/services/alerting/models.go +++ b/pkg/services/alerting/models.go @@ -2,7 +2,6 @@ package alerting import ( "github.com/grafana/grafana/pkg/components/simplejson" - "github.com/grafana/grafana/pkg/tsdb" ) type AlertJob struct { @@ -36,10 +35,7 @@ type AlertRule struct { Query AlertQuery Transform string TransformParams simplejson.Json -} - -type Transformer interface { - Transform(tsdb tsdb.TimeSeriesSlice) float64 + Transformer Transformer } type Level struct { diff --git a/pkg/services/alerting/transformer.go b/pkg/services/alerting/transformer.go new file mode 100644 index 00000000000..1f574e6fce6 --- /dev/null +++ b/pkg/services/alerting/transformer.go @@ -0,0 +1,73 @@ +package alerting + +import ( + "fmt" + "math" + + "github.com/grafana/grafana/pkg/tsdb" +) + +type Transformer interface { + Transform(timeserie *tsdb.TimeSeries) (float64, error) +} + +type AggregationTransformer struct { + Method string +} + +func (at *AggregationTransformer) Transform(timeserie *tsdb.TimeSeries) (float64, error) { + + if at.Method == "avg" { + sum := float64(0) + for _, point := range timeserie.Points { + sum += point[0] + } + + return sum / float64(len(timeserie.Points)), nil + } + + //"sum": func(series *tsdb.TimeSeries) float64 { + if at.Method == "sum" { + sum := float64(0) + + for _, v := range timeserie.Points { + sum += v[0] + } + + return sum, nil + } + + //"min": func(series *tsdb.TimeSeries) float64 { + if at.Method == "min" { + min := timeserie.Points[0][0] + + for _, v := range timeserie.Points { + if v[0] < min { + min = v[0] + } + } + + return min, nil + } + + //"max": func(series *tsdb.TimeSeries) float64 { + if at.Method == "max" { + max := timeserie.Points[0][0] + + for _, v := range timeserie.Points { + if v[0] > max { + max = v[0] + } + } + + return max, nil + } + + //"mean": func(series *tsdb.TimeSeries) float64 { + if at.Method == "mean" { + midPosition := int64(math.Floor(float64(len(timeserie.Points)) / float64(2))) + return timeserie.Points[midPosition][0], nil + } + + return float64(0), fmt.Errorf("Missing method") +}