mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
AlertingNG: Refactor notifier to support config reloads (#32099)
* AlertingNG: Refactor notifier to support config reloads Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Fix review comments and make reloading of config a sync operation Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Fix review comments Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
This commit is contained in:
parent
5c07df9f4b
commit
8854001b67
@ -2,6 +2,7 @@ package notifier
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/go-kit/kit/log"
|
||||
@ -30,6 +31,7 @@ type AlertProvider struct {
|
||||
// GetPendingLegacy() AlertIterator, and the external code will use this
|
||||
// iterator to send to the stage.
|
||||
stage notify.Stage
|
||||
stageMtx sync.Mutex
|
||||
}
|
||||
|
||||
// NewAlertProvider returns AlertProvider that also supports legacy alerts via PutPostableAlert.
|
||||
@ -77,8 +79,10 @@ func (ap *AlertProvider) PutPostableAlert(alerts ...*PostableAlert) error {
|
||||
}
|
||||
|
||||
for recv, alerts := range groupedAlerts {
|
||||
ap.stageMtx.Lock()
|
||||
ctx := notify.WithReceiverName(context.Background(), recv)
|
||||
_, _, err := ap.stage.Exec(ctx, ap.logger, alerts...)
|
||||
ap.stageMtx.Unlock()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -87,6 +91,12 @@ func (ap *AlertProvider) PutPostableAlert(alerts ...*PostableAlert) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ap *AlertProvider) SetStage(s notify.Stage) {
|
||||
ap.stageMtx.Lock()
|
||||
defer ap.stageMtx.Unlock()
|
||||
ap.stage = s
|
||||
}
|
||||
|
||||
func alertForDelivery(a *PostableAlert) *types.Alert {
|
||||
lbls := model.LabelSet{}
|
||||
annotations := model.LabelSet{}
|
||||
|
@ -20,6 +20,7 @@ import (
|
||||
"github.com/prometheus/alertmanager/types"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
|
||||
"github.com/grafana/alerting-api/pkg/api"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/registry"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
@ -34,9 +35,11 @@ type Alertmanager struct {
|
||||
silences *silence.Silences
|
||||
marker types.Marker
|
||||
alerts *AlertProvider
|
||||
dispatcher *dispatch.Dispatcher
|
||||
|
||||
wg sync.WaitGroup
|
||||
dispatcher *dispatch.Dispatcher
|
||||
dispatcherWG sync.WaitGroup
|
||||
|
||||
reloadConfigMtx sync.Mutex
|
||||
}
|
||||
|
||||
func init() {
|
||||
@ -47,25 +50,11 @@ func (am *Alertmanager) IsDisabled() bool {
|
||||
return !setting.AlertingEnabled || !setting.ExecuteAlerts
|
||||
}
|
||||
|
||||
func (am *Alertmanager) Init() error {
|
||||
func (am *Alertmanager) Init() (err error) {
|
||||
am.logger = log.New("alertmanager")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (am *Alertmanager) Run(ctx context.Context) error {
|
||||
//TODO: Speak with David Parrot wrt to the marker, we'll probably need our own.
|
||||
am.marker = types.NewMarker(prometheus.DefaultRegisterer)
|
||||
|
||||
var err error
|
||||
am.silences, err = silence.New(silence.Options{
|
||||
SnapshotFile: filepath.Join("dir", "silences"), //TODO: This is a setting
|
||||
Retention: time.Hour * 24, //TODO: This is also a setting
|
||||
})
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "unable to initialize the silencing component of alerting")
|
||||
}
|
||||
|
||||
am.notificationLog, err = nflog.New(
|
||||
nflog.WithRetention(time.Hour*24), //TODO: This is a setting.
|
||||
nflog.WithSnapshot(filepath.Join("dir", "notifications")), //TODO: This should be a setting
|
||||
@ -73,8 +62,65 @@ func (am *Alertmanager) Run(ctx context.Context) error {
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "unable to initialize the notification log component of alerting")
|
||||
}
|
||||
am.silences, err = silence.New(silence.Options{
|
||||
SnapshotFile: filepath.Join("dir", "silences"), //TODO: This is a setting
|
||||
Retention: time.Hour * 24, //TODO: This is also a setting
|
||||
})
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "unable to initialize the silencing component of alerting")
|
||||
}
|
||||
|
||||
{
|
||||
return nil
|
||||
}
|
||||
|
||||
func (am *Alertmanager) Run(ctx context.Context) error {
|
||||
// Make sure dispatcher starts. We can tolerate future reload failures.
|
||||
if err := am.ReloadConfigFromDatabase(); err != nil {
|
||||
return err
|
||||
}
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
am.StopAndWait()
|
||||
return nil
|
||||
case <-time.After(1 * time.Minute):
|
||||
// TODO: once we have a check to skip reload on same config, uncomment this.
|
||||
//if err := am.ReloadConfigFromDatabase(); err != nil {
|
||||
// am.logger.Error("failed to sync config from database", "error", err)
|
||||
//}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (am *Alertmanager) StopAndWait() {
|
||||
if am.dispatcher != nil {
|
||||
am.dispatcher.Stop()
|
||||
}
|
||||
am.dispatcherWG.Wait()
|
||||
}
|
||||
|
||||
// ReloadConfigFromDatabase picks the latest config from database and restarts
|
||||
// the components with the new config.
|
||||
func (am *Alertmanager) ReloadConfigFromDatabase() error {
|
||||
am.reloadConfigMtx.Lock()
|
||||
defer am.reloadConfigMtx.Unlock()
|
||||
|
||||
// TODO: check if config is same as before using hashes and skip reload in case they are same.
|
||||
cfg, err := getConfigFromDatabase()
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "get config from database")
|
||||
}
|
||||
return errors.Wrap(am.ApplyConfig(cfg), "reload from config")
|
||||
}
|
||||
|
||||
func getConfigFromDatabase() (*api.PostableApiAlertingConfig, error) {
|
||||
// TODO: get configs from the database.
|
||||
return &api.PostableApiAlertingConfig{}, nil
|
||||
}
|
||||
|
||||
// ApplyConfig applies a new configuration by re-initializing all components using the configuration provided.
|
||||
// It is not safe to call concurrently.
|
||||
func (am *Alertmanager) ApplyConfig(cfg *api.PostableApiAlertingConfig) error {
|
||||
// Now, let's put together our notification pipeline
|
||||
receivers := buildIntegrationsMap()
|
||||
routingStage := make(notify.RoutingStage, len(receivers))
|
||||
@ -86,16 +132,17 @@ func (am *Alertmanager) Run(ctx context.Context) error {
|
||||
routingStage[name] = notify.MultiStage{silencingStage, stage}
|
||||
}
|
||||
|
||||
am.alerts, err = NewAlertProvider(routingStage, am.marker, gokit_log.NewNopLogger())
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to initialize alerting storage component")
|
||||
}
|
||||
am.alerts.SetStage(routingStage)
|
||||
|
||||
am.StopAndWait()
|
||||
am.dispatcher = dispatch.NewDispatcher(am.alerts, BuildRoutingConfiguration(), routingStage, am.marker, timeoutFunc, gokit_log.NewNopLogger(), nil)
|
||||
}
|
||||
|
||||
am.wg.Add(1)
|
||||
go am.dispatcher.Run()
|
||||
am.dispatcherWG.Add(1)
|
||||
go func() {
|
||||
defer am.dispatcherWG.Done()
|
||||
am.dispatcher.Run()
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user