tech(alerting): use pointers for updating alertjobs

This commit is contained in:
bergquist 2016-06-03 08:33:04 +02:00
parent 910253bc42
commit cc65dd8bcf
6 changed files with 50 additions and 40 deletions

View File

@ -122,5 +122,5 @@ type AlertResult struct {
ActualValue float64
Duration float64
Description string
Rule AlertRule
AlertJob *AlertJob
}

View File

@ -20,8 +20,8 @@ func Init() {
reader := NewRuleReader()
go scheduler.dispatch(reader)
go scheduler.Executor(&ExecutorImpl{})
go scheduler.HandleResponses()
go scheduler.executor(&ExecutorImpl{})
go scheduler.handleResponses()
}
@ -65,11 +65,22 @@ func (scheduler *Scheduler) updateJobs(alertRuleFn func() []m.AlertRule) {
for i := 0; i < len(rules); i++ {
rule := rules[i]
jobs[rule.Id] = &m.AlertJob{
Rule: rule,
Offset: int64(i),
Running: false,
/*
jobs[rule.Id] = &m.AlertJob{
Offset: int64(i),
Running: false,
Rule: rule,
}
*/
job := &m.AlertJob{}
if scheduler.jobs[rule.Id] != nil {
job = scheduler.jobs[rule.Id]
}
job.Rule = rule
job.Offset = int64(i)
jobs[rule.Id] = job
}
log.Debug("Scheduler: Selected %d jobs", len(jobs))
@ -86,35 +97,33 @@ func (scheduler *Scheduler) queueJobs() {
}
}
// executor drains scheduler.runQueue: for every job received it logs the rule
// title, flags the job as running, and hands it to measureAndExecute together
// with the supplied Executor. The loop ends only when runQueue is closed.
//
// NOTE(review): this span is diff output with the +/- markers stripped, so the
// removed and added variants of a statement appear back to back. Presumably the
// exported-name line of each pair is the removed one and the unexported-name
// line is the one that survives the commit — confirm against the repository.
func (scheduler *Scheduler) Executor(executor Executor) {
func (scheduler *Scheduler) executor(executor Executor) {
for job := range scheduler.runQueue {
//log.Info("Executor: queue length %d", len(this.runQueue))
log.Info("Executor: executing %s", job.Rule.Title)
// First variant: looks the job up in scheduler.jobs by rule id before
// setting the flag.
scheduler.jobs[job.Rule.Id].Running = true
scheduler.MeasureAndExecute(executor, job)
// Second variant: sets the flag on the queued job itself. The job is passed
// on as a *m.AlertJob, so the write lands on the shared AlertJob rather than
// on a copy.
job.Running = true
scheduler.measureAndExecute(executor, job)
}
}
// handleResponses consumes scheduler.responseQueue: for every result it logs
// the outcome, clears the Running flag of the job that produced it, and
// dispatches an UpdateAlertStateCommand on the bus carrying the alert id, the
// new state and the description. A dispatch failure is logged and the loop
// moves on to the next response.
//
// NOTE(review): this span is diff output with the +/- markers stripped, so the
// removed and added variants of a statement appear back to back. Presumably the
// first line of each pair is the removed one — confirm against the repository.
func (scheduler *Scheduler) HandleResponses() {
func (scheduler *Scheduler) handleResponses() {
for response := range scheduler.responseQueue {
log.Info("Response: alert(%d) status(%s) actual(%v)", response.Id, response.State, response.ActualValue)
// First variant: resolves the job through scheduler.jobs and tolerates a
// missing entry.
if scheduler.jobs[response.Id] != nil {
scheduler.jobs[response.Id].Running = false
}
// Second variant: uses the job pointer carried on the result itself.
// NOTE(review): response.AlertJob is dereferenced without a nil check. The
// producers visible in this diff all set AlertJob before queueing a result,
// but verify that no other producer sends one without it.
log.Info("Response: alert(%d) status(%s) actual(%v) running(%v)", response.Id, response.State, response.ActualValue, response.AlertJob.Running)
response.AlertJob.Running = false
// The command is built either as a value (dispatched via &cmd) or directly
// as a pointer (dispatched via cmd); both variants hand bus.Dispatch a
// pointer.
cmd := m.UpdateAlertStateCommand{
cmd := &m.UpdateAlertStateCommand{
AlertId: response.Id,
NewState: response.State,
Info: response.Description,
}
if err := bus.Dispatch(&cmd); err != nil {
// This variant's message has no formatting verb for err.
log.Error(1, "failed to save state", err)
if err := bus.Dispatch(cmd); err != nil {
// This variant adds %v so the error has a placeholder in the message.
log.Error(2, "failed to save state %v", err)
}
}
}
func (scheduler *Scheduler) MeasureAndExecute(exec Executor, job *m.AlertJob) {
func (scheduler *Scheduler) measureAndExecute(exec Executor, job *m.AlertJob) {
now := time.Now()
responseChan := make(chan *m.AlertResult, 1)
@ -126,7 +135,7 @@ func (scheduler *Scheduler) MeasureAndExecute(exec Executor, job *m.AlertJob) {
Id: job.Rule.Id,
State: "timed out",
Duration: float64(time.Since(now).Nanoseconds()) / float64(1000000),
Rule: job.Rule,
AlertJob: job,
}
case result := <-responseChan:
result.Duration = float64(time.Since(now).Nanoseconds()) / float64(1000000)

View File

@ -26,7 +26,7 @@ func GetSeries(job *m.AlertJob) (m.TimeSeriesSlice, error) {
}
if query.Result.Type == m.DS_GRAPHITE {
return GraphiteClient{}.GetSeries(job, query.Result)
return GraphiteClient{}.GetSeries(*job, query.Result)
}
return nil, fmt.Errorf("Grafana does not support alerts for %s", query.Result.Type)

View File

@ -22,7 +22,7 @@ type GraphiteSerie struct {
type GraphiteResponse []GraphiteSerie
func (this GraphiteClient) GetSeries(rule *m.AlertJob, datasource m.DataSource) (m.TimeSeriesSlice, error) {
func (this GraphiteClient) GetSeries(rule m.AlertJob, datasource m.DataSource) (m.TimeSeriesSlice, error) {
v := url.Values{
"format": []string{"json"},
"target": []string{getTargetFromRule(rule.Rule)},
@ -39,9 +39,6 @@ func (this GraphiteClient) GetSeries(rule *m.AlertJob, datasource m.DataSource)
Timeout: 5 * time.Second,
}.Do()
response := GraphiteResponse{}
res.Body.FromJsonTo(&response)
if err != nil {
return nil, err
}
@ -50,6 +47,9 @@ func (this GraphiteClient) GetSeries(rule *m.AlertJob, datasource m.DataSource)
return nil, fmt.Errorf("expected httpstatus 200, found %d", res.StatusCode)
}
response := GraphiteResponse{}
res.Body.FromJsonTo(&response)
timeSeries := make([]*m.TimeSeries, 0)
for _, v := range response {

View File

@ -3,10 +3,11 @@ package alerting
import (
"fmt"
"math"
"github.com/grafana/grafana/pkg/log"
m "github.com/grafana/grafana/pkg/models"
b "github.com/grafana/grafana/pkg/services/alerting/datasources"
"math"
)
type Executor interface {
@ -80,13 +81,15 @@ func (this *ExecutorImpl) Execute(job *m.AlertJob, responseQueue chan *m.AlertRe
response, err := b.GetSeries(job)
if err != nil {
responseQueue <- &m.AlertResult{State: "PENDING", Id: job.Rule.Id, Rule: job.Rule}
responseQueue <- &m.AlertResult{State: "PENDING", Id: job.Rule.Id, AlertJob: job}
}
responseQueue <- this.ValidateRule(job.Rule, response)
result := this.validateRule(job.Rule, response)
result.AlertJob = job
responseQueue <- result
}
func (this *ExecutorImpl) ValidateRule(rule m.AlertRule, series m.TimeSeriesSlice) *m.AlertResult {
func (this *ExecutorImpl) validateRule(rule m.AlertRule, series m.TimeSeriesSlice) *m.AlertResult {
for _, serie := range series {
if aggregator[rule.Aggregator] == nil {
continue
@ -103,7 +106,6 @@ func (this *ExecutorImpl) ValidateRule(rule m.AlertRule, series m.TimeSeriesSlic
Id: rule.Id,
ActualValue: aggValue,
Description: fmt.Sprintf("Actual value: %1.2f for %s", aggValue, serie.Name),
Rule: rule,
}
}
@ -116,10 +118,9 @@ func (this *ExecutorImpl) ValidateRule(rule m.AlertRule, series m.TimeSeriesSlic
Id: rule.Id,
Description: fmt.Sprintf("Actual value: %1.2f for %s", aggValue, serie.Name),
ActualValue: aggValue,
Rule: rule,
}
}
}
return &m.AlertResult{State: m.AlertStateOk, Id: rule.Id, Rule: rule, Description: "Alert is OK!"}
return &m.AlertResult{State: m.AlertStateOk, Id: rule.Id, Description: "Alert is OK!"}
}

View File

@ -18,7 +18,7 @@ func TestAlertingExecutor(t *testing.T) {
m.NewTimeSeries("test1", [][2]float64{{2, 0}}),
}
result := executor.ValidateRule(rule, timeSeries)
result := executor.validateRule(rule, timeSeries)
So(result.State, ShouldEqual, m.AlertStateOk)
})
@ -29,7 +29,7 @@ func TestAlertingExecutor(t *testing.T) {
m.NewTimeSeries("test1", [][2]float64{{2, 0}}),
}
result := executor.ValidateRule(rule, timeSeries)
result := executor.validateRule(rule, timeSeries)
So(result.State, ShouldEqual, m.AlertStateCritical)
})
@ -40,7 +40,7 @@ func TestAlertingExecutor(t *testing.T) {
m.NewTimeSeries("test1", [][2]float64{{9, 0}, {9, 0}}),
}
result := executor.ValidateRule(rule, timeSeries)
result := executor.validateRule(rule, timeSeries)
So(result.State, ShouldEqual, m.AlertStateCritical)
})
@ -51,7 +51,7 @@ func TestAlertingExecutor(t *testing.T) {
m.NewTimeSeries("test1", [][2]float64{{9, 0}, {9, 0}}),
}
result := executor.ValidateRule(rule, timeSeries)
result := executor.validateRule(rule, timeSeries)
So(result.State, ShouldEqual, m.AlertStateOk)
})
@ -62,7 +62,7 @@ func TestAlertingExecutor(t *testing.T) {
m.NewTimeSeries("test1", [][2]float64{{11, 0}, {9, 0}}),
}
result := executor.ValidateRule(rule, timeSeries)
result := executor.validateRule(rule, timeSeries)
So(result.State, ShouldEqual, m.AlertStateOk)
})
@ -73,7 +73,7 @@ func TestAlertingExecutor(t *testing.T) {
m.NewTimeSeries("test1", [][2]float64{{1, 0}, {11, 0}}),
}
result := executor.ValidateRule(rule, timeSeries)
result := executor.validateRule(rule, timeSeries)
So(result.State, ShouldEqual, m.AlertStateCritical)
})
})
@ -87,7 +87,7 @@ func TestAlertingExecutor(t *testing.T) {
m.NewTimeSeries("test1", [][2]float64{{2, 0}}),
}
result := executor.ValidateRule(rule, timeSeries)
result := executor.validateRule(rule, timeSeries)
So(result.State, ShouldEqual, m.AlertStateOk)
})
@ -99,7 +99,7 @@ func TestAlertingExecutor(t *testing.T) {
m.NewTimeSeries("test1", [][2]float64{{11, 0}}),
}
result := executor.ValidateRule(rule, timeSeries)
result := executor.validateRule(rule, timeSeries)
So(result.State, ShouldEqual, m.AlertStateCritical)
})
})