From 3d66ec816df1684a7cb9743419bbb0c50c795064 Mon Sep 17 00:00:00 2001 From: bergquist Date: Fri, 27 May 2016 10:34:44 +0200 Subject: [PATCH] chore(alerting): minor refactoring --- pkg/services/alerting/alerting.go | 4 +- pkg/services/alerting/graphite_executor.go | 58 ++++------------------ pkg/services/alerting/rule_executor.go | 35 +++++++++++++ pkg/services/alerting/types.go | 8 +++ 4 files changed, 56 insertions(+), 49 deletions(-) create mode 100644 pkg/services/alerting/rule_executor.go create mode 100644 pkg/services/alerting/types.go diff --git a/pkg/services/alerting/alerting.go b/pkg/services/alerting/alerting.go index 1b9964adcda..1c811e87e8c 100644 --- a/pkg/services/alerting/alerting.go +++ b/pkg/services/alerting/alerting.go @@ -50,7 +50,7 @@ func NewScheduler() *Scheduler { func (this *Scheduler) heartBeat() { //Lets cheat on this until we focus on clustering - log.Info("Heartbeat: Sending heartbeat from " + this.serverId) + //log.Info("Heartbeat: Sending heartbeat from " + this.serverId) this.clusterSize = 1 this.serverPosition = 1 @@ -119,7 +119,7 @@ func (this *Scheduler) queueJobs() { func (this *Scheduler) Executor(executor Executor) { for job := range this.runQueue { - log.Info("Executor: queue length %d", len(this.runQueue)) + //log.Info("Executor: queue length %d", len(this.runQueue)) log.Info("Executor: executing %s", job.rule.Title) this.jobs[job.rule.Id].running = true this.MeasureAndExecute(executor, job) diff --git a/pkg/services/alerting/graphite_executor.go b/pkg/services/alerting/graphite_executor.go index 21eb0bdb90a..61519408206 100644 --- a/pkg/services/alerting/graphite_executor.go +++ b/pkg/services/alerting/graphite_executor.go @@ -1,7 +1,6 @@ package alerting import ( - "encoding/json" "fmt" "github.com/franela/goreq" "github.com/grafana/grafana/pkg/bus" @@ -14,13 +13,12 @@ import ( type GraphiteExecutor struct{} -type Series struct { - Datapoints []DataPoint +type GraphiteSerie struct { + Datapoints [][2]float64 Target string } -type Response []Series -type DataPoint []json.Number +type GraphiteResponse []GraphiteSerie func (this *GraphiteExecutor) Execute(rule m.AlertRule, responseQueue chan *AlertResult) { response, err := this.getSeries(rule) @@ -32,38 +30,7 @@ func (this *GraphiteExecutor) Execute(rule m.AlertRule, responseQueue chan *Aler responseQueue <- this.executeRules(response, rule) } -func (this *GraphiteExecutor) executeRules(series []Series, rule m.AlertRule) *AlertResult { - for _, v := range series { - var avg float64 - var sum float64 - for _, dp := range v.Datapoints { - i, _ := dp[0].Float64() - sum += i - } - - avg = sum / float64(len(v.Datapoints)) - - if float64(rule.CritLevel) < avg { - return &AlertResult{State: m.AlertStateCritical, Id: rule.Id, ActualValue: avg} - } - - if float64(rule.WarnLevel) < avg { - return &AlertResult{State: m.AlertStateWarn, Id: rule.Id, ActualValue: avg} - } - - if float64(rule.CritLevel) < sum { - return &AlertResult{State: m.AlertStateCritical, Id: rule.Id, ActualValue: sum} - } - - if float64(rule.WarnLevel) < sum { - return &AlertResult{State: m.AlertStateWarn, Id: rule.Id, ActualValue: sum} - } - } - - return &AlertResult{State: m.AlertStateOk, Id: rule.Id} -} - -func (this *GraphiteExecutor) getSeries(rule m.AlertRule) (Response, error) { +func (this *GraphiteExecutor) getSeries(rule m.AlertRule) (GraphiteResponse, error) { query := &m.GetDataSourceByIdQuery{Id: rule.DatasourceId, OrgId: rule.OrgId} if err := bus.Dispatch(query); err != nil { return nil, err @@ -71,22 +38,19 @@ func (this *GraphiteExecutor) getSeries(rule m.AlertRule) (Response, error) { v := url.Values{ "format": []string{"json"}, - "target": []string{getTargetFromQuery(rule)}, + "target": []string{getTargetFromRule(rule)}, + "until": []string{"now"}, + "from": []string{"-" + rule.QueryRange}, } - v.Add("from", "-"+rule.QueryRange) - v.Add("until", "now") - - req := goreq.Request{ + res, err := goreq.Request{ Method: "POST", Uri: query.Result.Url + "/render", Body: v.Encode(), Timeout: 500 * time.Millisecond, - } + }.Do() - res, err := req.Do() - - response := Response{} + response := GraphiteResponse{} res.Body.FromJsonTo(&response) if err != nil { @@ -100,7 +64,7 @@ func (this *GraphiteExecutor) getSeries(rule m.AlertRule) (Response, error) { return response, nil } -func getTargetFromQuery(rule m.AlertRule) string { +func getTargetFromRule(rule m.AlertRule) string { json, _ := simplejson.NewJson([]byte(rule.Query)) return json.Get("target").MustString() diff --git a/pkg/services/alerting/rule_executor.go b/pkg/services/alerting/rule_executor.go new file mode 100644 index 00000000000..0512e4e11e1 --- /dev/null +++ b/pkg/services/alerting/rule_executor.go @@ -0,0 +1,35 @@ +package alerting + +import ( + m "github.com/grafana/grafana/pkg/models" +) + +func (this *GraphiteExecutor) executeRules(series []GraphiteSerie, rule m.AlertRule) *AlertResult { + for _, v := range series { + var avg float64 + var sum float64 + for _, dp := range v.Datapoints { + sum += dp[0] + } + + avg = sum / float64(len(v.Datapoints)) + + if float64(rule.CritLevel) < avg { + return &AlertResult{State: m.AlertStateCritical, Id: rule.Id, ActualValue: avg} + } + + if float64(rule.WarnLevel) < avg { + return &AlertResult{State: m.AlertStateWarn, Id: rule.Id, ActualValue: avg} + } + + if float64(rule.CritLevel) < sum { + return &AlertResult{State: m.AlertStateCritical, Id: rule.Id, ActualValue: sum} + } + + if float64(rule.WarnLevel) < sum { + return &AlertResult{State: m.AlertStateWarn, Id: rule.Id, ActualValue: sum} + } + } + + return &AlertResult{State: m.AlertStateOk, Id: rule.Id} +} diff --git a/pkg/services/alerting/types.go b/pkg/services/alerting/types.go new file mode 100644 index 00000000000..a9301035934 --- /dev/null +++ b/pkg/services/alerting/types.go @@ -0,0 +1,8 @@ +package alerting + +type TimeSeries struct { + Name string `json:"name"` + Points [][2]float64 `json:"points"` +} + +type TimeSeriesSlice []*TimeSeries