From c133a001258857261326cca7be21ea2904dbfe96 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Torkel=20=C3=96degaard?= Date: Tue, 17 May 2016 14:31:52 +0200 Subject: [PATCH] feat(alerting): minor progress on scheduler --- pkg/api/alerting/alerting.go | 34 --------- pkg/cmd/grafana-server/main.go | 2 + pkg/services/alerting/alerting.go | 114 ++++++++++++++++++++++++++++++ 3 files changed, 116 insertions(+), 34 deletions(-) delete mode 100644 pkg/api/alerting/alerting.go create mode 100644 pkg/services/alerting/alerting.go diff --git a/pkg/api/alerting/alerting.go b/pkg/api/alerting/alerting.go deleted file mode 100644 index 5a6dabf828b..00000000000 --- a/pkg/api/alerting/alerting.go +++ /dev/null @@ -1,34 +0,0 @@ -package alerting - -import ( - "time" - - m "github.com/grafana/grafana/pkg/models" -) - -func Init() { - go dispatcher() -} - -func dispatcher() { - - ticker := time.NewTicker(time.Second) - - for { - select { - case <-ticker.C: - scheduleJobs() - } - } -} - -func scheduleJobs() { - -} - -type Scheduler interface { -} - -type Executor interface { - Execute(rule *m.AlertRule) -} diff --git a/pkg/cmd/grafana-server/main.go b/pkg/cmd/grafana-server/main.go index b2c66ba185e..9da0e19c023 100644 --- a/pkg/cmd/grafana-server/main.go +++ b/pkg/cmd/grafana-server/main.go @@ -16,6 +16,7 @@ import ( "github.com/grafana/grafana/pkg/login" "github.com/grafana/grafana/pkg/metrics" "github.com/grafana/grafana/pkg/plugins" + "github.com/grafana/grafana/pkg/services/alerting" "github.com/grafana/grafana/pkg/services/eventpublisher" "github.com/grafana/grafana/pkg/services/notifications" "github.com/grafana/grafana/pkg/services/search" @@ -64,6 +65,7 @@ func main() { social.NewOAuthService() eventpublisher.Init() plugins.Init() + alerting.Init() if err := notifications.Init(); err != nil { log.Fatal(3, "Notification service failed to initialize", err) diff --git a/pkg/services/alerting/alerting.go b/pkg/services/alerting/alerting.go new file mode 100644 index 00000000000..f062e23f8b1 --- /dev/null +++ b/pkg/services/alerting/alerting.go @@ -0,0 +1,114 @@ +package alerting + +import ( + "time" + + "github.com/grafana/grafana/pkg/log" + m "github.com/grafana/grafana/pkg/models" + "github.com/grafana/grafana/pkg/setting" +) + +func Init() { + if !setting.AlertingEnabled { + return + } + + log.Info("Alerting: Initializing scheduler...") + + scheduler := NewScheduler() + go scheduler.Dispatch() + go scheduler.Executor() +} + +type Scheduler struct { + jobs []*AlertJob + runQueue chan *AlertJob +} + +func NewScheduler() *Scheduler { + return &Scheduler{ + jobs: make([]*AlertJob, 0), + runQueue: make(chan *AlertJob, 1000), + } +} + +func (s *Scheduler) Dispatch() { + reschedule := time.NewTicker(time.Second * 10) + secondTicker := time.NewTicker(time.Second) + + s.updateJobs() + + for { + select { + case <-secondTicker.C: + s.queueJobs() + case <-reschedule.C: + s.updateJobs() + } + } +} + +func (s *Scheduler) updateJobs() { + log.Info("Scheduler:updateJobs()") + + jobs := make([]*AlertJob, 0) + jobs = append(jobs, &AlertJob{ + name: "ID_1_Each 10s", + frequency: 10, + offset: 1, + }) + jobs = append(jobs, &AlertJob{ + name: "ID_2_Each 10s", + frequency: 10, + offset: 2, + }) + jobs = append(jobs, &AlertJob{ + name: "ID_3_Each 10s", + frequency: 10, + offset: 3, + }) + + jobs = append(jobs, &AlertJob{ + name: "ID_4_Each 5s", + frequency: 5, + }) + + s.jobs = jobs +} + +func (s *Scheduler) queueJobs() { + log.Info("Scheduler:queueJobs()") + + now := time.Now().Unix() + + for _, job := range s.jobs { + if now%job.frequency == 0 { + log.Info("Scheduler: Putting job on to run queue: %s", job.name) + s.runQueue <- job + } + } +} + +func (s *Scheduler) Executor() { + + for job := range s.runQueue { + log.Info("Executor: queue length %d", len(s.runQueue)) + log.Info("Executor: executing %s", job.name) + time.Sleep(1000) + } +} + +type AlertJob struct { + id int64 + name string + frequency int64 + offset int64 + delay bool +} + +type RuleReader interface { +} + +type Executor interface { + Execute(rule *m.AlertRule) +}