feat(alerting): alerting scheduling distribution, only distibutes it on seconds for now, not sub second distribution, #5854

This commit is contained in:
Torkel Ödegaard
2016-09-05 14:26:08 +02:00
parent e2f5bf1666
commit 4c5461d4ba
2 changed files with 29 additions and 9 deletions

View File

@@ -1,10 +1,11 @@
package alerting package alerting
type Job struct { type Job struct {
Offset int64 Offset int64
Delay bool OffsetWait bool
Running bool Delay bool
Rule *Rule Running bool
Rule *Rule
} }
type ResultLogEntry struct { type ResultLogEntry struct {

View File

@@ -1,6 +1,7 @@
package alerting package alerting
import ( import (
"math"
"time" "time"
"github.com/grafana/grafana/pkg/log" "github.com/grafana/grafana/pkg/log"
@@ -34,8 +35,8 @@ func (s *SchedulerImpl) Update(rules []*Rule) {
} }
job.Rule = rule job.Rule = rule
job.Offset = int64(i) job.Offset = ((rule.Frequency * 1000) / int64(len(rules))) * int64(i)
job.Offset = int64(math.Floor(float64(job.Offset) / 1000))
jobs[rule.Id] = job jobs[rule.Id] = job
} }
@@ -46,9 +47,27 @@ func (s *SchedulerImpl) Tick(tickTime time.Time, execQueue chan *Job) {
now := tickTime.Unix() now := tickTime.Unix()
for _, job := range s.jobs { for _, job := range s.jobs {
if now%job.Rule.Frequency == 0 && job.Running == false { if job.Running {
s.log.Debug("Scheduler: Putting job on to exec queue", "name", job.Rule.Name) continue
execQueue <- job }
if job.OffsetWait && now%job.Offset == 0 {
job.OffsetWait = false
s.enque(job, execQueue)
continue
}
if now%job.Rule.Frequency == 0 {
if job.Offset > 0 {
job.OffsetWait = true
} else {
s.enque(job, execQueue)
}
} }
} }
} }
func (s *SchedulerImpl) enque(job *Job, execQueue chan *Job) {
s.log.Debug("Scheduler: Putting job on to exec queue", "name", job.Rule.Name, "id", job.Rule.Id)
execQueue <- job
}