diff --git a/pkg/cmd/grafana-server/main.go b/pkg/cmd/grafana-server/main.go index f795a3ae42d..9da0e19c023 100644 --- a/pkg/cmd/grafana-server/main.go +++ b/pkg/cmd/grafana-server/main.go @@ -16,7 +16,7 @@ import ( "github.com/grafana/grafana/pkg/login" "github.com/grafana/grafana/pkg/metrics" "github.com/grafana/grafana/pkg/plugins" - //"github.com/grafana/grafana/pkg/services/alerting" + "github.com/grafana/grafana/pkg/services/alerting" "github.com/grafana/grafana/pkg/services/eventpublisher" "github.com/grafana/grafana/pkg/services/notifications" "github.com/grafana/grafana/pkg/services/search" @@ -65,7 +65,7 @@ func main() { social.NewOAuthService() eventpublisher.Init() plugins.Init() - //alerting.Init() + alerting.Init() if err := notifications.Init(); err != nil { log.Fatal(3, "Notification service failed to initialize", err) diff --git a/pkg/models/alerts.go b/pkg/models/alerts.go index 79fc20cd938..c5cba44022d 100644 --- a/pkg/models/alerts.go +++ b/pkg/models/alerts.go @@ -19,17 +19,23 @@ type AlertRule struct { WarnOperator string `json:"warnOperator"` CritOperator string `json:"critOperator"` Interval string `json:"interval"` - //Frequency int64 `json:"frequency"` - Title string `json:"title"` - Description string `json:"description"` - QueryRange string `json:"queryRange"` - Aggregator string `json:"aggregator"` - State string `json:"state"` + Frequency int64 `json:"frequency"` + Title string `json:"title"` + Description string `json:"description"` + QueryRange string `json:"queryRange"` + Aggregator string `json:"aggregator"` + State string `json:"state"` Created time.Time `json:"created"` Updated time.Time `json:"updated"` } +type HeartBeat struct { + ServerId string + Updated time.Time + Created time.Time +} + type AlertRuleChange struct { Id int64 `json:"id"` OrgId int64 `json:"-"` diff --git a/pkg/services/alerting/alerting.go b/pkg/services/alerting/alerting.go index f062e23f8b1..7407133dd0f 100644 --- a/pkg/services/alerting/alerting.go +++ b/pkg/services/alerting/alerting.go @@ -1,8 +1,11 @@ package alerting import ( + "math/rand" + "strconv" "time" + //"github.com/go-xorm/xorm" "github.com/grafana/grafana/pkg/log" m "github.com/grafana/grafana/pkg/models" "github.com/grafana/grafana/pkg/setting" @@ -23,19 +26,35 @@ func Init() { type Scheduler struct { jobs []*AlertJob runQueue chan *AlertJob + + serverId string + serverPosition int + clusterSize int } func NewScheduler() *Scheduler { return &Scheduler{ jobs: make([]*AlertJob, 0), runQueue: make(chan *AlertJob, 1000), + serverId: strconv.Itoa(rand.Intn(1000)), } } +func (s *Scheduler) heartBeat() { + //write heartBeat to db. + //get the modulus position of active servers + + log.Info("Heartbeat: Sending heartbeat from " + s.serverId) + s.clusterSize = 1 + s.serverPosition = 1 +} + func (s *Scheduler) Dispatch() { reschedule := time.NewTicker(time.Second * 10) secondTicker := time.NewTicker(time.Second) + ticker := time.NewTicker(time.Second * 5) + s.heartBeat() s.updateJobs() for { @@ -44,41 +63,45 @@ func (s *Scheduler) Dispatch() { s.queueJobs() case <-reschedule.C: s.updateJobs() + case <-ticker.C: + s.heartBeat() } } } +func (s *Scheduler) getAlertRules() []m.AlertRule { + return []m.AlertRule{ + {Id: 1, Title: "alert rule 1", Interval: "10s", Frequency: 10}, + {Id: 2, Title: "alert rule 2", Interval: "10s", Frequency: 10}, + {Id: 3, Title: "alert rule 3", Interval: "10s", Frequency: 10}, + {Id: 4, Title: "alert rule 4", Interval: "10s", Frequency: 5}, + {Id: 5, Title: "alert rule 5", Interval: "10s", Frequency: 5}, + {Id: 6, Title: "alert rule 6", Interval: "10s", Frequency: 1}, + } +} + func (s *Scheduler) updateJobs() { - log.Info("Scheduler:updateJobs()") + log.Info("Scheduler: UpdateJobs()") jobs := make([]*AlertJob, 0) - jobs = append(jobs, &AlertJob{ - name: "ID_1_Each 10s", - frequency: 10, - offset: 1, - }) - jobs = append(jobs, &AlertJob{ - name: "ID_2_Each 10s", - frequency: 10, - offset: 2, - }) - jobs = append(jobs, &AlertJob{ - name: "ID_3_Each 10s", - frequency: 10, - offset: 3, - }) + rules := s.getAlertRules() - jobs = append(jobs, &AlertJob{ - name: "ID_4_Each 5s", - frequency: 5, - }) + for i := s.serverPosition - 1; i < len(rules); i = i + s.clusterSize { + rule := rules[i] + jobs = append(jobs, &AlertJob{ + name: rule.Title, + frequency: rule.Frequency, + rule: rule, + offset: int64(len(jobs)), + }) + } + + log.Debug("Scheduler: Selected %d jobs", len(jobs)) s.jobs = jobs } func (s *Scheduler) queueJobs() { - log.Info("Scheduler:queueJobs()") - now := time.Now().Unix() for _, job := range s.jobs { @@ -104,6 +127,7 @@ type AlertJob struct { frequency int64 offset int64 delay bool + rule m.AlertRule } type RuleReader interface { diff --git a/pkg/services/sqlstore/migrations/alert_mig.go b/pkg/services/sqlstore/migrations/alert_mig.go index ddac692231e..ef120a98791 100644 --- a/pkg/services/sqlstore/migrations/alert_mig.go +++ b/pkg/services/sqlstore/migrations/alert_mig.go @@ -5,11 +5,13 @@ import ( ) func addAlertMigrations(mg *Migrator) { + alertV1 := Table{ Name: "alert_rule", Columns: []*Column{ {Name: "id", Type: DB_BigInt, IsPrimaryKey: true, IsAutoIncrement: true}, {Name: "dashboard_id", Type: DB_BigInt, Nullable: false}, + //{Name: "datasource_id", Type: DB_BigInt, Nullable: false}, {Name: "panel_id", Type: DB_BigInt, Nullable: false}, {Name: "org_id", Type: DB_BigInt, Nullable: false}, {Name: "query", Type: DB_Text, Nullable: false}, @@ -19,6 +21,7 @@ func addAlertMigrations(mg *Migrator) { {Name: "crit_level", Type: DB_BigInt, Nullable: false}, {Name: "crit_operator", Type: DB_NVarchar, Length: 10, Nullable: false}, {Name: "interval", Type: DB_NVarchar, Length: 255, Nullable: false}, + {Name: "frequency", Type: DB_BigInt, Nullable: false}, {Name: "title", Type: DB_NVarchar, Length: 255, Nullable: false}, {Name: "description", Type: DB_NVarchar, Length: 255, Nullable: false}, {Name: "query_range", Type: DB_NVarchar, Length: 255, Nullable: false}, @@ -58,4 +61,17 @@ func addAlertMigrations(mg *Migrator) { } mg.AddMigration("create alert_state_log table v1", NewAddTableMigration(alert_state_log)) + + alert_heartbeat := Table{ + Name: "alert_heartbeat", + Columns: []*Column{ + {Name: "id", Type: DB_BigInt, IsPrimaryKey: true, IsAutoIncrement: true}, + {Name: "server_id", Type: DB_NVarchar, Length: 50, Nullable: false}, + {Name: "created", Type: DB_DateTime, Nullable: false}, + {Name: "updated", Type: DB_DateTime, Nullable: false}, + }, + } + + mg.AddMigration("create alert_heartbeat table v1", NewAddTableMigration(alert_heartbeat)) + }