diff --git a/pkg/models/alerts.go b/pkg/models/alerts.go
index d0252285a11..9106562ec32 100644
--- a/pkg/models/alerts.go
+++ b/pkg/models/alerts.go
@@ -112,24 +112,3 @@ type GetAlertChangesQuery struct {
 
 	Result []AlertRuleChange
 }
-
-type AlertJob struct {
-	Offset     int64
-	Delay      bool
-	Running    bool
-	RetryCount int
-	Rule       AlertRule
-}
-
-type AlertResult struct {
-	Id          int64
-	State       string
-	ActualValue float64
-	Duration    float64
-	Description string
-	AlertJob    *AlertJob
-}
-
-func (ar *AlertResult) IsResultIncomplete() bool {
-	return ar.State == AlertStatePending
-}
diff --git a/pkg/models/alerts_state.go b/pkg/models/alerts_state.go
index 68012e41503..171eb754412 100644
--- a/pkg/models/alerts_state.go
+++ b/pkg/models/alerts_state.go
@@ -2,6 +2,8 @@ package models
 
 import (
 	"time"
+
+	"github.com/grafana/grafana/pkg/services/alerting/alertstates"
 )
 
 type AlertState struct {
@@ -13,25 +15,8 @@ type AlertState struct {
 	Info string `json:"info"`
 }
 
-var (
-	VALID_STATES = []string{
-		AlertStateOk,
-		AlertStateWarn,
-		AlertStateCritical,
-		AlertStateAcknowledged,
-		AlertStateMaintenance,
-	}
-
-	AlertStateOk           = "OK"
-	AlertStateWarn         = "WARN"
-	AlertStateCritical     = "CRITICAL"
-	AlertStateAcknowledged = "ACKNOWLEDGED"
-	AlertStateMaintenance  = "MAINTENANCE"
-	AlertStatePending      = "PENDING"
-)
-
 func (this *UpdateAlertStateCommand) IsValidState() bool {
-	for _, v := range VALID_STATES {
+	for _, v := range alertstates.ValidStates {
 		if this.NewState == v {
 			return true
 		}
diff --git a/pkg/models/timeseries.go b/pkg/models/timeseries.go
deleted file mode 100644
index fbd4dd1dc0b..00000000000
--- a/pkg/models/timeseries.go
+++ /dev/null
@@ -1,15 +0,0 @@
-package models
-
-type TimeSeries struct {
-	Name   string       `json:"name"`
-	Points [][2]float64 `json:"points"`
-}
-
-type TimeSeriesSlice []*TimeSeries
-
-func NewTimeSeries(name string, points [][2]float64) *TimeSeries {
-	return &TimeSeries{
-		Name:   name,
-		Points: points,
-	}
-}
diff --git a/pkg/services/alerting/alert_rule_reader.go b/pkg/services/alerting/alert_rule_reader.go
index 797fe3fa796..5e62a70ef91 100644
--- a/pkg/services/alerting/alert_rule_reader.go
+++ b/pkg/services/alerting/alert_rule_reader.go
@@ -10,7 +10,7 @@ import (
 )
 
 type RuleReader interface {
-	Fetch() []m.AlertRule
+	Fetch() []AlertRule
 }
 
 type AlertRuleReader struct {
@@ -28,15 +28,15 @@ func NewRuleReader() *AlertRuleReader {
 }
 
 var (
-	alertJobs []m.AlertRule
+	alertJobs []AlertRule
 )
 
-func (arr *AlertRuleReader) Fetch() []m.AlertRule {
+func (arr *AlertRuleReader) Fetch() []AlertRule {
 	return alertJobs
 }
 
 func (arr *AlertRuleReader) initReader() {
-	alertJobs = make([]m.AlertRule, 0)
+	alertJobs = make([]AlertRule, 0)
 	heartbeat := time.NewTicker(time.Second * 10)
 
 	arr.updateRules()
@@ -56,7 +56,7 @@ func (arr *AlertRuleReader) updateRules() {
 	err := bus.Dispatch(cmd)
 
 	if err == nil {
-		alertJobs = cmd.Result
+		//alertJobs = cmd.Result
 	} else {
 		log.Error(1, "AlertRuleReader: Could not load alerts")
 	}
diff --git a/pkg/services/alerting/alerting.go b/pkg/services/alerting/alerting.go
index 714ebd17a94..62fe0b296d6 100644
--- a/pkg/services/alerting/alerting.go
+++ b/pkg/services/alerting/alerting.go
@@ -1,9 +1,6 @@
 package alerting
 
 import (
-	"fmt"
-	"time"
-
 	"github.com/grafana/grafana/pkg/bus"
 	"github.com/grafana/grafana/pkg/log"
 	m "github.com/grafana/grafana/pkg/models"
@@ -27,111 +24,9 @@ func Init() {
 	go scheduler.dispatch(reader)
 	go scheduler.executor(&ExecutorImpl{})
 	go scheduler.handleResponses()
-
 }
 
-type Scheduler struct {
-	jobs          map[int64]*m.AlertJob
-	runQueue      chan *m.AlertJob
-	responseQueue chan *m.AlertResult
-
-	alertRuleFetcher RuleReader
-}
-
-func NewScheduler() *Scheduler {
-	return &Scheduler{
-		jobs:          make(map[int64]*m.AlertJob, 0),
-		runQueue:      make(chan *m.AlertJob, 1000),
-		responseQueue: make(chan *m.AlertResult, 1000),
-	}
-}
-
-func (scheduler *Scheduler) dispatch(reader RuleReader) {
-	reschedule := time.NewTicker(time.Second * 10)
-	secondTicker := time.NewTicker(time.Second)
-
-	scheduler.updateJobs(reader.Fetch)
-
-	for {
-		select {
-		case <-secondTicker.C:
-			scheduler.queueJobs()
-		case <-reschedule.C:
-			scheduler.updateJobs(reader.Fetch)
-		}
-	}
-}
-
-func (scheduler *Scheduler) updateJobs(alertRuleFn func() []m.AlertRule) {
-	log.Debug("Scheduler: UpdateJobs()")
-
-	jobs := make(map[int64]*m.AlertJob, 0)
-	rules := alertRuleFn()
-
-	for i, rule := range rules {
-		var job *m.AlertJob
-		if scheduler.jobs[rule.Id] != nil {
-			job = scheduler.jobs[rule.Id]
-		} else {
-			job = &m.AlertJob{
-				Running:    false,
-				RetryCount: 0,
-			}
-		}
-
-		job.Rule = rule
-		job.Offset = int64(i)
-
-		jobs[rule.Id] = job
-	}
-
-	log.Debug("Scheduler: Selected %d jobs", len(jobs))
-	scheduler.jobs = jobs
-}
-
-func (scheduler *Scheduler) queueJobs() {
-	now := time.Now().Unix()
-	for _, job := range scheduler.jobs {
-		if now%job.Rule.Frequency == 0 && job.Running == false {
-			log.Info("Scheduler: Putting job on to run queue: %s", job.Rule.Title)
-			scheduler.runQueue <- job
-		}
-	}
-}
-
-func (scheduler *Scheduler) executor(executor Executor) {
-	for job := range scheduler.runQueue {
-		//log.Info("Executor: queue length %d", len(this.runQueue))
-		log.Info("Executor: executing %s", job.Rule.Title)
-		job.Running = true
-		scheduler.measureAndExecute(executor, job)
-	}
-}
-
-func (scheduler *Scheduler) handleResponses() {
-	for response := range scheduler.responseQueue {
-		log.Info("Response: alert(%d) status(%s) actual(%v) retry(%d)", response.Id, response.State, response.ActualValue, response.AlertJob.RetryCount)
-		response.AlertJob.Running = false
-
-		if response.IsResultIncomplete() {
-			response.AlertJob.RetryCount++
-			if response.AlertJob.RetryCount < maxRetries {
-				scheduler.runQueue <- response.AlertJob
-			} else {
-				saveState(&m.AlertResult{
-					Id:          response.Id,
-					State:       m.AlertStateCritical,
-					Description: fmt.Sprintf("Failed to run check after %d retires", maxRetries),
-				})
-			}
-		} else {
-			response.AlertJob.RetryCount = 0
-			saveState(response)
-		}
-	}
-}
-
-func saveState(response *m.AlertResult) {
+func saveState(response *AlertResult) {
 	cmd := &m.UpdateAlertStateCommand{
 		AlertId:  response.Id,
 		NewState: response.State,
@@ -142,24 +37,3 @@
 		log.Error(2, "failed to save state %v", err)
 	}
 }
-
-func (scheduler *Scheduler) measureAndExecute(exec Executor, job *m.AlertJob) {
-	now := time.Now()
-
-	responseChan := make(chan *m.AlertResult, 1)
-	go exec.Execute(job, responseChan)
-
-	select {
-	case <-time.After(time.Second * 5):
-		scheduler.responseQueue <- &m.AlertResult{
-			Id:       job.Rule.Id,
-			State:    m.AlertStatePending,
-			Duration: float64(time.Since(now).Nanoseconds()) / float64(1000000),
-			AlertJob: job,
-		}
-	case result := <-responseChan:
-		result.Duration = float64(time.Since(now).Nanoseconds()) / float64(1000000)
-		log.Info("Schedular: exeuction took %vms", result.Duration)
-		scheduler.responseQueue <- result
-	}
-}
diff --git a/pkg/services/alerting/alertstates/states.go b/pkg/services/alerting/alertstates/states.go
new file mode 100644
index 00000000000..9989c223e16
--- /dev/null
+++ b/pkg/services/alerting/alertstates/states.go
@@ -0,0 +1,18 @@
+package alertstates
+
+var (
+	ValidStates = []string{
+		Ok,
+		Warn,
+		Critical,
+		Acknowledged,
+		Maintenance,
+	}
+
+	Ok           = "OK"
+	Warn         = "WARN"
+	Critical     = "CRITICAL"
+	Acknowledged = "ACKNOWLEDGED"
+	Maintenance  = "MAINTENANCE"
+	Pending      = "PENDING"
+)
diff --git a/pkg/services/alerting/datasources/backends.go b/pkg/services/alerting/datasources/backends.go
index 5b570ab61b5..95ca132d85a 100644
--- a/pkg/services/alerting/datasources/backends.go
+++ b/pkg/services/alerting/datasources/backends.go
@@ -1,33 +1,3 @@
-package graphite
-
-import (
-	"fmt"
-
-	"github.com/grafana/grafana/pkg/bus"
-	m "github.com/grafana/grafana/pkg/models"
-)
-
-// AlertDatasource is bacon
-type AlertDatasource interface {
-	GetSeries(job *m.AlertJob, datasource m.DataSource) (m.TimeSeriesSlice, error)
-}
+package datasources
 
 // GetSeries returns timeseries data from the datasource
-func GetSeries(job *m.AlertJob) (m.TimeSeriesSlice, error) {
-	query := &m.GetDataSourceByIdQuery{
-		Id:    job.Rule.DatasourceId,
-		OrgId: job.Rule.OrgId,
-	}
-
-	err := bus.Dispatch(query)
-
-	if err != nil {
-		return nil, fmt.Errorf("Could not find datasource for %d", job.Rule.DatasourceId)
-	}
-
-	if query.Result.Type == m.DS_GRAPHITE {
-		return GraphiteClient{}.GetSeries(*job, query.Result)
-	}
-
-	return nil, fmt.Errorf("Grafana does not support alerts for %s", query.Result.Type)
-}
diff --git a/pkg/services/alerting/datasources/graphite.go b/pkg/services/alerting/datasources/graphite.go
index c6634aea8b2..73309ca3b66 100644
--- a/pkg/services/alerting/datasources/graphite.go
+++ b/pkg/services/alerting/datasources/graphite.go
@@ -1,80 +1,80 @@
-package graphite
+package datasources
 
-import (
-	"bytes"
-	"encoding/json"
-	"fmt"
-	"io/ioutil"
-	"net/http"
-	"net/url"
-	"strconv"
-	"time"
-
-	"github.com/grafana/grafana/pkg/components/simplejson"
-	"github.com/grafana/grafana/pkg/log"
-	m "github.com/grafana/grafana/pkg/models"
-	"github.com/grafana/grafana/pkg/util"
-)
-
-type GraphiteClient struct{}
-
-type GraphiteSerie struct {
-	Datapoints [][2]float64
-	Target     string
-}
-
-var DefaultClient = &http.Client{
-	Timeout: time.Minute,
-}
-
-type GraphiteResponse []GraphiteSerie
-
-func (client GraphiteClient) GetSeries(rule m.AlertJob, datasource m.DataSource) (m.TimeSeriesSlice, error) {
-	v := url.Values{
-		"format": []string{"json"},
-		"target": []string{getTargetFromRule(rule.Rule)},
-		"until":  []string{"now"},
-		"from":   []string{"-" + strconv.Itoa(rule.Rule.QueryRange) + "s"},
-	}
-
-	log.Trace("Graphite: sending request with querystring: ", v.Encode())
-
-	req, err := http.NewRequest("POST", datasource.Url+"/render", nil)
-
-	if err != nil {
-		return nil, fmt.Errorf("Could not create request")
-	}
-
-	req.Body = ioutil.NopCloser(bytes.NewReader([]byte(v.Encode())))
-
-	if datasource.BasicAuth {
-		req.Header.Add("Authorization", util.GetBasicAuthHeader(datasource.User, datasource.Password))
-	}
-
-	res, err := DefaultClient.Do(req)
-
-	if err != nil {
-		return nil, err
-	}
-
-	if res.StatusCode != http.StatusOK {
-		return nil, fmt.Errorf("expected httpstatus 200, found %d", res.StatusCode)
-	}
-
-	response := GraphiteResponse{}
-
-	json.NewDecoder(res.Body).Decode(&response)
-
-	var timeSeries []*m.TimeSeries
-	for _, v := range response {
-		timeSeries = append(timeSeries, m.NewTimeSeries(v.Target, v.Datapoints))
-	}
-
-	return timeSeries, nil
-}
-
-func getTargetFromRule(rule m.AlertRule) string {
-	json, _ := simplejson.NewJson([]byte(rule.Query))
-
-	return json.Get("target").MustString()
-}
+// import (
+// 	"bytes"
+// 	"encoding/json"
+// 	"fmt"
+// 	"io/ioutil"
+// 	"net/http"
+// 	"net/url"
+// 	"strconv"
+// 	"time"
+//
+// 	"github.com/grafana/grafana/pkg/components/simplejson"
+// 	"github.com/grafana/grafana/pkg/log"
+// 	m "github.com/grafana/grafana/pkg/models"
+// 	"github.com/grafana/grafana/pkg/util"
+// )
+//
+// type GraphiteClient struct{}
+//
+// type GraphiteSerie struct {
+// 	Datapoints [][2]float64
+// 	Target     string
+// }
+//
+// var DefaultClient = &http.Client{
+// 	Timeout: time.Minute,
+// }
+//
+// type GraphiteResponse []GraphiteSerie
+//
+// func (client GraphiteClient) GetSeries(rule m.AlertJob, datasource m.DataSource) (m.TimeSeriesSlice, error) {
+// 	v := url.Values{
+// 		"format": []string{"json"},
+// 		"target": []string{getTargetFromRule(rule.Rule)},
+// 		"until":  []string{"now"},
+// 		"from":   []string{"-" + strconv.Itoa(rule.Rule.QueryRange) + "s"},
+// 	}
+//
+// 	log.Trace("Graphite: sending request with querystring: ", v.Encode())
+//
+// 	req, err := http.NewRequest("POST", datasource.Url+"/render", nil)
+//
+// 	if err != nil {
+// 		return nil, fmt.Errorf("Could not create request")
+// 	}
+//
+// 	req.Body = ioutil.NopCloser(bytes.NewReader([]byte(v.Encode())))
+//
+// 	if datasource.BasicAuth {
+// 		req.Header.Add("Authorization", util.GetBasicAuthHeader(datasource.User, datasource.Password))
+// 	}
+//
+// 	res, err := DefaultClient.Do(req)
+//
+// 	if err != nil {
+// 		return nil, err
+// 	}
+//
+// 	if res.StatusCode != http.StatusOK {
+// 		return nil, fmt.Errorf("expected httpstatus 200, found %d", res.StatusCode)
+// 	}
+//
+// 	response := GraphiteResponse{}
+//
+// 	json.NewDecoder(res.Body).Decode(&response)
+//
+// 	var timeSeries []*m.TimeSeries
+// 	for _, v := range response {
+// 		timeSeries = append(timeSeries, m.NewTimeSeries(v.Target, v.Datapoints))
+// 	}
+//
+// 	return timeSeries, nil
+// }
+//
+// func getTargetFromRule(rule m.AlertRule) string {
+// 	json, _ := simplejson.NewJson([]byte(rule.Query))
+//
+// 	return json.Get("target").MustString()
+// }
diff --git a/pkg/services/alerting/engine.go b/pkg/services/alerting/engine.go
new file mode 100644
index 00000000000..d806a5d69ca
--- /dev/null
+++ b/pkg/services/alerting/engine.go
@@ -0,0 +1 @@
+package alerting
diff --git a/pkg/services/alerting/executor.go b/pkg/services/alerting/executor.go
index 90b974e84d4..ca7e49c9072 100644
--- a/pkg/services/alerting/executor.go
+++ b/pkg/services/alerting/executor.go
@@ -5,13 +5,15 @@ import (
 
 	"math"
 
+	"github.com/grafana/grafana/pkg/bus"
 	"github.com/grafana/grafana/pkg/log"
 	m "github.com/grafana/grafana/pkg/models"
-	b "github.com/grafana/grafana/pkg/services/alerting/datasources"
+	"github.com/grafana/grafana/pkg/services/alerting/alertstates"
+	"github.com/grafana/grafana/pkg/tsdb"
 )
 
 type Executor interface {
-	Execute(rule *m.AlertJob, responseQueue chan *m.AlertResult)
+	Execute(rule *AlertJob, responseQueue chan *AlertResult)
 }
 
 var (
@@ -22,7 +24,7 @@ var (
 type ExecutorImpl struct{}
 
 type compareFn func(float64, float64) bool
-type aggregationFn func(*m.TimeSeries) float64
+type aggregationFn func(*tsdb.TimeSeries) float64
 
 var operators = map[string]compareFn{
 	">":  func(num1, num2 float64) bool { return num1 > num2 },
@@ -32,7 +34,7 @@ var operators = map[string]compareFn{
 	"":   func(num1, num2 float64) bool { return false },
 }
 var aggregator = map[string]aggregationFn{
-	"avg": func(series *m.TimeSeries) float64 {
+	"avg": func(series *tsdb.TimeSeries) float64 {
 		sum := float64(0)
 
 		for _, v := range series.Points {
@@ -41,7 +43,7 @@ var aggregator = map[string]aggregationFn{
 
 		return sum / float64(len(series.Points))
 	},
-	"sum": func(series *m.TimeSeries) float64 {
+	"sum": func(series *tsdb.TimeSeries) float64 {
 		sum := float64(0)
 
 		for _, v := range series.Points {
@@ -50,7 +52,7 @@ var aggregator = map[string]aggregationFn{
 
 		return sum
 	},
-	"min": func(series *m.TimeSeries) float64 {
+	"min": func(series *tsdb.TimeSeries) float64 {
 		min := series.Points[0][0]
 
 		for _, v := range series.Points {
@@ -61,7 +63,7 @@ var aggregator = map[string]aggregationFn{
 
 		return min
 	},
-	"max": func(series *m.TimeSeries) float64 {
+	"max": func(series *tsdb.TimeSeries) float64 {
 		max := series.Points[0][0]
 
 		for _, v := range series.Points {
@@ -72,17 +74,17 @@ var aggregator = map[string]aggregationFn{
 
 		return max
 	},
-	"mean": func(series *m.TimeSeries) float64 {
+	"mean": func(series *tsdb.TimeSeries) float64 {
 		midPosition := int64(math.Floor(float64(len(series.Points)) / float64(2)))
 		return series.Points[midPosition][0]
 	},
 }
 
-func (executor *ExecutorImpl) Execute(job *m.AlertJob, responseQueue chan *m.AlertResult) {
-	response, err := b.GetSeries(job)
+func (executor *ExecutorImpl) Execute(job *AlertJob, responseQueue chan *AlertResult) {
+	response, err := executor.GetSeries(job)
 
 	if err != nil {
-		responseQueue <- &m.AlertResult{State: m.AlertStatePending, Id: job.Rule.Id, AlertJob: job}
+		responseQueue <- &AlertResult{State: alertstates.Pending, Id: job.Rule.Id, AlertJob: job}
 	}
 
 	result := executor.validateRule(job.Rule, response)
@@ -90,7 +92,26 @@ func (executor *ExecutorImpl) Execute(job *m.AlertJob, responseQueue chan *m.Ale
 	responseQueue <- result
 }
 
-func (executor *ExecutorImpl) validateRule(rule m.AlertRule, series m.TimeSeriesSlice) *m.AlertResult {
+func (executor *ExecutorImpl) GetSeries(job *AlertJob) (tsdb.TimeSeriesSlice, error) {
+	query := &m.GetDataSourceByIdQuery{
+		Id:    job.Rule.DatasourceId,
+		OrgId: job.Rule.OrgId,
+	}
+
+	err := bus.Dispatch(query)
+
+	if err != nil {
+		return nil, fmt.Errorf("Could not find datasource for %d", job.Rule.DatasourceId)
+	}
+
+	// if query.Result.Type == m.DS_GRAPHITE {
+	// 	return GraphiteClient{}.GetSeries(*job, query.Result)
+	// }
+
+	return nil, fmt.Errorf("Grafana does not support alerts for %s", query.Result.Type)
+}
+
+func (executor *ExecutorImpl) validateRule(rule AlertRule, series tsdb.TimeSeriesSlice) *AlertResult {
 	for _, serie := range series {
 		if aggregator[rule.Aggregator] == nil {
 			continue
@@ -102,8 +123,8 @@ func (executor *ExecutorImpl) validateRule(rule m.AlertRule, series m.TimeSeries
 		log.Trace(resultLogFmt, "Crit", serie.Name, aggValue, rule.CritOperator, rule.CritLevel, critResult)
 
 		if critResult {
-			return &m.AlertResult{
-				State:       m.AlertStateCritical,
+			return &AlertResult{
+				State:       alertstates.Critical,
 				Id:          rule.Id,
 				ActualValue: aggValue,
 				Description: fmt.Sprintf(descriptionFmt, aggValue, serie.Name),
@@ -114,8 +135,8 @@ func (executor *ExecutorImpl) validateRule(rule m.AlertRule, series m.TimeSeries
 		var warnResult = warnOperartor(aggValue, rule.CritLevel)
 		log.Trace(resultLogFmt, "Warn", serie.Name, aggValue, rule.WarnOperator, rule.WarnLevel, warnResult)
 		if warnResult {
-			return &m.AlertResult{
-				State:       m.AlertStateWarn,
+			return &AlertResult{
+				State:       alertstates.Warn,
 				Id:          rule.Id,
 				Description: fmt.Sprintf(descriptionFmt, aggValue, serie.Name),
 				ActualValue: aggValue,
@@ -123,5 +144,5 @@ func (executor *ExecutorImpl) validateRule(rule m.AlertRule, series m.TimeSeries
 		}
 	}
 
-	return &m.AlertResult{State: m.AlertStateOk, Id: rule.Id, Description: "Alert is OK!"}
+	return &AlertResult{State: alertstates.Ok, Id: rule.Id, Description: "Alert is OK!"}
 }
diff --git a/pkg/services/alerting/factory/factory.go b/pkg/services/alerting/factory/factory.go
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/pkg/services/alerting/models.go b/pkg/services/alerting/models.go
new file mode 100644
index 00000000000..3c69371001f
--- /dev/null
+++ b/pkg/services/alerting/models.go
@@ -0,0 +1,43 @@
+package alerting
+
+import "github.com/grafana/grafana/pkg/services/alerting/alertstates"
+
+type AlertJob struct {
+	Offset     int64
+	Delay      bool
+	Running    bool
+	RetryCount int
+	Rule       AlertRule
+}
+
+type AlertResult struct {
+	Id          int64
+	State       string
+	ActualValue float64
+	Duration    float64
+	Description string
+	AlertJob    *AlertJob
+}
+
+func (ar *AlertResult) IsResultIncomplete() bool {
+	return ar.State == alertstates.Pending
+}
+
+type AlertRule struct {
+	Id           int64
+	OrgId        int64
+	DatasourceId int64
+	DashboardId  int64
+	PanelId      int64
+	Query        string
+	QueryRefId   string
+	WarnLevel    float64
+	CritLevel    float64
+	WarnOperator string
+	CritOperator string
+	Frequency    int64
+	Title        string
+	Description  string
+	QueryRange   int
+	Aggregator   string
+}
diff --git a/pkg/services/alerting/scheduler.go b/pkg/services/alerting/scheduler.go
new file mode 100644
index 00000000000..4172e0ca756
--- /dev/null
+++ b/pkg/services/alerting/scheduler.go
@@ -0,0 +1,129 @@
+package alerting
+
+import (
+	"fmt"
+	"time"
+
+	"github.com/grafana/grafana/pkg/log"
+	"github.com/grafana/grafana/pkg/services/alerting/alertstates"
+)
+
+type Scheduler struct {
+	jobs          map[int64]*AlertJob
+	runQueue      chan *AlertJob
+	responseQueue chan *AlertResult
+}
+
+func NewScheduler() *Scheduler {
+	return &Scheduler{
+		jobs:          make(map[int64]*AlertJob, 0),
+		runQueue:      make(chan *AlertJob, 1000),
+		responseQueue: make(chan *AlertResult, 1000),
+	}
+}
+
+func (scheduler *Scheduler) dispatch(reader RuleReader) {
+	reschedule := time.NewTicker(time.Second * 10)
+	secondTicker := time.NewTicker(time.Second)
+
+	scheduler.updateJobs(reader.Fetch)
+
+	for {
+		select {
+		case <-secondTicker.C:
+			scheduler.queueJobs()
+		case <-reschedule.C:
+			scheduler.updateJobs(reader.Fetch)
+		}
+	}
+}
+
+func (scheduler *Scheduler) updateJobs(alertRuleFn func() []AlertRule) {
+	log.Debug("Scheduler: UpdateJobs()")
+
+	jobs := make(map[int64]*AlertJob, 0)
+	rules := alertRuleFn()
+
+	for i, rule := range rules {
+		var job *AlertJob
+		if scheduler.jobs[rule.Id] != nil {
+			job = scheduler.jobs[rule.Id]
+		} else {
+			job = &AlertJob{
+				Running:    false,
+				RetryCount: 0,
+			}
+		}
+
+		job.Rule = rule
+		job.Offset = int64(i)
+
+		jobs[rule.Id] = job
+	}
+
+	log.Debug("Scheduler: Selected %d jobs", len(jobs))
+	scheduler.jobs = jobs
+}
+
+func (scheduler *Scheduler) queueJobs() {
+	now := time.Now().Unix()
+	for _, job := range scheduler.jobs {
+		if now%job.Rule.Frequency == 0 && job.Running == false {
+			log.Info("Scheduler: Putting job on to run queue: %s", job.Rule.Title)
+			scheduler.runQueue <- job
+		}
+	}
+}
+
+func (scheduler *Scheduler) executor(executor Executor) {
+	for job := range scheduler.runQueue {
+		//log.Info("Executor: queue length %d", len(this.runQueue))
+		log.Info("Executor: executing %s", job.Rule.Title)
+		job.Running = true
+		scheduler.measureAndExecute(executor, job)
+	}
+}
+
+func (scheduler *Scheduler) handleResponses() {
+	for response := range scheduler.responseQueue {
+		log.Info("Response: alert(%d) status(%s) actual(%v) retry(%d)", response.Id, response.State, response.ActualValue, response.AlertJob.RetryCount)
+		response.AlertJob.Running = false
+
+		if response.IsResultIncomplete() {
+			response.AlertJob.RetryCount++
+			if response.AlertJob.RetryCount < maxRetries {
+				scheduler.runQueue <- response.AlertJob
+			} else {
+				saveState(&AlertResult{
+					Id:          response.Id,
+					State:       alertstates.Critical,
+					Description: fmt.Sprintf("Failed to run check after %d retries", maxRetries),
+				})
+			}
+		} else {
+			response.AlertJob.RetryCount = 0
+			saveState(response)
+		}
+	}
+}
+
+func (scheduler *Scheduler) measureAndExecute(exec Executor, job *AlertJob) {
+	now := time.Now()
+
+	responseChan := make(chan *AlertResult, 1)
+	go exec.Execute(job, responseChan)
+
+	select {
+	case <-time.After(time.Second * 5):
+		scheduler.responseQueue <- &AlertResult{
+			Id:       job.Rule.Id,
+			State:    alertstates.Pending,
+			Duration: float64(time.Since(now).Nanoseconds()) / float64(1000000),
+			AlertJob: job,
+		}
+	case result := <-responseChan:
+		result.Duration = float64(time.Since(now).Nanoseconds()) / float64(1000000)
+		log.Info("Scheduler: execution took %vms", result.Duration)
+		scheduler.responseQueue <- result
+	}
+}
diff --git a/pkg/tsdb/batch.go b/pkg/tsdb/batch.go
new file mode 100644
index 00000000000..92aa1afd2f8
--- /dev/null
+++ b/pkg/tsdb/batch.go
@@ -0,0 +1,90 @@
+package tsdb
+
+import "errors"
+
+type Batch struct {
+	DataSourceId int64
+	Queries      QuerySlice
+	Depends      map[string]bool
+	Done         bool
+	Started      bool
+}
+
+type BatchSlice []*Batch
+
+func newBatch(dsId int64, queries QuerySlice) *Batch {
+	return &Batch{
+		DataSourceId: dsId,
+		Queries:      queries,
+		Depends:      make(map[string]bool),
+	}
+}
+
+func (bg *Batch) process(context *QueryContext) {
+	executor := getExecutorFor(bg.Queries[0].DataSource)
+
+	if executor == nil {
+		bg.Done = true
+		result := &BatchResult{
+			Error:        errors.New("Could not find executor for data source type " + bg.Queries[0].DataSource.Type),
+			QueryResults: make(map[string]*QueryResult),
+		}
+		for _, query := range bg.Queries {
+			result.QueryResults[query.RefId] = &QueryResult{Error: result.Error}
+		}
+		context.ResultsChan <- result
+		return
+	}
+
+	res := executor.Execute(bg.Queries, context)
+	bg.Done = true
+	context.ResultsChan <- res
+}
+
+func (bg *Batch) addQuery(query *Query) {
+	bg.Queries = append(bg.Queries, query)
+}
+
+func (bg *Batch) allDependenciesAreIn(context *QueryContext) bool {
+	for key := range bg.Depends {
+		if _, exists := context.Results[key]; !exists {
+			return false
+		}
+	}
+
+	return true
+}
+
+func getBatches(req *Request) (BatchSlice, error) {
+	batches := make(BatchSlice, 0)
+
+	for _, query := range req.Queries {
+		if foundBatch := findMatchingBatchGroup(query, batches); foundBatch != nil {
+			foundBatch.addQuery(query)
+		} else {
+			newBatch := newBatch(query.DataSource.Id, QuerySlice{query})
+			batches = append(batches, newBatch)
+
+			for _, refId := range query.Depends {
+				for _, batch := range batches {
+					for _, batchQuery := range batch.Queries {
+						if batchQuery.RefId == refId {
+							newBatch.Depends[refId] = true
+						}
+					}
+				}
+			}
+		}
+	}
+
+	return batches, nil
+}
+
+func findMatchingBatchGroup(query *Query, batches BatchSlice) *Batch {
+	for _, batch := range batches {
+		if batch.DataSourceId == query.DataSource.Id {
+			return batch
+		}
+	}
+	return nil
+}
diff --git a/pkg/tsdb/executor.go b/pkg/tsdb/executor.go
new file mode 100644
index 00000000000..7317fde23f2
--- /dev/null
+++ b/pkg/tsdb/executor.go
@@ -0,0 +1,24 @@
+package tsdb
+
+type Executor interface {
+	Execute(queries QuerySlice, context *QueryContext) *BatchResult
+}
+
+var registry map[string]GetExecutorFn
+
+type GetExecutorFn func(dsInfo *DataSourceInfo) Executor
+
+func init() {
+	registry = make(map[string]GetExecutorFn)
+}
+
+func getExecutorFor(dsInfo *DataSourceInfo) Executor {
+	if fn, exists := registry[dsInfo.Type]; exists {
+		return fn(dsInfo)
+	}
+	return nil
+}
+
+func RegisterExecutor(dsType string, fn GetExecutorFn) {
+	registry[dsType] = fn
+}
diff --git a/pkg/tsdb/models.go b/pkg/tsdb/models.go
new file mode 100644
index 00000000000..e47d49ce6cf
--- /dev/null
+++ b/pkg/tsdb/models.go
@@ -0,0 +1,62 @@
+package tsdb
+
+import "time"
+
+type TimeRange struct {
+	From time.Time
+	To   time.Time
+}
+
+type Request struct {
+	TimeRange     TimeRange
+	MaxDataPoints int
+	Queries       QuerySlice
+}
+
+type Response struct {
+	BatchTimings []*BatchTiming
+	Results      map[string]*QueryResult
+}
+
+type DataSourceInfo struct {
+	Id                int64
+	Name              string
+	Type              string
+	Url               string
+	Password          string
+	User              string
+	Database          string
+	BasicAuth         bool
+	BasicAuthUser     string
+	BasicAuthPassword string
+}
+
+type BatchTiming struct {
+	TimeElapsed int64
+}
+
+type BatchResult struct {
+	Error        error
+	QueryResults map[string]*QueryResult
+	Timings      *BatchTiming
+}
+
+type QueryResult struct {
+	Error  error
+	RefId  string
+	Series TimeSeriesSlice
+}
+
+type TimeSeries struct {
+	Name   string
+	Points [][2]float64
+}
+
+type TimeSeriesSlice []*TimeSeries
+
+func NewTimeSeries(name string, points [][2]float64) *TimeSeries {
+	return &TimeSeries{
+		Name:   name,
+		Points: points,
+	}
+}
diff --git a/pkg/tsdb/query.go b/pkg/tsdb/query.go
new file mode 100644
index 00000000000..bcead660450
--- /dev/null
+++ b/pkg/tsdb/query.go
@@ -0,0 +1,12 @@
+package tsdb
+
+type Query struct {
+	RefId      string
+	Query      string
+	Depends    []string
+	DataSource *DataSourceInfo
+	Results    []*TimeSeries
+	Exclude    bool
+}
+
+type QuerySlice []*Query
diff --git a/pkg/tsdb/query_context.go b/pkg/tsdb/query_context.go
new file mode 100644
index 00000000000..a1fc4c9bcb5
--- /dev/null
+++ b/pkg/tsdb/query_context.go
@@ -0,0 +1,21 @@
+package tsdb
+
+import "sync"
+
+type QueryContext struct {
+	TimeRange   TimeRange
+	Queries     QuerySlice
+	Results     map[string]*QueryResult
+	ResultsChan chan *BatchResult
+	Lock        sync.RWMutex
+	BatchWaits  sync.WaitGroup
+}
+
+func NewQueryContext(queries QuerySlice, timeRange TimeRange) *QueryContext {
+	return &QueryContext{
+		TimeRange:   timeRange,
+		Queries:     queries,
+		ResultsChan: make(chan *BatchResult),
+		Results:     make(map[string]*QueryResult),
+	}
+}
diff --git a/pkg/tsdb/request.go b/pkg/tsdb/request.go
new file mode 100644
index 00000000000..3e7654bb958
--- /dev/null
+++ b/pkg/tsdb/request.go
@@ -0,0 +1,51 @@
+package tsdb
+
+func HandleRequest(req *Request) (*Response, error) {
+	context := NewQueryContext(req.Queries, req.TimeRange)
+
+	batches, err := getBatches(req)
+	if err != nil {
+		return nil, err
+	}
+
+	currentlyExecuting := 0
+
+	for _, batch := range batches {
+		if len(batch.Depends) == 0 {
+			currentlyExecuting += 1
+			batch.Started = true
+			go batch.process(context)
+		}
+	}
+
+	response := &Response{}
+
+	for currentlyExecuting != 0 {
+		select {
+		case batchResult := <-context.ResultsChan:
+			currentlyExecuting -= 1
+
+			response.BatchTimings = append(response.BatchTimings, batchResult.Timings)
+
+			for refId, result := range batchResult.QueryResults {
+				context.Results[refId] = result
+			}
+
+			for _, batch := range batches {
+				// not interested in started batches
+				if batch.Started {
+					continue
+				}
+
+				if batch.allDependenciesAreIn(context) {
+					currentlyExecuting += 1
+					batch.Started = true
+					go batch.process(context)
+				}
+			}
+		}
+	}
+
+	response.Results = context.Results
+	return response, nil
+}
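
Below is a minimal, hypothetical usage sketch of the new tsdb pipeline added above (it is not part of the diff): an executor registers a constructor for a datasource type via tsdb.RegisterExecutor, and tsdb.HandleRequest groups queries into per-datasource batches, starts every batch whose dependencies are satisfied, and collects results keyed by RefId. The fakeExecutor type, the "fake" datasource type, and all query values are invented for illustration.

package main

import (
	"fmt"

	"github.com/grafana/grafana/pkg/tsdb"
)

// fakeExecutor is a stand-in executor, invented for this example.
type fakeExecutor struct{}

// Execute answers every query in its batch with a canned series.
func (e *fakeExecutor) Execute(queries tsdb.QuerySlice, context *tsdb.QueryContext) *tsdb.BatchResult {
	result := &tsdb.BatchResult{QueryResults: make(map[string]*tsdb.QueryResult)}
	for _, q := range queries {
		result.QueryResults[q.RefId] = &tsdb.QueryResult{
			RefId:  q.RefId,
			Series: tsdb.TimeSeriesSlice{tsdb.NewTimeSeries("canned", [][2]float64{{42, 0}})},
		}
	}
	return result
}

func main() {
	// Register a constructor for the (invented) "fake" datasource type.
	tsdb.RegisterExecutor("fake", func(dsInfo *tsdb.DataSourceInfo) tsdb.Executor {
		return &fakeExecutor{}
	})

	dsA := &tsdb.DataSourceInfo{Id: 1, Type: "fake"}
	dsB := &tsdb.DataSourceInfo{Id: 2, Type: "fake"}

	req := &tsdb.Request{
		Queries: tsdb.QuerySlice{
			{RefId: "A", Query: "apps.backend.*.counters", DataSource: dsA},
			// B lands in a second batch (different datasource id); its Depends
			// entry makes HandleRequest start that batch only after A's
			// result has been collected.
			{RefId: "B", Query: "#A", Depends: []string{"A"}, DataSource: dsB},
		},
	}

	resp, err := tsdb.HandleRequest(req)
	if err != nil {
		panic(err)
	}
	fmt.Println(resp.Results["A"].Series[0].Name, resp.Results["B"].RefId)
}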