diff --git a/pkg/services/alerting/alerting.go b/pkg/services/alerting/alerting.go index 7407133dd0f..911d4a06f5a 100644 --- a/pkg/services/alerting/alerting.go +++ b/pkg/services/alerting/alerting.go @@ -5,7 +5,6 @@ import ( "strconv" "time" - //"github.com/go-xorm/xorm" "github.com/grafana/grafana/pkg/log" m "github.com/grafana/grafana/pkg/models" "github.com/grafana/grafana/pkg/setting" @@ -19,14 +18,16 @@ func Init() { log.Info("Alerting: Initializing scheduler...") scheduler := NewScheduler() - go scheduler.Dispatch() - go scheduler.Executor() + go scheduler.Dispatch(&AlertRuleReader{}) + go scheduler.Executor(&DummieExecutor{}) } type Scheduler struct { jobs []*AlertJob runQueue chan *AlertJob + alertRuleFetcher RuleReader + serverId string serverPosition int clusterSize int @@ -49,42 +50,31 @@ func (s *Scheduler) heartBeat() { s.serverPosition = 1 } -func (s *Scheduler) Dispatch() { +func (s *Scheduler) Dispatch(reader RuleReader) { reschedule := time.NewTicker(time.Second * 10) secondTicker := time.NewTicker(time.Second) ticker := time.NewTicker(time.Second * 5) s.heartBeat() - s.updateJobs() + s.updateJobs(reader) for { select { case <-secondTicker.C: s.queueJobs() case <-reschedule.C: - s.updateJobs() + s.updateJobs(reader) case <-ticker.C: s.heartBeat() } } } -func (s *Scheduler) getAlertRules() []m.AlertRule { - return []m.AlertRule{ - {Id: 1, Title: "alert rule 1", Interval: "10s", Frequency: 10}, - {Id: 2, Title: "alert rule 2", Interval: "10s", Frequency: 10}, - {Id: 3, Title: "alert rule 3", Interval: "10s", Frequency: 10}, - {Id: 4, Title: "alert rule 4", Interval: "10s", Frequency: 5}, - {Id: 5, Title: "alert rule 5", Interval: "10s", Frequency: 5}, - {Id: 6, Title: "alert rule 6", Interval: "10s", Frequency: 1}, - } -} - -func (s *Scheduler) updateJobs() { - log.Info("Scheduler: UpdateJobs()") +func (s *Scheduler) updateJobs(reader RuleReader) { + log.Debug("Scheduler: UpdateJobs()") jobs := make([]*AlertJob, 0) - rules := s.getAlertRules() + rules := reader.Fetch() for i := s.serverPosition - 1; i < len(rules); i = i + s.clusterSize { rule := rules[i] @@ -112,12 +102,12 @@ func (s *Scheduler) queueJobs() { } } -func (s *Scheduler) Executor() { +func (s *Scheduler) Executor(executor Executor) { for job := range s.runQueue { log.Info("Executor: queue length %d", len(s.runQueue)) log.Info("Executor: executing %s", job.name) - time.Sleep(1000) + executor.Execute(job.rule) } } @@ -130,9 +120,36 @@ type AlertJob struct { rule m.AlertRule } +type AlertResult struct { + id int64 + state string + duration time.Time +} + type RuleReader interface { + Fetch() []m.AlertRule +} + +type AlertRuleReader struct{} + +func (this AlertRuleReader) Fetch() []m.AlertRule { + return []m.AlertRule{ + {Id: 1, Title: "alert rule 1", Interval: "10s", Frequency: 10}, + {Id: 2, Title: "alert rule 2", Interval: "10s", Frequency: 10}, + {Id: 3, Title: "alert rule 3", Interval: "10s", Frequency: 10}, + {Id: 4, Title: "alert rule 4", Interval: "10s", Frequency: 5}, + {Id: 5, Title: "alert rule 5", Interval: "10s", Frequency: 5}, + {Id: 6, Title: "alert rule 6", Interval: "10s", Frequency: 1}, + } } type Executor interface { - Execute(rule *m.AlertRule) + Execute(rule m.AlertRule) (err error, result AlertResult) +} + +type DummieExecutor struct{} + +func (this DummieExecutor) Execute(rule m.AlertRule) (err error, result AlertResult) { + time.Sleep(1000) + return nil, AlertResult{state: "OK", id: rule.Id} } diff --git a/pkg/services/alerting/alerting_test.go b/pkg/services/alerting/alerting_test.go new file mode 100644 index 00000000000..d806a5d69ca --- /dev/null +++ b/pkg/services/alerting/alerting_test.go @@ -0,0 +1 @@ +package alerting