From 910253bc42aa83c2f00bd92ffc29978846e4b2e4 Mon Sep 17 00:00:00 2001 From: bergquist Date: Fri, 3 Jun 2016 07:14:40 +0200 Subject: [PATCH] tech(alerting): remove datasource ref from alertjob --- pkg/models/alerts.go | 9 ++- pkg/services/alerting/alert_rule_reader.go | 64 +++++++------------ pkg/services/alerting/alerting.go | 51 ++++++++------- pkg/services/alerting/datasources/backends.go | 20 ++++-- pkg/services/alerting/datasources/graphite.go | 13 ++-- 5 files changed, 78 insertions(+), 79 deletions(-) diff --git a/pkg/models/alerts.go b/pkg/models/alerts.go index bfcac66aaf2..e7ef0c178e2 100644 --- a/pkg/models/alerts.go +++ b/pkg/models/alerts.go @@ -110,11 +110,10 @@ type GetAlertChangesQuery struct { } type AlertJob struct { - Offset int64 - Delay bool - Running bool - Rule AlertRule - Datasource DataSource + Offset int64 + Delay bool + Running bool + Rule AlertRule } type AlertResult struct { diff --git a/pkg/services/alerting/alert_rule_reader.go b/pkg/services/alerting/alert_rule_reader.go index 79d8a114d92..1dbedcedf86 100644 --- a/pkg/services/alerting/alert_rule_reader.go +++ b/pkg/services/alerting/alert_rule_reader.go @@ -1,21 +1,22 @@ package alerting import ( - "github.com/grafana/grafana/pkg/bus" - m "github.com/grafana/grafana/pkg/models" "sync" "time" + + "github.com/grafana/grafana/pkg/bus" + m "github.com/grafana/grafana/pkg/models" ) type RuleReader interface { - Fetch() []m.AlertJob + Fetch() []m.AlertRule } type AlertRuleReader struct { - serverId string + sync.RWMutex + serverID string serverPosition int clusterSize int - mtx sync.RWMutex } func NewRuleReader() *AlertRuleReader { @@ -26,27 +27,29 @@ func NewRuleReader() *AlertRuleReader { } var ( - alertJobs []m.AlertJob + alertJobs []m.AlertRule ) -func (this *AlertRuleReader) initReader() { - alertJobs = make([]m.AlertJob, 0) +func (arr *AlertRuleReader) Fetch() []m.AlertRule { + return alertJobs +} + +func (arr *AlertRuleReader) initReader() { + alertJobs = make([]m.AlertRule, 0) heartbeat := time.NewTicker(time.Second * 10) - this.rr() + arr.updateRules() for { select { case <-heartbeat.C: - this.rr() + arr.updateRules() } } } -func (this *AlertRuleReader) rr() { - this.mtx.Lock() - defer this.mtx.Unlock() - - rules := make([]m.AlertRule, 0) +func (arr *AlertRuleReader) updateRules() { + arr.Lock() + defer arr.Unlock() /* rules = []m.AlertRule{ @@ -76,38 +79,19 @@ func (this *AlertRuleReader) rr() { cmd := &m.GetAlertsQuery{ OrgId: 1, } - bus.Dispatch(cmd) - rules = cmd.Result - //for i := this.serverPosition - 1; i < len(rules); i += this.clusterSize { + err := bus.Dispatch(cmd) - jobs := make([]m.AlertJob, 0) - for _, rule := range rules { - query := &m.GetDataSourceByIdQuery{Id: rule.DatasourceId, OrgId: rule.OrgId} - err := bus.Dispatch(query) - - if err != nil { - continue - } - - jobs = append(jobs, m.AlertJob{ - Rule: rule, - Datasource: query.Result, - }) + if err == nil { + alertJobs = cmd.Result } - - alertJobs = jobs } -func (this *AlertRuleReader) Fetch() []m.AlertJob { - return alertJobs -} - -func (this *AlertRuleReader) heartBeat() { +func (arr *AlertRuleReader) heartBeat() { //Lets cheat on this until we focus on clustering //log.Info("Heartbeat: Sending heartbeat from " + this.serverId) - this.clusterSize = 1 - this.serverPosition = 1 + arr.clusterSize = 1 + arr.serverPosition = 1 /* cmd := &m.HeartBeatCommand{ServerId: this.serverId} diff --git a/pkg/services/alerting/alerting.go b/pkg/services/alerting/alerting.go index d05b143fafc..19fce39b8ee 100644 --- a/pkg/services/alerting/alerting.go +++ b/pkg/services/alerting/alerting.go @@ -19,7 +19,7 @@ func Init() { scheduler := NewScheduler() reader := NewRuleReader() - go scheduler.Dispatch(reader) + go scheduler.dispatch(reader) go scheduler.Executor(&ExecutorImpl{}) go scheduler.HandleResponses() @@ -41,62 +41,65 @@ func NewScheduler() *Scheduler { } } -func (this *Scheduler) Dispatch(reader RuleReader) { +func (scheduler *Scheduler) dispatch(reader RuleReader) { reschedule := time.NewTicker(time.Second * 10) secondTicker := time.NewTicker(time.Second) - this.updateJobs(reader.Fetch) + scheduler.updateJobs(reader.Fetch) for { select { case <-secondTicker.C: - this.queueJobs() + scheduler.queueJobs() case <-reschedule.C: - this.updateJobs(reader.Fetch) + scheduler.updateJobs(reader.Fetch) } } } -func (this *Scheduler) updateJobs(f func() []m.AlertJob) { +func (scheduler *Scheduler) updateJobs(alertRuleFn func() []m.AlertRule) { log.Debug("Scheduler: UpdateJobs()") jobs := make(map[int64]*m.AlertJob, 0) - rules := f() + rules := alertRuleFn() for i := 0; i < len(rules); i++ { rule := rules[i] - rule.Offset = int64(i) - jobs[rule.Rule.Id] = &rule + jobs[rule.Id] = &m.AlertJob{ + Rule: rule, + Offset: int64(i), + Running: false, + } } log.Debug("Scheduler: Selected %d jobs", len(jobs)) - this.jobs = jobs + scheduler.jobs = jobs } -func (this *Scheduler) queueJobs() { +func (scheduler *Scheduler) queueJobs() { now := time.Now().Unix() - for _, job := range this.jobs { + for _, job := range scheduler.jobs { if now%job.Rule.Frequency == 0 && job.Running == false { log.Info("Scheduler: Putting job on to run queue: %s", job.Rule.Title) - this.runQueue <- job + scheduler.runQueue <- job } } } -func (this *Scheduler) Executor(executor Executor) { - for job := range this.runQueue { +func (scheduler *Scheduler) Executor(executor Executor) { + for job := range scheduler.runQueue { //log.Info("Executor: queue length %d", len(this.runQueue)) log.Info("Executor: executing %s", job.Rule.Title) - this.jobs[job.Rule.Id].Running = true - this.MeasureAndExecute(executor, job) + scheduler.jobs[job.Rule.Id].Running = true + scheduler.MeasureAndExecute(executor, job) } } -func (this *Scheduler) HandleResponses() { - for response := range this.responseQueue { +func (scheduler *Scheduler) HandleResponses() { + for response := range scheduler.responseQueue { log.Info("Response: alert(%d) status(%s) actual(%v)", response.Id, response.State, response.ActualValue) - if this.jobs[response.Id] != nil { - this.jobs[response.Id].Running = false + if scheduler.jobs[response.Id] != nil { + scheduler.jobs[response.Id].Running = false } cmd := m.UpdateAlertStateCommand{ @@ -111,7 +114,7 @@ func (this *Scheduler) HandleResponses() { } } -func (this *Scheduler) MeasureAndExecute(exec Executor, job *m.AlertJob) { +func (scheduler *Scheduler) MeasureAndExecute(exec Executor, job *m.AlertJob) { now := time.Now() responseChan := make(chan *m.AlertResult, 1) @@ -119,7 +122,7 @@ func (this *Scheduler) MeasureAndExecute(exec Executor, job *m.AlertJob) { select { case <-time.After(time.Second * 5): - this.responseQueue <- &m.AlertResult{ + scheduler.responseQueue <- &m.AlertResult{ Id: job.Rule.Id, State: "timed out", Duration: float64(time.Since(now).Nanoseconds()) / float64(1000000), @@ -128,6 +131,6 @@ func (this *Scheduler) MeasureAndExecute(exec Executor, job *m.AlertJob) { case result := <-responseChan: result.Duration = float64(time.Since(now).Nanoseconds()) / float64(1000000) log.Info("Schedular: exeuction took %vms", result.Duration) - this.responseQueue <- result + scheduler.responseQueue <- result } } diff --git a/pkg/services/alerting/datasources/backends.go b/pkg/services/alerting/datasources/backends.go index e4930a55900..0a74c15804b 100644 --- a/pkg/services/alerting/datasources/backends.go +++ b/pkg/services/alerting/datasources/backends.go @@ -3,19 +3,31 @@ package graphite import ( "fmt" + "github.com/grafana/grafana/pkg/bus" m "github.com/grafana/grafana/pkg/models" ) // AlertDatasource is bacon type AlertDatasource interface { - GetSeries(job *m.AlertJob) (m.TimeSeriesSlice, error) + GetSeries(job *m.AlertJob, datasource m.DataSource) (m.TimeSeriesSlice, error) } // GetSeries returns timeseries data from the datasource func GetSeries(job *m.AlertJob) (m.TimeSeriesSlice, error) { - if job.Datasource.Type == m.DS_GRAPHITE { - return GraphiteClient{}.GetSeries(job) + query := &m.GetDataSourceByIdQuery{ + Id: job.Rule.DatasourceId, + OrgId: job.Rule.OrgId, } - return nil, fmt.Errorf("Grafana does not support alerts for %s", job.Datasource.Type) + err := bus.Dispatch(query) + + if err != nil { + return nil, fmt.Errorf("Could not find datasource for %d", job.Rule.DatasourceId) + } + + if query.Result.Type == m.DS_GRAPHITE { + return GraphiteClient{}.GetSeries(job, query.Result) + } + + return nil, fmt.Errorf("Grafana does not support alerts for %s", query.Result.Type) } diff --git a/pkg/services/alerting/datasources/graphite.go b/pkg/services/alerting/datasources/graphite.go index d071d3dcc79..767e2ec03de 100644 --- a/pkg/services/alerting/datasources/graphite.go +++ b/pkg/services/alerting/datasources/graphite.go @@ -2,14 +2,15 @@ package graphite import ( "fmt" - "github.com/franela/goreq" - "github.com/grafana/grafana/pkg/cmd/grafana-cli/log" - "github.com/grafana/grafana/pkg/components/simplejson" - m "github.com/grafana/grafana/pkg/models" "net/http" "net/url" "strconv" "time" + + "github.com/franela/goreq" + "github.com/grafana/grafana/pkg/cmd/grafana-cli/log" + "github.com/grafana/grafana/pkg/components/simplejson" + m "github.com/grafana/grafana/pkg/models" ) type GraphiteClient struct{} @@ -21,7 +22,7 @@ type GraphiteSerie struct { type GraphiteResponse []GraphiteSerie -func (this GraphiteClient) GetSeries(rule *m.AlertJob) (m.TimeSeriesSlice, error) { +func (this GraphiteClient) GetSeries(rule *m.AlertJob, datasource m.DataSource) (m.TimeSeriesSlice, error) { v := url.Values{ "format": []string{"json"}, "target": []string{getTargetFromRule(rule.Rule)}, @@ -33,7 +34,7 @@ func (this GraphiteClient) GetSeries(rule *m.AlertJob) (m.TimeSeriesSlice, error res, err := goreq.Request{ Method: "POST", - Uri: rule.Datasource.Url + "/render", + Uri: datasource.Url + "/render", Body: v.Encode(), Timeout: 5 * time.Second, }.Do()