mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
feat(alerting): implement transform objects
This commit is contained in:
parent
c18017381b
commit
83c422e6ef
@ -108,6 +108,12 @@ func ConvetAlertModelToAlertRule(ruleDef *m.AlertRuleModel) (*AlertRule, error)
|
|||||||
model.Transform = ruleDef.Expression.Get("transform").Get("type").MustString()
|
model.Transform = ruleDef.Expression.Get("transform").Get("type").MustString()
|
||||||
model.TransformParams = *ruleDef.Expression.Get("transform")
|
model.TransformParams = *ruleDef.Expression.Get("transform")
|
||||||
|
|
||||||
|
if model.Transform == "aggregation" {
|
||||||
|
model.Transformer = &AggregationTransformer{
|
||||||
|
Method: ruleDef.Expression.Get("transform").Get("method").MustString(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
query := ruleDef.Expression.Get("query")
|
query := ruleDef.Expression.Get("query")
|
||||||
model.Query = AlertQuery{
|
model.Query = AlertQuery{
|
||||||
Query: query.Get("query").MustString(),
|
Query: query.Get("query").MustString(),
|
||||||
|
15
pkg/services/alerting/evaluator.go
Normal file
15
pkg/services/alerting/evaluator.go
Normal file
@ -0,0 +1,15 @@
|
|||||||
|
package alerting
|
||||||
|
|
||||||
|
type compareFn func(float64, float64) bool
|
||||||
|
|
||||||
|
func evalCondition(level Level, result float64) bool {
|
||||||
|
return operators[level.Operator](result, level.Level)
|
||||||
|
}
|
||||||
|
|
||||||
|
var operators = map[string]compareFn{
|
||||||
|
">": func(num1, num2 float64) bool { return num1 > num2 },
|
||||||
|
">=": func(num1, num2 float64) bool { return num1 >= num2 },
|
||||||
|
"<": func(num1, num2 float64) bool { return num1 < num2 },
|
||||||
|
"<=": func(num1, num2 float64) bool { return num1 <= num2 },
|
||||||
|
"": func(num1, num2 float64) bool { return false },
|
||||||
|
}
|
@ -3,8 +3,6 @@ package alerting
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"math"
|
|
||||||
|
|
||||||
"github.com/grafana/grafana/pkg/bus"
|
"github.com/grafana/grafana/pkg/bus"
|
||||||
"github.com/grafana/grafana/pkg/log"
|
"github.com/grafana/grafana/pkg/log"
|
||||||
m "github.com/grafana/grafana/pkg/models"
|
m "github.com/grafana/grafana/pkg/models"
|
||||||
@ -26,63 +24,6 @@ func NewExecutor() *ExecutorImpl {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type compareFn func(float64, float64) bool
|
|
||||||
type aggregationFn func(*tsdb.TimeSeries) float64
|
|
||||||
|
|
||||||
var operators = map[string]compareFn{
|
|
||||||
">": func(num1, num2 float64) bool { return num1 > num2 },
|
|
||||||
">=": func(num1, num2 float64) bool { return num1 >= num2 },
|
|
||||||
"<": func(num1, num2 float64) bool { return num1 < num2 },
|
|
||||||
"<=": func(num1, num2 float64) bool { return num1 <= num2 },
|
|
||||||
"": func(num1, num2 float64) bool { return false },
|
|
||||||
}
|
|
||||||
var aggregator = map[string]aggregationFn{
|
|
||||||
"avg": func(series *tsdb.TimeSeries) float64 {
|
|
||||||
sum := float64(0)
|
|
||||||
|
|
||||||
for _, v := range series.Points {
|
|
||||||
sum += v[0]
|
|
||||||
}
|
|
||||||
|
|
||||||
return sum / float64(len(series.Points))
|
|
||||||
},
|
|
||||||
"sum": func(series *tsdb.TimeSeries) float64 {
|
|
||||||
sum := float64(0)
|
|
||||||
|
|
||||||
for _, v := range series.Points {
|
|
||||||
sum += v[0]
|
|
||||||
}
|
|
||||||
|
|
||||||
return sum
|
|
||||||
},
|
|
||||||
"min": func(series *tsdb.TimeSeries) float64 {
|
|
||||||
min := series.Points[0][0]
|
|
||||||
|
|
||||||
for _, v := range series.Points {
|
|
||||||
if v[0] < min {
|
|
||||||
min = v[0]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return min
|
|
||||||
},
|
|
||||||
"max": func(series *tsdb.TimeSeries) float64 {
|
|
||||||
max := series.Points[0][0]
|
|
||||||
|
|
||||||
for _, v := range series.Points {
|
|
||||||
if v[0] > max {
|
|
||||||
max = v[0]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return max
|
|
||||||
},
|
|
||||||
"mean": func(series *tsdb.TimeSeries) float64 {
|
|
||||||
midPosition := int64(math.Floor(float64(len(series.Points)) / float64(2)))
|
|
||||||
return series.Points[midPosition][0]
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e *ExecutorImpl) Execute(job *AlertJob, resultQueue chan *AlertResult) {
|
func (e *ExecutorImpl) Execute(job *AlertJob, resultQueue chan *AlertResult) {
|
||||||
timeSeries, err := e.executeQuery(job)
|
timeSeries, err := e.executeQuery(job)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -156,32 +97,25 @@ func (e *ExecutorImpl) evaluateRule(rule *AlertRule, series tsdb.TimeSeriesSlice
|
|||||||
|
|
||||||
for _, serie := range series {
|
for _, serie := range series {
|
||||||
e.log.Debug("Evaluating series", "series", serie.Name)
|
e.log.Debug("Evaluating series", "series", serie.Name)
|
||||||
|
transformedValue, _ := rule.Transformer.Transform(serie)
|
||||||
|
|
||||||
if aggregator["avg"] == nil {
|
critResult := evalCondition(rule.Critical, transformedValue)
|
||||||
continue
|
e.log.Debug("Alert execution Crit", "name", serie.Name, "transformedValue", transformedValue, "operator", rule.Critical.Operator, "level", rule.Critical.Level, "result", critResult)
|
||||||
}
|
|
||||||
|
|
||||||
var aggValue = aggregator["avg"](serie)
|
|
||||||
var critOperartor = operators[rule.Critical.Operator]
|
|
||||||
var critResult = critOperartor(aggValue, rule.Critical.Level)
|
|
||||||
|
|
||||||
e.log.Debug("Alert execution Crit", "name", serie.Name, "aggValue", aggValue, "operator", rule.Critical.Operator, "level", rule.Critical.Level, "result", critResult)
|
|
||||||
if critResult {
|
if critResult {
|
||||||
return &AlertResult{
|
return &AlertResult{
|
||||||
State: alertstates.Critical,
|
State: alertstates.Critical,
|
||||||
ActualValue: aggValue,
|
ActualValue: transformedValue,
|
||||||
Description: fmt.Sprintf(descriptionFmt, aggValue, serie.Name),
|
Description: fmt.Sprintf(descriptionFmt, transformedValue, serie.Name),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var warnOperartor = operators[rule.Warning.Operator]
|
warnResult := evalCondition(rule.Warning, transformedValue)
|
||||||
var warnResult = warnOperartor(aggValue, rule.Warning.Level)
|
e.log.Debug("Alert execution Warn", "name", serie.Name, "transformedValue", transformedValue, "operator", rule.Warning.Operator, "level", rule.Warning.Level, "result", warnResult)
|
||||||
e.log.Debug("Alert execution Warn", "name", serie.Name, "aggValue", aggValue, "operator", rule.Warning.Operator, "level", rule.Warning.Level, "result", warnResult)
|
|
||||||
if warnResult {
|
if warnResult {
|
||||||
return &AlertResult{
|
return &AlertResult{
|
||||||
State: alertstates.Warn,
|
State: alertstates.Warn,
|
||||||
Description: fmt.Sprintf(descriptionFmt, aggValue, serie.Name),
|
Description: fmt.Sprintf(descriptionFmt, transformedValue, serie.Name),
|
||||||
ActualValue: aggValue,
|
ActualValue: transformedValue,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -14,7 +14,10 @@ func TestAlertingExecutor(t *testing.T) {
|
|||||||
|
|
||||||
Convey("single time serie", func() {
|
Convey("single time serie", func() {
|
||||||
Convey("Show return ok since avg is above 2", func() {
|
Convey("Show return ok since avg is above 2", func() {
|
||||||
rule := &AlertRule{Critical: Level{Level: 10, Operator: ">"}}
|
rule := &AlertRule{
|
||||||
|
Critical: Level{Level: 10, Operator: ">"},
|
||||||
|
Transformer: &AggregationTransformer{Method: "avg"},
|
||||||
|
}
|
||||||
|
|
||||||
timeSeries := []*tsdb.TimeSeries{
|
timeSeries := []*tsdb.TimeSeries{
|
||||||
tsdb.NewTimeSeries("test1", [][2]float64{{2, 0}}),
|
tsdb.NewTimeSeries("test1", [][2]float64{{2, 0}}),
|
||||||
@ -25,7 +28,10 @@ func TestAlertingExecutor(t *testing.T) {
|
|||||||
})
|
})
|
||||||
|
|
||||||
Convey("Show return critical since below 2", func() {
|
Convey("Show return critical since below 2", func() {
|
||||||
rule := &AlertRule{Critical: Level{Level: 10, Operator: "<"}}
|
rule := &AlertRule{
|
||||||
|
Critical: Level{Level: 10, Operator: "<"},
|
||||||
|
Transformer: &AggregationTransformer{Method: "avg"},
|
||||||
|
}
|
||||||
|
|
||||||
timeSeries := []*tsdb.TimeSeries{
|
timeSeries := []*tsdb.TimeSeries{
|
||||||
tsdb.NewTimeSeries("test1", [][2]float64{{2, 0}}),
|
tsdb.NewTimeSeries("test1", [][2]float64{{2, 0}}),
|
||||||
@ -49,7 +55,10 @@ func TestAlertingExecutor(t *testing.T) {
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
Convey("Show return ok since avg is below 10", func() {
|
Convey("Show return ok since avg is below 10", func() {
|
||||||
rule := &AlertRule{Critical: Level{Level: 10, Operator: ">"}}
|
rule := &AlertRule{
|
||||||
|
Critical: Level{Level: 10, Operator: ">"},
|
||||||
|
Transformer: &AggregationTransformer{Method: "avg"},
|
||||||
|
}
|
||||||
|
|
||||||
timeSeries := []*tsdb.TimeSeries{
|
timeSeries := []*tsdb.TimeSeries{
|
||||||
tsdb.NewTimeSeries("test1", [][2]float64{{9, 0}, {9, 0}}),
|
tsdb.NewTimeSeries("test1", [][2]float64{{9, 0}, {9, 0}}),
|
||||||
@ -60,7 +69,10 @@ func TestAlertingExecutor(t *testing.T) {
|
|||||||
})
|
})
|
||||||
|
|
||||||
Convey("Show return ok since min is below 10", func() {
|
Convey("Show return ok since min is below 10", func() {
|
||||||
rule := &AlertRule{Critical: Level{Level: 10, Operator: ">"}}
|
rule := &AlertRule{
|
||||||
|
Critical: Level{Level: 10, Operator: ">"},
|
||||||
|
Transformer: &AggregationTransformer{Method: "avg"},
|
||||||
|
}
|
||||||
|
|
||||||
timeSeries := []*tsdb.TimeSeries{
|
timeSeries := []*tsdb.TimeSeries{
|
||||||
tsdb.NewTimeSeries("test1", [][2]float64{{11, 0}, {9, 0}}),
|
tsdb.NewTimeSeries("test1", [][2]float64{{11, 0}, {9, 0}}),
|
||||||
@ -85,7 +97,10 @@ func TestAlertingExecutor(t *testing.T) {
|
|||||||
|
|
||||||
Convey("muliple time series", func() {
|
Convey("muliple time series", func() {
|
||||||
Convey("both are ok", func() {
|
Convey("both are ok", func() {
|
||||||
rule := &AlertRule{Critical: Level{Level: 10, Operator: ">"}}
|
rule := &AlertRule{
|
||||||
|
Critical: Level{Level: 10, Operator: ">"},
|
||||||
|
Transformer: &AggregationTransformer{Method: "avg"},
|
||||||
|
}
|
||||||
|
|
||||||
timeSeries := []*tsdb.TimeSeries{
|
timeSeries := []*tsdb.TimeSeries{
|
||||||
tsdb.NewTimeSeries("test1", [][2]float64{{2, 0}}),
|
tsdb.NewTimeSeries("test1", [][2]float64{{2, 0}}),
|
||||||
@ -97,7 +112,10 @@ func TestAlertingExecutor(t *testing.T) {
|
|||||||
})
|
})
|
||||||
|
|
||||||
Convey("first serie is good, second is critical", func() {
|
Convey("first serie is good, second is critical", func() {
|
||||||
rule := &AlertRule{Critical: Level{Level: 10, Operator: ">"}}
|
rule := &AlertRule{
|
||||||
|
Critical: Level{Level: 10, Operator: ">"},
|
||||||
|
Transformer: &AggregationTransformer{Method: "avg"},
|
||||||
|
}
|
||||||
|
|
||||||
timeSeries := []*tsdb.TimeSeries{
|
timeSeries := []*tsdb.TimeSeries{
|
||||||
tsdb.NewTimeSeries("test1", [][2]float64{{2, 0}}),
|
tsdb.NewTimeSeries("test1", [][2]float64{{2, 0}}),
|
||||||
|
@ -2,7 +2,6 @@ package alerting
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/grafana/grafana/pkg/components/simplejson"
|
"github.com/grafana/grafana/pkg/components/simplejson"
|
||||||
"github.com/grafana/grafana/pkg/tsdb"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type AlertJob struct {
|
type AlertJob struct {
|
||||||
@ -36,10 +35,7 @@ type AlertRule struct {
|
|||||||
Query AlertQuery
|
Query AlertQuery
|
||||||
Transform string
|
Transform string
|
||||||
TransformParams simplejson.Json
|
TransformParams simplejson.Json
|
||||||
}
|
Transformer Transformer
|
||||||
|
|
||||||
type Transformer interface {
|
|
||||||
Transform(tsdb tsdb.TimeSeriesSlice) float64
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type Level struct {
|
type Level struct {
|
||||||
|
73
pkg/services/alerting/transformer.go
Normal file
73
pkg/services/alerting/transformer.go
Normal file
@ -0,0 +1,73 @@
|
|||||||
|
package alerting
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"math"
|
||||||
|
|
||||||
|
"github.com/grafana/grafana/pkg/tsdb"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Transformer interface {
|
||||||
|
Transform(timeserie *tsdb.TimeSeries) (float64, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type AggregationTransformer struct {
|
||||||
|
Method string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (at *AggregationTransformer) Transform(timeserie *tsdb.TimeSeries) (float64, error) {
|
||||||
|
|
||||||
|
if at.Method == "avg" {
|
||||||
|
sum := float64(0)
|
||||||
|
for _, point := range timeserie.Points {
|
||||||
|
sum += point[0]
|
||||||
|
}
|
||||||
|
|
||||||
|
return sum / float64(len(timeserie.Points)), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//"sum": func(series *tsdb.TimeSeries) float64 {
|
||||||
|
if at.Method == "sum" {
|
||||||
|
sum := float64(0)
|
||||||
|
|
||||||
|
for _, v := range timeserie.Points {
|
||||||
|
sum += v[0]
|
||||||
|
}
|
||||||
|
|
||||||
|
return sum, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//"min": func(series *tsdb.TimeSeries) float64 {
|
||||||
|
if at.Method == "min" {
|
||||||
|
min := timeserie.Points[0][0]
|
||||||
|
|
||||||
|
for _, v := range timeserie.Points {
|
||||||
|
if v[0] < min {
|
||||||
|
min = v[0]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return min, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//"max": func(series *tsdb.TimeSeries) float64 {
|
||||||
|
if at.Method == "max" {
|
||||||
|
max := timeserie.Points[0][0]
|
||||||
|
|
||||||
|
for _, v := range timeserie.Points {
|
||||||
|
if v[0] > max {
|
||||||
|
max = v[0]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return max, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//"mean": func(series *tsdb.TimeSeries) float64 {
|
||||||
|
if at.Method == "mean" {
|
||||||
|
midPosition := int64(math.Floor(float64(len(timeserie.Points)) / float64(2)))
|
||||||
|
return timeserie.Points[midPosition][0], nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return float64(0), fmt.Errorf("Missing method")
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user