diff --git a/pkg/services/ngalert/notifier/alert_reception.go b/pkg/services/ngalert/notifier/alert_reception.go
index 47296bf70e8..01f7f60375c 100644
--- a/pkg/services/ngalert/notifier/alert_reception.go
+++ b/pkg/services/ngalert/notifier/alert_reception.go
@@ -2,6 +2,7 @@ package notifier
 
 import (
 	"context"
+	"sync"
 	"time"
 
 	"github.com/go-kit/kit/log"
@@ -29,7 +30,8 @@ type AlertProvider struct {
 	// will be stored in memory and provided via an iterator, for example
 	// GetPendingLegacy() AlertIterator, and the external code will use this
 	// iterator to send to the stage.
-	stage notify.Stage
+	stage    notify.Stage
+	stageMtx sync.Mutex
 }
 
 // NewAlertProvider returns AlertProvider that also supports legacy alerts via PutPostableAlert.
@@ -77,8 +79,10 @@ func (ap *AlertProvider) PutPostableAlert(alerts ...*PostableAlert) error {
 	}
 
 	for recv, alerts := range groupedAlerts {
+		ap.stageMtx.Lock()
 		ctx := notify.WithReceiverName(context.Background(), recv)
 		_, _, err := ap.stage.Exec(ctx, ap.logger, alerts...)
+		ap.stageMtx.Unlock()
 		if err != nil {
 			return err
 		}
@@ -87,6 +91,14 @@ func (ap *AlertProvider) PutPostableAlert(alerts ...*PostableAlert) error {
 	return nil
 }
 
+// SetStage replaces the stage that PutPostableAlert sends alerts to. It takes stageMtx, which
+// PutPostableAlert holds around each Exec call, so the swap never happens in the middle of one.
+func (ap *AlertProvider) SetStage(s notify.Stage) {
+	ap.stageMtx.Lock()
+	defer ap.stageMtx.Unlock()
+	ap.stage = s
+}
+
 func alertForDelivery(a *PostableAlert) *types.Alert {
 	lbls := model.LabelSet{}
 	annotations := model.LabelSet{}
diff --git a/pkg/services/ngalert/notifier/alertmanager.go b/pkg/services/ngalert/notifier/alertmanager.go
index da0c29a129a..dceb95f0aae 100644
--- a/pkg/services/ngalert/notifier/alertmanager.go
+++ b/pkg/services/ngalert/notifier/alertmanager.go
@@ -20,6 +20,7 @@ import (
 	"github.com/prometheus/alertmanager/types"
 	"github.com/prometheus/client_golang/prometheus"
 
+	"github.com/grafana/alerting-api/pkg/api"
 	"github.com/grafana/grafana/pkg/infra/log"
 	"github.com/grafana/grafana/pkg/registry"
 	"github.com/grafana/grafana/pkg/setting"
@@ -31,12 +32,14 @@ type Alertmanager struct {
 	// notificationLog keeps tracks of which notifications we've fired already.
 	notificationLog *nflog.Log
 	// silences keeps the track of which notifications we should not fire due to user configuration.
-	silences   *silence.Silences
-	marker     types.Marker
-	alerts     *AlertProvider
-	dispatcher *dispatch.Dispatcher
+	silences *silence.Silences
+	marker   types.Marker
+	alerts   *AlertProvider
 
-	wg sync.WaitGroup
+	dispatcher   *dispatch.Dispatcher
+	dispatcherWG sync.WaitGroup
+
+	reloadConfigMtx sync.Mutex
 }
 
 func init() {
@@ -47,25 +50,13 @@ func (am *Alertmanager) IsDisabled() bool {
 	return !setting.AlertingEnabled || !setting.ExecuteAlerts
 }
 
-func (am *Alertmanager) Init() error {
+// Init sets up the logger, the marker, the notification log and the silences.
+// It does not start the dispatcher; ApplyConfig does that.
+func (am *Alertmanager) Init() (err error) {
 	am.logger = log.New("alertmanager")
-
-	return nil
-}
-
-func (am *Alertmanager) Run(ctx context.Context) error {
 	//TODO: Speak with David Parrot wrt to the marker, we'll probably need our own.
 	am.marker = types.NewMarker(prometheus.DefaultRegisterer)
 
-	var err error
-	am.silences, err = silence.New(silence.Options{
-		SnapshotFile: filepath.Join("dir", "silences"), //TODO: This is a setting
-		Retention:    time.Hour * 24,                   //TODO: This is also a setting
-	})
-	if err != nil {
-		return errors.Wrap(err, "unable to initialize the silencing component of alerting")
-	}
-
 	am.notificationLog, err = nflog.New(
 		nflog.WithRetention(time.Hour*24),                         //TODO: This is a setting.
 		nflog.WithSnapshot(filepath.Join("dir", "notifications")), //TODO: This should be a setting
@@ -73,28 +64,105 @@ func (am *Alertmanager) Run(ctx context.Context) error {
 	if err != nil {
 		return errors.Wrap(err, "unable to initialize the notification log component of alerting")
 	}
-
-	{
-		// Now, let's put together our notification pipeline
-		receivers := buildIntegrationsMap()
-		routingStage := make(notify.RoutingStage, len(receivers))
-
-		silencingStage := notify.NewMuteStage(silence.NewSilencer(am.silences, am.marker, gokit_log.NewNopLogger()))
-		//TODO: We need to unify these receivers
-		for name := range receivers {
-			stage := createReceiverStage(name, receivers[name], waitFunc, am.notificationLog)
-			routingStage[name] = notify.MultiStage{silencingStage, stage}
-		}
-
-		am.alerts, err = NewAlertProvider(routingStage, am.marker, gokit_log.NewNopLogger())
-		if err != nil {
-			return errors.Wrap(err, "failed to initialize alerting storage component")
-		}
-
-		am.dispatcher = dispatch.NewDispatcher(am.alerts, BuildRoutingConfiguration(), routingStage, am.marker, timeoutFunc, gokit_log.NewNopLogger(), nil)
+	am.silences, err = silence.New(silence.Options{
+		SnapshotFile: filepath.Join("dir", "silences"), //TODO: This is a setting
+		Retention:    time.Hour * 24,                   //TODO: This is also a setting
+	})
+	if err != nil {
+		return errors.Wrap(err, "unable to initialize the silencing component of alerting")
 	}
 
-	am.wg.Add(1)
-	go am.dispatcher.Run()
+	return nil
+}
+
+// Run applies the initial configuration and returns the error if that fails; otherwise it
+// blocks until ctx is cancelled, then stops the dispatcher and returns nil.
+func (am *Alertmanager) Run(ctx context.Context) error {
+	// Make sure dispatcher starts. We can tolerate future reload failures.
+	if err := am.ReloadConfigFromDatabase(); err != nil {
+		return err
+	}
+	for {
+		select {
+		case <-ctx.Done():
+			am.StopAndWait()
+			return nil
+		case <-time.After(1 * time.Minute):
+			// TODO: once we have a check to skip reload on same config, uncomment this.
+			//if err := am.ReloadConfigFromDatabase(); err != nil {
+			//	am.logger.Error("failed to sync config from database", "error", err)
+			//}
+		}
+	}
+}
+
+// StopAndWait stops the current dispatcher, if there is one, and waits for its goroutine to exit.
+func (am *Alertmanager) StopAndWait() {
+	if am.dispatcher != nil {
+		am.dispatcher.Stop()
+	}
+	am.dispatcherWG.Wait()
+}
+
+// ReloadConfigFromDatabase picks the latest config from database and restarts
+// the components with the new config.
+func (am *Alertmanager) ReloadConfigFromDatabase() error {
+	am.reloadConfigMtx.Lock()
+	defer am.reloadConfigMtx.Unlock()
+
+	// TODO: check if config is same as before using hashes and skip reload in case they are same.
+	cfg, err := getConfigFromDatabase()
+	if err != nil {
+		return errors.Wrap(err, "get config from database")
+	}
+	return errors.Wrap(am.ApplyConfig(cfg), "reload from config")
+}
+
+// getConfigFromDatabase is a placeholder: it returns an empty config until the lookup exists.
+func getConfigFromDatabase() (*api.PostableApiAlertingConfig, error) {
+	// TODO: get configs from the database.
+	return &api.PostableApiAlertingConfig{}, nil
+}
+
+// ApplyConfig applies a new configuration by re-initializing all components using the configuration provided.
+// It is not safe to call concurrently.
+// cfg is not read yet: receivers and routes still come from buildIntegrationsMap and
+// BuildRoutingConfiguration.
+func (am *Alertmanager) ApplyConfig(cfg *api.PostableApiAlertingConfig) error {
+	// Now, let's put together our notification pipeline
+	receivers := buildIntegrationsMap()
+	routingStage := make(notify.RoutingStage, len(receivers))
+
+	silencingStage := notify.NewMuteStage(silence.NewSilencer(am.silences, am.marker, gokit_log.NewNopLogger()))
+	//TODO: We need to unify these receivers
+	for name := range receivers {
+		stage := createReceiverStage(name, receivers[name], waitFunc, am.notificationLog)
+		routingStage[name] = notify.MultiStage{silencingStage, stage}
+	}
+
+	if am.alerts == nil {
+		// Init, as visible here, does not construct the provider, so the first apply
+		// has to create it; calling SetStage on a nil provider would panic.
+		alerts, err := NewAlertProvider(routingStage, am.marker, gokit_log.NewNopLogger())
+		if err != nil {
+			return errors.Wrap(err, "failed to initialize alerting storage component")
+		}
+		am.alerts = alerts
+	} else {
+		am.alerts.SetStage(routingStage)
+	}
+
+	am.StopAndWait()
+	// Keep the new dispatcher in a local so the goroutine runs exactly the
+	// instance created here, whatever am.dispatcher is set to later.
+	dispatcher := dispatch.NewDispatcher(am.alerts, BuildRoutingConfiguration(), routingStage, am.marker, timeoutFunc, gokit_log.NewNopLogger(), nil)
+	am.dispatcher = dispatcher
+
+	am.dispatcherWG.Add(1)
+	go func() {
+		defer am.dispatcherWG.Done()
+		dispatcher.Run()
+	}()
+
 	return nil
 }