mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
feat(alerting): add copy of AlertRule to AlertResult
This commit is contained in:
@@ -122,4 +122,5 @@ type AlertResult struct {
|
||||
State string
|
||||
ActualValue float64
|
||||
Duration float64
|
||||
Rule AlertRule
|
||||
}
|
||||
|
||||
@@ -31,7 +31,7 @@ var (
|
||||
|
||||
func (this *AlertRuleReader) initReader() {
|
||||
alertJobs = make([]m.AlertJob, 0)
|
||||
heartbeat := time.NewTicker(time.Second * 5)
|
||||
heartbeat := time.NewTicker(time.Second * 10)
|
||||
this.rr()
|
||||
|
||||
for {
|
||||
|
||||
@@ -43,7 +43,7 @@ func NewScheduler() *Scheduler {
|
||||
}
|
||||
|
||||
func (this *Scheduler) Dispatch(reader RuleReader) {
|
||||
reschedule := time.NewTicker(time.Second * 5)
|
||||
reschedule := time.NewTicker(time.Second * 10)
|
||||
secondTicker := time.NewTicker(time.Second)
|
||||
|
||||
this.updateJobs(reader.Fetch)
|
||||
@@ -66,19 +66,16 @@ func (this *Scheduler) updateJobs(f func() []m.AlertJob) {
|
||||
|
||||
for i := 0; i < len(rules); i++ {
|
||||
rule := rules[i]
|
||||
//jobs[rule.Rule.Id] = &m.AlertJob{Rule: rule, Offset: int64(len(jobs))}
|
||||
rule.Offset = int64(len(jobs))
|
||||
rule.Offset = int64(i)
|
||||
jobs[rule.Rule.Id] = &rule
|
||||
}
|
||||
|
||||
log.Debug("Scheduler: Selected %d jobs", len(jobs))
|
||||
|
||||
this.jobs = jobs
|
||||
}
|
||||
|
||||
func (this *Scheduler) queueJobs() {
|
||||
now := time.Now().Unix()
|
||||
|
||||
for _, job := range this.jobs {
|
||||
if now%job.Rule.Frequency == 0 && job.Running == false {
|
||||
log.Info("Scheduler: Putting job on to run queue: %s", job.Rule.Title)
|
||||
@@ -118,18 +115,23 @@ func (this *Scheduler) HandleResponses() {
|
||||
}
|
||||
}
|
||||
|
||||
func (this *Scheduler) MeasureAndExecute(exec Executor, rule *m.AlertJob) {
|
||||
func (this *Scheduler) MeasureAndExecute(exec Executor, job *m.AlertJob) {
|
||||
now := time.Now()
|
||||
|
||||
response := make(chan *m.AlertResult, 1)
|
||||
go exec.Execute(rule, response)
|
||||
responseChan := make(chan *m.AlertResult, 1)
|
||||
go exec.Execute(job, responseChan)
|
||||
|
||||
select {
|
||||
case <-time.After(time.Second * 5):
|
||||
this.responseQueue <- &m.AlertResult{Id: rule.Rule.Id, State: "timed out", Duration: float64(time.Since(now).Nanoseconds()) / float64(1000000)}
|
||||
case r := <-response:
|
||||
r.Duration = float64(time.Since(now).Nanoseconds()) / float64(1000000)
|
||||
log.Info("Schedular: exeuction took %vms", r.Duration)
|
||||
this.responseQueue <- r
|
||||
this.responseQueue <- &m.AlertResult{
|
||||
Id: job.Rule.Id,
|
||||
State: "timed out",
|
||||
Duration: float64(time.Since(now).Nanoseconds()) / float64(1000000),
|
||||
Rule: job.Rule,
|
||||
}
|
||||
case result := <-responseChan:
|
||||
result.Duration = float64(time.Since(now).Nanoseconds()) / float64(1000000)
|
||||
log.Info("Schedular: exeuction took %vms", result.Duration)
|
||||
this.responseQueue <- result
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package alerting
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
m "github.com/grafana/grafana/pkg/models"
|
||||
"github.com/grafana/grafana/pkg/services/alerting/graphite"
|
||||
)
|
||||
@@ -30,14 +31,22 @@ var aggregator map[string]aggregationFn = map[string]aggregationFn{
|
||||
"mean": func(series *m.TimeSeries) float64 { return series.Mean },
|
||||
}
|
||||
|
||||
func (this *ExecutorImpl) Execute(rule *m.AlertJob, responseQueue chan *m.AlertResult) {
|
||||
response, err := graphite.GraphiteClient{}.GetSeries(rule)
|
||||
|
||||
if err != nil {
|
||||
responseQueue <- &m.AlertResult{State: "PENDING", Id: rule.Rule.Id}
|
||||
func (this *ExecutorImpl) GetSeries(job *m.AlertJob) (m.TimeSeriesSlice, error) {
|
||||
if job.Datasource.Type == m.DS_GRAPHITE {
|
||||
return graphite.GraphiteClient{}.GetSeries(job)
|
||||
}
|
||||
|
||||
responseQueue <- this.ValidateRule(rule.Rule, response)
|
||||
return nil, fmt.Errorf("Grafana does not support alerts for %s", job.Datasource.Type)
|
||||
}
|
||||
|
||||
func (this *ExecutorImpl) Execute(job *m.AlertJob, responseQueue chan *m.AlertResult) {
|
||||
response, err := this.GetSeries(job)
|
||||
|
||||
if err != nil {
|
||||
responseQueue <- &m.AlertResult{State: "PENDING", Id: job.Rule.Id, Rule: job.Rule}
|
||||
}
|
||||
|
||||
responseQueue <- this.ValidateRule(job.Rule, response)
|
||||
}
|
||||
|
||||
func (this *ExecutorImpl) ValidateRule(rule m.AlertRule, series m.TimeSeriesSlice) *m.AlertResult {
|
||||
@@ -49,13 +58,13 @@ func (this *ExecutorImpl) ValidateRule(rule m.AlertRule, series m.TimeSeriesSlic
|
||||
var aggValue = aggregator[rule.Aggregator](serie)
|
||||
|
||||
if operators[rule.CritOperator](aggValue, float64(rule.CritLevel)) {
|
||||
return &m.AlertResult{State: m.AlertStateCritical, Id: rule.Id, ActualValue: aggValue}
|
||||
return &m.AlertResult{State: m.AlertStateCritical, Id: rule.Id, ActualValue: aggValue, Rule: rule}
|
||||
}
|
||||
|
||||
if operators[rule.WarnOperator](aggValue, float64(rule.WarnLevel)) {
|
||||
return &m.AlertResult{State: m.AlertStateWarn, Id: rule.Id, ActualValue: aggValue}
|
||||
return &m.AlertResult{State: m.AlertStateWarn, Id: rule.Id, ActualValue: aggValue, Rule: rule}
|
||||
}
|
||||
}
|
||||
|
||||
return &m.AlertResult{State: m.AlertStateOk, Id: rule.Id}
|
||||
return &m.AlertResult{State: m.AlertStateOk, Id: rule.Id, Rule: rule}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user