tech(alerting): remove datasource ref from alertjob

Author: bergquist
Date:   2016-06-03 07:14:40 +02:00
Parent: 0bea0cc5b9
Commit: 910253bc42

5 changed files with 78 additions and 79 deletions

View File

@@ -114,7 +114,6 @@ type AlertJob struct {
 	Delay      bool
 	Running    bool
 	Rule       AlertRule
-	Datasource DataSource
 }
 
 type AlertResult struct {
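
After this hunk an alert job carries only scheduling state plus the rule; anything that needs the datasource has to resolve it from the rule at execution time. A condensed sketch of the struct as it reads after the change (only fields visible somewhere in this commit are listed; the real definition, presumably in the models package, may order or extend them differently):

type AlertJob struct {
	Offset  int64     // set by the scheduler when it builds jobs (see the scheduler hunks below)
	Delay   bool
	Running bool
	Rule    AlertRule // Datasource is gone; the rule's DatasourceId is used instead
}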

View File

@@ -1,21 +1,22 @@
 package alerting
 
 import (
-	"github.com/grafana/grafana/pkg/bus"
-	m "github.com/grafana/grafana/pkg/models"
 	"sync"
 	"time"
+
+	"github.com/grafana/grafana/pkg/bus"
+	m "github.com/grafana/grafana/pkg/models"
 )
 
 type RuleReader interface {
-	Fetch() []m.AlertJob
+	Fetch() []m.AlertRule
 }
 
 type AlertRuleReader struct {
-	serverId       string
+	sync.RWMutex
+	serverID       string
 	serverPosition int
 	clusterSize    int
-	mtx            sync.RWMutex
 }
 
 func NewRuleReader() *AlertRuleReader {
@@ -26,27 +27,29 @@ func NewRuleReader() *AlertRuleReader {
 }
 
 var (
-	alertJobs []m.AlertJob
+	alertJobs []m.AlertRule
 )
 
-func (this *AlertRuleReader) initReader() {
-	alertJobs = make([]m.AlertJob, 0)
+func (arr *AlertRuleReader) Fetch() []m.AlertRule {
+	return alertJobs
+}
+
+func (arr *AlertRuleReader) initReader() {
+	alertJobs = make([]m.AlertRule, 0)
 	heartbeat := time.NewTicker(time.Second * 10)
 
-	this.rr()
+	arr.updateRules()
 	for {
 		select {
 		case <-heartbeat.C:
-			this.rr()
+			arr.updateRules()
 		}
 	}
 }
 
-func (this *AlertRuleReader) rr() {
-	this.mtx.Lock()
-	defer this.mtx.Unlock()
-
-	rules := make([]m.AlertRule, 0)
+func (arr *AlertRuleReader) updateRules() {
+	arr.Lock()
+	defer arr.Unlock()
 
 	/*
 		rules = []m.AlertRule{
@@ -76,38 +79,19 @@ func (this *AlertRuleReader) rr() {
 	cmd := &m.GetAlertsQuery{
 		OrgId: 1,
 	}
 
-	bus.Dispatch(cmd)
-	rules = cmd.Result
+	err := bus.Dispatch(cmd)
 
-	//for i := this.serverPosition - 1; i < len(rules); i += this.clusterSize {
-	jobs := make([]m.AlertJob, 0)
-	for _, rule := range rules {
-		query := &m.GetDataSourceByIdQuery{Id: rule.DatasourceId, OrgId: rule.OrgId}
-		err := bus.Dispatch(query)
-		if err != nil {
-			continue
-		}
-
-		jobs = append(jobs, m.AlertJob{
-			Rule:       rule,
-			Datasource: query.Result,
-		})
-	}
-
-	alertJobs = jobs
+	if err == nil {
+		alertJobs = cmd.Result
+	}
 }
 
-func (this *AlertRuleReader) Fetch() []m.AlertJob {
-	return alertJobs
-}
-
-func (this *AlertRuleReader) heartBeat() {
+func (arr *AlertRuleReader) heartBeat() {
 	//Lets cheat on this until we focus on clustering
 	//log.Info("Heartbeat: Sending heartbeat from " + this.serverId)
-	this.clusterSize = 1
-	this.serverPosition = 1
+	arr.clusterSize = 1
+	arr.serverPosition = 1
 
 	/*
 		cmd := &m.HeartBeatCommand{ServerId: this.serverId}
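
Net effect for the reader: updateRules stores the fetched rules as-is and Fetch just hands them out; the per-rule GetDataSourceByIdQuery loop is gone. A condensed sketch of the new flow, with blank lines and the commented-out clustering code omitted (not a verbatim copy of the file):

// Condensed from the hunks above.
func (arr *AlertRuleReader) updateRules() {
	arr.Lock()
	defer arr.Unlock()

	cmd := &m.GetAlertsQuery{OrgId: 1}
	if err := bus.Dispatch(cmd); err == nil {
		alertJobs = cmd.Result // []m.AlertRule; no datasource lookup here anymore
	}
}

func (arr *AlertRuleReader) Fetch() []m.AlertRule {
	return alertJobs
}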

View File

@@ -19,7 +19,7 @@ func Init() {
 	scheduler := NewScheduler()
 	reader := NewRuleReader()
 
-	go scheduler.Dispatch(reader)
+	go scheduler.dispatch(reader)
 	go scheduler.Executor(&ExecutorImpl{})
 	go scheduler.HandleResponses()
@@ -41,62 +41,65 @@ func NewScheduler() *Scheduler {
 	}
 }
 
-func (this *Scheduler) Dispatch(reader RuleReader) {
+func (scheduler *Scheduler) dispatch(reader RuleReader) {
 	reschedule := time.NewTicker(time.Second * 10)
 	secondTicker := time.NewTicker(time.Second)
 
-	this.updateJobs(reader.Fetch)
+	scheduler.updateJobs(reader.Fetch)
 
 	for {
 		select {
 		case <-secondTicker.C:
-			this.queueJobs()
+			scheduler.queueJobs()
 		case <-reschedule.C:
-			this.updateJobs(reader.Fetch)
+			scheduler.updateJobs(reader.Fetch)
 		}
 	}
 }
 
-func (this *Scheduler) updateJobs(f func() []m.AlertJob) {
+func (scheduler *Scheduler) updateJobs(alertRuleFn func() []m.AlertRule) {
 	log.Debug("Scheduler: UpdateJobs()")
 	jobs := make(map[int64]*m.AlertJob, 0)
-	rules := f()
+	rules := alertRuleFn()
 
 	for i := 0; i < len(rules); i++ {
 		rule := rules[i]
-		rule.Offset = int64(i)
-		jobs[rule.Rule.Id] = &rule
+		jobs[rule.Id] = &m.AlertJob{
+			Rule:    rule,
+			Offset:  int64(i),
+			Running: false,
+		}
 	}
 
 	log.Debug("Scheduler: Selected %d jobs", len(jobs))
 
-	this.jobs = jobs
+	scheduler.jobs = jobs
 }
 
-func (this *Scheduler) queueJobs() {
+func (scheduler *Scheduler) queueJobs() {
 	now := time.Now().Unix()
-	for _, job := range this.jobs {
+	for _, job := range scheduler.jobs {
 		if now%job.Rule.Frequency == 0 && job.Running == false {
 			log.Info("Scheduler: Putting job on to run queue: %s", job.Rule.Title)
-			this.runQueue <- job
+			scheduler.runQueue <- job
 		}
 	}
 }
 
-func (this *Scheduler) Executor(executor Executor) {
-	for job := range this.runQueue {
+func (scheduler *Scheduler) Executor(executor Executor) {
+	for job := range scheduler.runQueue {
 		//log.Info("Executor: queue length %d", len(this.runQueue))
 		log.Info("Executor: executing %s", job.Rule.Title)
-		this.jobs[job.Rule.Id].Running = true
-		this.MeasureAndExecute(executor, job)
+		scheduler.jobs[job.Rule.Id].Running = true
+		scheduler.MeasureAndExecute(executor, job)
 	}
 }
 
-func (this *Scheduler) HandleResponses() {
-	for response := range this.responseQueue {
+func (scheduler *Scheduler) HandleResponses() {
+	for response := range scheduler.responseQueue {
 		log.Info("Response: alert(%d) status(%s) actual(%v)", response.Id, response.State, response.ActualValue)
-		if this.jobs[response.Id] != nil {
-			this.jobs[response.Id].Running = false
+		if scheduler.jobs[response.Id] != nil {
+			scheduler.jobs[response.Id].Running = false
 		}
 
 		cmd := m.UpdateAlertStateCommand{
@@ -111,7 +114,7 @@ func (this *Scheduler) HandleResponses() {
 	}
 }
 
-func (this *Scheduler) MeasureAndExecute(exec Executor, job *m.AlertJob) {
+func (scheduler *Scheduler) MeasureAndExecute(exec Executor, job *m.AlertJob) {
 	now := time.Now()
 
 	responseChan := make(chan *m.AlertResult, 1)
@@ -119,7 +122,7 @@ func (this *Scheduler) MeasureAndExecute(exec Executor, job *m.AlertJob) {
 
 	select {
 	case <-time.After(time.Second * 5):
-		this.responseQueue <- &m.AlertResult{
+		scheduler.responseQueue <- &m.AlertResult{
 			Id:       job.Rule.Id,
 			State:    "timed out",
 			Duration: float64(time.Since(now).Nanoseconds()) / float64(1000000),
@@ -128,6 +131,6 @@ func (this *Scheduler) MeasureAndExecute(exec Executor, job *m.AlertJob) {
 	case result := <-responseChan:
 		result.Duration = float64(time.Since(now).Nanoseconds()) / float64(1000000)
 		log.Info("Schedular: exeuction took %vms", result.Duration)
-		this.responseQueue <- result
+		scheduler.responseQueue <- result
 	}
 }
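
Wrapping rules into jobs now happens in the scheduler instead of the reader. A condensed sketch of that step, assuming the Scheduler type and imports are as in the rest of the file (uses range instead of the index loop, otherwise matching the updateJobs hunk above):

// Condensed from the updateJobs hunk above.
func (scheduler *Scheduler) updateJobs(alertRuleFn func() []m.AlertRule) {
	jobs := make(map[int64]*m.AlertJob)

	for i, rule := range alertRuleFn() {
		jobs[rule.Id] = &m.AlertJob{
			Rule:    rule,
			Offset:  int64(i), // spreads rule evaluation across the schedule
			Running: false,
		}
	}

	scheduler.jobs = jobs
}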

View File

@@ -3,19 +3,31 @@ package graphite
 import (
 	"fmt"
 
+	"github.com/grafana/grafana/pkg/bus"
 	m "github.com/grafana/grafana/pkg/models"
 )
 
 // AlertDatasource is bacon
 type AlertDatasource interface {
-	GetSeries(job *m.AlertJob) (m.TimeSeriesSlice, error)
+	GetSeries(job *m.AlertJob, datasource m.DataSource) (m.TimeSeriesSlice, error)
 }
 
 // GetSeries returns timeseries data from the datasource
 func GetSeries(job *m.AlertJob) (m.TimeSeriesSlice, error) {
-	if job.Datasource.Type == m.DS_GRAPHITE {
-		return GraphiteClient{}.GetSeries(job)
+	query := &m.GetDataSourceByIdQuery{
+		Id:    job.Rule.DatasourceId,
+		OrgId: job.Rule.OrgId,
 	}
 
-	return nil, fmt.Errorf("Grafana does not support alerts for %s", job.Datasource.Type)
+	err := bus.Dispatch(query)
+
+	if err != nil {
+		return nil, fmt.Errorf("Could not find datasource for %d", job.Rule.DatasourceId)
+	}
+
+	if query.Result.Type == m.DS_GRAPHITE {
+		return GraphiteClient{}.GetSeries(job, query.Result)
+	}
+
+	return nil, fmt.Errorf("Grafana does not support alerts for %s", query.Result.Type)
 }
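
Callers now hand GetSeries only the job; the datasource is resolved through the bus when the query runs. A hypothetical call site illustrating the new pattern (fetchSeriesForJob and its surroundings are not part of this commit):

// Hypothetical helper; names are illustrative only.
func fetchSeriesForJob(job *m.AlertJob) (m.TimeSeriesSlice, error) {
	// GetSeries resolves the datasource itself via bus.Dispatch,
	// so the caller no longer needs a m.DataSource on the job.
	return graphite.GetSeries(job)
}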

View File

@@ -2,14 +2,15 @@ package graphite
 
 import (
 	"fmt"
-	"github.com/franela/goreq"
-	"github.com/grafana/grafana/pkg/cmd/grafana-cli/log"
-	"github.com/grafana/grafana/pkg/components/simplejson"
-	m "github.com/grafana/grafana/pkg/models"
 	"net/http"
 	"net/url"
 	"strconv"
 	"time"
+
+	"github.com/franela/goreq"
+	"github.com/grafana/grafana/pkg/cmd/grafana-cli/log"
+	"github.com/grafana/grafana/pkg/components/simplejson"
+	m "github.com/grafana/grafana/pkg/models"
 )
 
 type GraphiteClient struct{}
@@ -21,7 +22,7 @@ type GraphiteSerie struct {
 
 type GraphiteResponse []GraphiteSerie
 
-func (this GraphiteClient) GetSeries(rule *m.AlertJob) (m.TimeSeriesSlice, error) {
+func (this GraphiteClient) GetSeries(rule *m.AlertJob, datasource m.DataSource) (m.TimeSeriesSlice, error) {
 	v := url.Values{
 		"format": []string{"json"},
 		"target": []string{getTargetFromRule(rule.Rule)},
@@ -33,7 +34,7 @@ func (this GraphiteClient) GetSeries(rule *m.AlertJob) (m.TimeSeriesSlice, error
 
 	res, err := goreq.Request{
 		Method:  "POST",
-		Uri:     rule.Datasource.Url + "/render",
+		Uri:     datasource.Url + "/render",
 		Body:    v.Encode(),
 		Timeout: 5 * time.Second,
 	}.Do()
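
The client's GetSeries now takes the resolved datasource explicitly. A minimal illustration of the call pattern, mirroring the dispatch code in the previous file (querySeries is illustrative, not part of this commit, and assumes the bus and models imports used above):

// Illustrative wrapper only: resolve the datasource, then pass it to the client.
func querySeries(job *m.AlertJob) (m.TimeSeriesSlice, error) {
	query := &m.GetDataSourceByIdQuery{Id: job.Rule.DatasourceId, OrgId: job.Rule.OrgId}
	if err := bus.Dispatch(query); err != nil {
		return nil, err
	}
	return GraphiteClient{}.GetSeries(job, query.Result)
}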