feat(alerting): abstract graphite from executor

This commit is contained in:
bergquist 2016-05-27 12:06:41 +02:00
parent 3d66ec816d
commit 422234d03a
5 changed files with 35 additions and 23 deletions

View File

@ -1,4 +1,4 @@
package alerting package models
type TimeSeries struct { type TimeSeries struct {
Name string `json:"name"` Name string `json:"name"`

View File

@ -21,7 +21,7 @@ func Init() {
scheduler := NewScheduler() scheduler := NewScheduler()
go scheduler.Dispatch(&AlertRuleReader{}) go scheduler.Dispatch(&AlertRuleReader{})
go scheduler.Executor(&GraphiteExecutor{}) go scheduler.Executor(&ExecutorImpl{})
go scheduler.HandleResponses() go scheduler.HandleResponses()
} }

View File

@ -6,10 +6,6 @@ import (
"time" "time"
) )
type Executor interface {
Execute(rule m.AlertRule, responseQueue chan *AlertResult)
}
type DummieExecutor struct{} type DummieExecutor struct{}
func (this *DummieExecutor) Execute(rule m.AlertRule, responseQueue chan *AlertResult) { func (this *DummieExecutor) Execute(rule m.AlertRule, responseQueue chan *AlertResult) {

View File

@ -2,17 +2,34 @@ package alerting
import ( import (
m "github.com/grafana/grafana/pkg/models" m "github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/services/alerting/graphite"
) )
func (this *GraphiteExecutor) executeRules(series []GraphiteSerie, rule m.AlertRule) *AlertResult { type Executor interface {
Execute(rule m.AlertRule, responseQueue chan *AlertResult)
}
type ExecutorImpl struct{}
func (this *ExecutorImpl) Execute(rule m.AlertRule, responseQueue chan *AlertResult) {
response, err := graphite.GraphiteClient{}.GetSeries(rule)
if err != nil {
responseQueue <- &AlertResult{State: "CRITICAL", Id: rule.Id}
}
responseQueue <- this.executeRules(response, rule)
}
func (this *ExecutorImpl) executeRules(series m.TimeSeriesSlice, rule m.AlertRule) *AlertResult {
for _, v := range series { for _, v := range series {
var avg float64 var avg float64
var sum float64 var sum float64
for _, dp := range v.Datapoints { for _, dp := range v.Points {
sum += dp[0] sum += dp[0]
} }
avg = sum / float64(len(v.Datapoints)) avg = sum / float64(len(v.Points))
if float64(rule.CritLevel) < avg { if float64(rule.CritLevel) < avg {
return &AlertResult{State: m.AlertStateCritical, Id: rule.Id, ActualValue: avg} return &AlertResult{State: m.AlertStateCritical, Id: rule.Id, ActualValue: avg}

View File

@ -1,4 +1,4 @@
package alerting package graphite
import ( import (
"fmt" "fmt"
@ -11,7 +11,7 @@ import (
"time" "time"
) )
type GraphiteExecutor struct{} type GraphiteClient struct{}
type GraphiteSerie struct { type GraphiteSerie struct {
Datapoints [][2]float64 Datapoints [][2]float64
@ -20,17 +20,7 @@ type GraphiteSerie struct {
type GraphiteResponse []GraphiteSerie type GraphiteResponse []GraphiteSerie
func (this *GraphiteExecutor) Execute(rule m.AlertRule, responseQueue chan *AlertResult) { func (this GraphiteClient) GetSeries(rule m.AlertRule) (m.TimeSeriesSlice, error) {
response, err := this.getSeries(rule)
if err != nil {
responseQueue <- &AlertResult{State: "CRITICAL", Id: rule.Id}
}
responseQueue <- this.executeRules(response, rule)
}
func (this *GraphiteExecutor) getSeries(rule m.AlertRule) (GraphiteResponse, error) {
query := &m.GetDataSourceByIdQuery{Id: rule.DatasourceId, OrgId: rule.OrgId} query := &m.GetDataSourceByIdQuery{Id: rule.DatasourceId, OrgId: rule.OrgId}
if err := bus.Dispatch(query); err != nil { if err := bus.Dispatch(query); err != nil {
return nil, err return nil, err
@ -61,7 +51,16 @@ func (this *GraphiteExecutor) getSeries(rule m.AlertRule) (GraphiteResponse, err
return nil, fmt.Errorf("error!") return nil, fmt.Errorf("error!")
} }
return response, nil timeSeries := make([]*m.TimeSeries, 0)
for _, v := range response {
timeSeries = append(timeSeries, &m.TimeSeries{
Name: v.Target,
Points: v.Datapoints,
})
}
return timeSeries, nil
} }
func getTargetFromRule(rule m.AlertRule) string { func getTargetFromRule(rule m.AlertRule) string {