package alerting

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/benbjohnson/clock"
	"github.com/prometheus/client_golang/prometheus"
	"go.opentelemetry.io/otel/attribute"
	"golang.org/x/sync/errgroup"

	"github.com/grafana/grafana/pkg/infra/localcache"
	"github.com/grafana/grafana/pkg/infra/log"
	"github.com/grafana/grafana/pkg/infra/tracing"
	"github.com/grafana/grafana/pkg/infra/usagestats"
	"github.com/grafana/grafana/pkg/services/annotations"
	"github.com/grafana/grafana/pkg/services/dashboards"
	"github.com/grafana/grafana/pkg/services/datasources"
	"github.com/grafana/grafana/pkg/services/encryption"
	"github.com/grafana/grafana/pkg/services/notifications"
	"github.com/grafana/grafana/pkg/services/rendering"
	"github.com/grafana/grafana/pkg/services/validations"
	"github.com/grafana/grafana/pkg/setting"
	"github.com/grafana/grafana/pkg/tsdb/legacydata"
	"github.com/grafana/grafana/pkg/util/ticker"
)

// AlertEngine is the background process that
// schedules alert evaluations and makes sure notifications
// are sent.
type AlertEngine struct {
	RenderService    rendering.Service
	RequestValidator validations.PluginRequestValidator
	DataService      legacydata.RequestHandler
	Cfg              *setting.Cfg

	execQueue          chan *Job
	ticker             *ticker.T
	scheduler          scheduler
	evalHandler        evalHandler
	ruleReader         ruleReader
	log                log.Logger
	resultHandler      resultHandler
	usageStatsService  usagestats.Service
	tracer             tracing.Tracer
	AlertStore         AlertStore
	dashAlertExtractor DashAlertExtractor
	dashboardService   dashboards.DashboardService
	datasourceService  datasources.DataSourceService
	annotationsRepo    annotations.Repository
}

// IsDisabled returns true if the alerting service is disabled for this instance.
func (e *AlertEngine) IsDisabled() bool {
	return setting.AlertingEnabled == nil || !*setting.AlertingEnabled || !setting.ExecuteAlerts || e.Cfg.UnifiedAlerting.IsEnabled()
}

// ProvideAlertEngine returns a new AlertEngine.
func ProvideAlertEngine(renderer rendering.Service, requestValidator validations.PluginRequestValidator,
	dataService legacydata.RequestHandler, usageStatsService usagestats.Service, encryptionService encryption.Internal,
	notificationService *notifications.NotificationService, tracer tracing.Tracer, store AlertStore, cfg *setting.Cfg,
	dashAlertExtractor DashAlertExtractor, dashboardService dashboards.DashboardService, cacheService *localcache.CacheService, dsService datasources.DataSourceService, annotationsRepo annotations.Repository) *AlertEngine {
	e := &AlertEngine{
		Cfg:                cfg,
		RenderService:      renderer,
		RequestValidator:   requestValidator,
		DataService:        dataService,
		usageStatsService:  usageStatsService,
		tracer:             tracer,
		AlertStore:         store,
		dashAlertExtractor: dashAlertExtractor,
		dashboardService:   dashboardService,
		datasourceService:  dsService,
		annotationsRepo:    annotationsRepo,
	}
	e.execQueue = make(chan *Job, 1000)
	e.scheduler = newScheduler()
	e.evalHandler = NewEvalHandler(e.DataService)
	e.ruleReader = newRuleReader(store)
	e.log = log.New("alerting.engine")
	e.resultHandler = newResultHandler(e.RenderService, store, notificationService, encryptionService.GetDecryptedValue)

	e.registerUsageMetrics()

	return e
}

// Run starts the alerting service background process.
func (e *AlertEngine) Run(ctx context.Context) error {
	reg := prometheus.WrapRegistererWithPrefix("legacy_", prometheus.DefaultRegisterer)
	e.ticker = ticker.New(clock.New(), 1*time.Second, ticker.NewMetrics(reg, "alerting"))
	defer e.ticker.Stop()
	alertGroup, ctx := errgroup.WithContext(ctx)
	alertGroup.Go(func() error { return e.alertingTicker(ctx) })
	alertGroup.Go(func() error { return e.runJobDispatcher(ctx) })

	err := alertGroup.Wait()
	return err
}
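
// alertingTicker listens to the ticker: it refreshes the scheduled rules every
// tenth tick and lets the scheduler enqueue due jobs on every tick.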
func (e *AlertEngine) alertingTicker(grafanaCtx context.Context) error {
	defer func() {
		if err := recover(); err != nil {
			e.log.Error("Scheduler Panic: stopping alertingTicker", "error", err, "stack", log.Stack(1))
		}
	}()

	tickIndex := 0

	for {
		select {
		case <-grafanaCtx.Done():
			return grafanaCtx.Err()
		case tick := <-e.ticker.C:
			// TEMP SOLUTION update rules every tenth tick
			if tickIndex%10 == 0 {
				e.scheduler.Update(e.ruleReader.fetch(grafanaCtx))
			}

			e.scheduler.Tick(tick, e.execQueue)
			tickIndex++
		}
	}
}
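
// runJobDispatcher consumes jobs from the exec queue and processes each one,
// with retries, in its own goroutine until the context is canceled.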
func (e *AlertEngine) runJobDispatcher(grafanaCtx context.Context) error {
	dispatcherGroup, alertCtx := errgroup.WithContext(grafanaCtx)

	for {
		select {
		case <-grafanaCtx.Done():
			return dispatcherGroup.Wait()
		case job := <-e.execQueue:
			dispatcherGroup.Go(func() error { return e.processJobWithRetry(alertCtx, job) })
		}
	}
}
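
// unfinishedWorkTimeout is how long an in-flight job may keep running after
// the server context has been canceled.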
var (
	unfinishedWorkTimeout = time.Second * 5
)
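
// processJobWithRetry keeps running evaluation attempts for the job until one
// completes, the attempt limit is reached, or the server context is canceled.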
func (e *AlertEngine) processJobWithRetry(grafanaCtx context.Context, job *Job) error {
	defer func() {
		if err := recover(); err != nil {
			e.log.Error("Alert Panic", "error", err, "stack", log.Stack(1))
		}
	}()

	cancelChan := make(chan context.CancelFunc, setting.AlertingMaxAttempts*2)
	attemptChan := make(chan int, 1)

	// Initialize with first attemptID=1
	attemptChan <- 1
	job.SetRunning(true)

	for {
		select {
		case <-grafanaCtx.Done():
			// If the Grafana server context is canceled, give the job processing a chance
			// to finish gracefully by waiting a timeout duration before forcing it to end.
			unfinishedWorkTimer := time.NewTimer(unfinishedWorkTimeout)
			select {
			case <-unfinishedWorkTimer.C:
				return e.endJob(grafanaCtx.Err(), cancelChan, job)
			case <-attemptChan:
				return e.endJob(nil, cancelChan, job)
			}
		case attemptID, more := <-attemptChan:
			if !more {
				return e.endJob(nil, cancelChan, job)
			}
			go e.processJob(attemptID, attemptChan, cancelChan, job)
		}
	}
}
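
// endJob marks the job as no longer running, drains and invokes any queued
// cancel functions, and passes err through to the caller.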
func (e *AlertEngine) endJob(err error, cancelChan chan context.CancelFunc, job *Job) error {
	job.SetRunning(false)
	close(cancelChan)
	for cancelFn := range cancelChan {
		cancelFn()
	}
	return err
}
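
// processJob runs a single evaluation attempt for the job. On failure it
// schedules another attempt (up to setting.AlertingMaxAttempts); otherwise it
// hands the evaluation result to the result handler.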
func (e *AlertEngine) processJob(attemptID int, attemptChan chan int, cancelChan chan context.CancelFunc, job *Job) {
	defer func() {
		if err := recover(); err != nil {
			e.log.Error("Alert Panic", "error", err, "stack", log.Stack(1))
		}
	}()

	alertCtx, cancelFn := context.WithTimeout(context.Background(), setting.AlertingEvaluationTimeout)
	cancelChan <- cancelFn
	alertCtx, span := e.tracer.Start(alertCtx, "alert execution")
	evalContext := NewEvalContext(alertCtx, job.Rule, e.RequestValidator, e.AlertStore, e.dashboardService, e.datasourceService, e.annotationsRepo)
	evalContext.Ctx = alertCtx
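
	// Evaluate the rule in a goroutine; attemptChan reports a retry (or completion,
	// by being closed) back to processJobWithRetry.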
	go func() {
		defer func() {
			if err := recover(); err != nil {
				e.log.Error("Alert Panic", "error", err, "stack", log.Stack(1))
				span.RecordError(fmt.Errorf("%v", err))
				span.AddEvents(
					[]string{"error", "message"},
					[]tracing.EventValue{
						{Str: fmt.Sprintf("%v", err)},
						{Str: "failed to execute alert rule. panic was recovered."},
					})
				span.End()
				close(attemptChan)
			}
		}()

		e.evalHandler.Eval(evalContext)
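
		// Record the outcome of the evaluation as attributes on the trace span.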
		span.SetAttributes("alertId", evalContext.Rule.ID, attribute.Key("alertId").Int64(evalContext.Rule.ID))
		span.SetAttributes("dashboardId", evalContext.Rule.DashboardID, attribute.Key("dashboardId").Int64(evalContext.Rule.DashboardID))
		span.SetAttributes("firing", evalContext.Firing, attribute.Key("firing").Bool(evalContext.Firing))
		span.SetAttributes("nodatapoints", evalContext.NoDataFound, attribute.Key("nodatapoints").Bool(evalContext.NoDataFound))
		span.SetAttributes("attemptID", attemptID, attribute.Key("attemptID").Int(attemptID))

		if evalContext.Error != nil {
			span.RecordError(evalContext.Error)
			span.AddEvents(
				[]string{"error", "message"},
				[]tracing.EventValue{
					{Str: fmt.Sprintf("%v", evalContext.Error)},
					{Str: "alerting execution attempt failed"},
				})

			if attemptID < setting.AlertingMaxAttempts {
				span.End()
				e.log.Debug("Job Execution attempt triggered retry", "timeMs", evalContext.GetDurationMs(), "alertId", evalContext.Rule.ID, "name", evalContext.Rule.Name, "firing", evalContext.Firing, "attemptID", attemptID)
				attemptChan <- (attemptID + 1)
				return
			}
		}

		// create new context with timeout for notifications
		resultHandleCtx, resultHandleCancelFn := context.WithTimeout(context.Background(), setting.AlertingNotificationTimeout)
		cancelChan <- resultHandleCancelFn

		// override the context used for evaluation with a new context for notifications.
		// This makes it possible for notifiers to execute when datasources
		// don't respond within the timeout limit. We should rewrite this so notifications
		// don't reuse the evalContext and get its own context.
		evalContext.Ctx = resultHandleCtx
		evalContext.Rule.State = evalContext.GetNewState()
		if err := e.resultHandler.handle(evalContext); err != nil {
			switch {
			case errors.Is(err, context.Canceled):
				e.log.Debug("Result handler returned context.Canceled")
			case errors.Is(err, context.DeadlineExceeded):
				e.log.Debug("Result handler returned context.DeadlineExceeded")
			default:
				e.log.Error("Failed to handle result", "err", err)
			}
		}

		span.End()
		e.log.Debug("Job Execution completed", "timeMs", evalContext.GetDurationMs(), "alertId", evalContext.Rule.ID, "name", evalContext.Rule.Name, "firing", evalContext.Firing, "attemptID", attemptID)
		close(attemptChan)
	}()
}
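
// registerUsageMetrics registers a usage stats callback that reports alert counts
// per datasource type, aggregating non-reportable datasource types under "other".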
func (e *AlertEngine) registerUsageMetrics() {
	e.usageStatsService.RegisterMetricsFunc(func(ctx context.Context) (map[string]interface{}, error) {
		alertingUsageStats, err := e.QueryUsageStats(ctx)
		if err != nil {
			return nil, err
		}

		alertingOtherCount := 0
		metrics := map[string]interface{}{}

		for dsType, usageCount := range alertingUsageStats.DatasourceUsage {
			if e.usageStatsService.ShouldBeReported(ctx, dsType) {
				metrics[fmt.Sprintf("stats.alerting.ds.%s.count", dsType)] = usageCount
			} else {
				alertingOtherCount += usageCount
			}
		}

		metrics["stats.alerting.ds.other.count"] = alertingOtherCount

		return metrics, nil
	})
}