From 422234d03a10989c6e511233aedbd34acf7aeb30 Mon Sep 17 00:00:00 2001 From: bergquist Date: Fri, 27 May 2016 12:06:41 +0200 Subject: [PATCH] feat(alerting): abstract graphite from executor --- .../types.go => models/timeseries.go} | 2 +- pkg/services/alerting/alerting.go | 2 +- pkg/services/alerting/dummie_executor.go | 4 --- .../{rule_executor.go => executor.go} | 23 +++++++++++++--- .../graphite.go} | 27 +++++++++---------- 5 files changed, 35 insertions(+), 23 deletions(-) rename pkg/{services/alerting/types.go => models/timeseries.go} (88%) rename pkg/services/alerting/{rule_executor.go => executor.go} (52%) rename pkg/services/alerting/{graphite_executor.go => graphite/graphite.go} (72%) diff --git a/pkg/services/alerting/types.go b/pkg/models/timeseries.go similarity index 88% rename from pkg/services/alerting/types.go rename to pkg/models/timeseries.go index a9301035934..ccb220d39ed 100644 --- a/pkg/services/alerting/types.go +++ b/pkg/models/timeseries.go @@ -1,4 +1,4 @@ -package alerting +package models type TimeSeries struct { Name string `json:"name"` diff --git a/pkg/services/alerting/alerting.go b/pkg/services/alerting/alerting.go index 1c811e87e8c..9895cf98297 100644 --- a/pkg/services/alerting/alerting.go +++ b/pkg/services/alerting/alerting.go @@ -21,7 +21,7 @@ func Init() { scheduler := NewScheduler() go scheduler.Dispatch(&AlertRuleReader{}) - go scheduler.Executor(&GraphiteExecutor{}) + go scheduler.Executor(&ExecutorImpl{}) go scheduler.HandleResponses() } diff --git a/pkg/services/alerting/dummie_executor.go b/pkg/services/alerting/dummie_executor.go index c5662ce9ea8..5bf0c2dd663 100644 --- a/pkg/services/alerting/dummie_executor.go +++ b/pkg/services/alerting/dummie_executor.go @@ -6,10 +6,6 @@ import ( "time" ) -type Executor interface { - Execute(rule m.AlertRule, responseQueue chan *AlertResult) -} - type DummieExecutor struct{} func (this *DummieExecutor) Execute(rule m.AlertRule, responseQueue chan *AlertResult) { diff --git a/pkg/services/alerting/rule_executor.go b/pkg/services/alerting/executor.go similarity index 52% rename from pkg/services/alerting/rule_executor.go rename to pkg/services/alerting/executor.go index 0512e4e11e1..75fe1546d4a 100644 --- a/pkg/services/alerting/rule_executor.go +++ b/pkg/services/alerting/executor.go @@ -2,17 +2,34 @@ package alerting import ( m "github.com/grafana/grafana/pkg/models" + "github.com/grafana/grafana/pkg/services/alerting/graphite" ) -func (this *GraphiteExecutor) executeRules(series []GraphiteSerie, rule m.AlertRule) *AlertResult { +type Executor interface { + Execute(rule m.AlertRule, responseQueue chan *AlertResult) +} + +type ExecutorImpl struct{} + +func (this *ExecutorImpl) Execute(rule m.AlertRule, responseQueue chan *AlertResult) { + response, err := graphite.GraphiteClient{}.GetSeries(rule) + + if err != nil { + responseQueue <- &AlertResult{State: "CRITICAL", Id: rule.Id} + } + + responseQueue <- this.executeRules(response, rule) +} + +func (this *ExecutorImpl) executeRules(series m.TimeSeriesSlice, rule m.AlertRule) *AlertResult { for _, v := range series { var avg float64 var sum float64 - for _, dp := range v.Datapoints { + for _, dp := range v.Points { sum += dp[0] } - avg = sum / float64(len(v.Datapoints)) + avg = sum / float64(len(v.Points)) if float64(rule.CritLevel) < avg { return &AlertResult{State: m.AlertStateCritical, Id: rule.Id, ActualValue: avg} diff --git a/pkg/services/alerting/graphite_executor.go b/pkg/services/alerting/graphite/graphite.go similarity index 72% rename from pkg/services/alerting/graphite_executor.go rename to pkg/services/alerting/graphite/graphite.go index 61519408206..49d5a59c158 100644 --- a/pkg/services/alerting/graphite_executor.go +++ b/pkg/services/alerting/graphite/graphite.go @@ -1,4 +1,4 @@ -package alerting +package graphite import ( "fmt" @@ -11,7 +11,7 @@ import ( "time" ) -type GraphiteExecutor struct{} +type GraphiteClient struct{} type GraphiteSerie struct { Datapoints [][2]float64 @@ -20,17 +20,7 @@ type GraphiteSerie struct { type GraphiteResponse []GraphiteSerie -func (this *GraphiteExecutor) Execute(rule m.AlertRule, responseQueue chan *AlertResult) { - response, err := this.getSeries(rule) - - if err != nil { - responseQueue <- &AlertResult{State: "CRITICAL", Id: rule.Id} - } - - responseQueue <- this.executeRules(response, rule) -} - -func (this *GraphiteExecutor) getSeries(rule m.AlertRule) (GraphiteResponse, error) { +func (this GraphiteClient) GetSeries(rule m.AlertRule) (m.TimeSeriesSlice, error) { query := &m.GetDataSourceByIdQuery{Id: rule.DatasourceId, OrgId: rule.OrgId} if err := bus.Dispatch(query); err != nil { return nil, err @@ -61,7 +51,16 @@ func (this *GraphiteExecutor) getSeries(rule m.AlertRule) (GraphiteResponse, err return nil, fmt.Errorf("error!") } - return response, nil + timeSeries := make([]*m.TimeSeries, 0) + + for _, v := range response { + timeSeries = append(timeSeries, &m.TimeSeries{ + Name: v.Target, + Points: v.Datapoints, + }) + } + + return timeSeries, nil } func getTargetFromRule(rule m.AlertRule) string {