NGAlert: Run the maintenance cycle for the silences (#33301)

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
Ganesh Vernekar <ganeshvern@gmail.com>, 2021-04-23 19:49:03 +05:30, committed by GitHub
parent ec1c85acca · commit 659ea20c3c
2 changed files with 65 additions and 40 deletions

pkg/services/ngalert/notifier/alertmanager.go

@@ -18,7 +18,6 @@ import (
     "github.com/prometheus/alertmanager/nflog"
     "github.com/prometheus/alertmanager/nflog/nflogpb"
     "github.com/prometheus/alertmanager/notify"
-    "github.com/prometheus/alertmanager/provider"
     "github.com/prometheus/alertmanager/provider/mem"
     "github.com/prometheus/alertmanager/silence"
     "github.com/prometheus/alertmanager/template"
@@ -45,6 +44,8 @@ const (
     workingDir = "alerting"
     // How long we should keep silences and notification entries on disk after they've served their purpose.
     retentionNotificationsAndSilences = 5 * 24 * time.Hour
+    // maintenanceNotificationAndSilences is how often we flush and garbage collect notifications and silences.
+    maintenanceNotificationAndSilences = 15 * time.Minute
     // defaultResolveTimeout is the default timeout used for resolving an alert
     // if the end time is not specified.
     defaultResolveTimeout = 5 * time.Minute
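
Review note: the two constants are designed as a pair — every maintenanceNotificationAndSilences tick the in-memory state is snapshotted to disk, and entries older than retentionNotificationsAndSilences are garbage collected. A minimal sketch of that relationship (gcCutoff is illustrative, not a Grafana function):

    package alerting

    import "time"

    const (
        retentionNotificationsAndSilences  = 5 * 24 * time.Hour // keep state on disk this long
        maintenanceNotificationAndSilences = 15 * time.Minute   // flush and GC this often
    )

    // gcCutoff returns the timestamp before which entries are eligible
    // for garbage collection on a given maintenance tick.
    func gcCutoff(now time.Time) time.Time {
        return now.Add(-retentionNotificationsAndSilences)
    }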
@@ -81,18 +82,21 @@ type Alertmanager struct {
     SQLStore *sqlstore.SQLStore `inject:""`
     Store    store.AlertingStore

+    // notificationLog keeps track of which notifications we've fired already.
     notificationLog *nflog.Log
-    // silences keeps track of which notifications we should not fire due to user configuration.
-    silencer *silence.Silencer
-    silences *silence.Silences
-    marker   types.Marker
-    alerts   provider.Alerts
-    route    *dispatch.Route
+    marker types.Marker
+    alerts *mem.Alerts
+    route  *dispatch.Route

     dispatcher *dispatch.Dispatcher
     inhibitor  *inhibit.Inhibitor
-    wg         sync.WaitGroup
+    // wg is for dispatcher, inhibitor, silences and notifications.
+    // Across configuration changes dispatcher and inhibitor are completely replaced; however, silences, the notification log and alerts remain the same.
+    // stopc is used to let silences and notifications know we are done.
+    wg    sync.WaitGroup
+    stopc chan struct{}
+
+    silencer *silence.Silencer
+    silences *silence.Silences

     stageMetrics      *notify.Metrics
     dispatcherMetrics *dispatch.DispatcherMetrics
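
Review note: the new wg/stopc fields encode the usual Go shutdown idiom — a close-once channel broadcasts "we are done" to every maintenance loop, and a WaitGroup lets the owner block until they have all exited. A self-contained sketch of the pattern (the worker here is illustrative, not Grafana code):

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    // worker mimics a maintenance loop: it wakes on a ticker and exits
    // when the stop channel is closed.
    func worker(name string, interval time.Duration, stopc <-chan struct{}, wg *sync.WaitGroup) {
        defer wg.Done()
        t := time.NewTicker(interval)
        defer t.Stop()
        for {
            select {
            case <-t.C:
                fmt.Println(name, "maintenance tick")
            case <-stopc:
                return
            }
        }
    }

    func main() {
        var wg sync.WaitGroup
        stopc := make(chan struct{})

        wg.Add(2)
        go worker("nflog", 10*time.Millisecond, stopc, &wg)
        go worker("silences", 10*time.Millisecond, stopc, &wg)

        time.Sleep(50 * time.Millisecond)
        close(stopc) // broadcast shutdown to every loop at once
        wg.Wait()    // block until all loops have exited
    }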
@@ -113,6 +117,7 @@ func (am *Alertmanager) IsDisabled() bool {
 }

 func (am *Alertmanager) Init() (err error) {
+    am.stopc = make(chan struct{})
     am.logger = log.New("alertmanager")
     r := prometheus.NewRegistry()
     am.marker = types.NewMarker(r)
@@ -120,13 +125,17 @@ func (am *Alertmanager) Init() (err error) {
     am.dispatcherMetrics = dispatch.NewDispatcherMetrics(r)
     am.Store = store.DBstore{SQLStore: am.SQLStore}

+    // Initialize the notification log.
+    am.wg.Add(1)
     am.notificationLog, err = nflog.New(
         nflog.WithRetention(retentionNotificationsAndSilences),
         nflog.WithSnapshot(filepath.Join(am.WorkingDirPath(), "notifications")),
+        nflog.WithMaintenance(maintenanceNotificationAndSilences, am.stopc, am.wg.Done),
     )
     if err != nil {
         return fmt.Errorf("unable to initialize the notification log component of alerting: %w", err)
     }
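
Review note: am.wg.Add(1) has to happen before nflog.New because the WithMaintenance option starts the flush/GC loop on an internal goroutine and invokes the done callback (am.wg.Done here) once am.stopc is closed and the loop exits. A standalone sketch against the upstream API as of the Alertmanager version this code vendors (the snapshot path is illustrative):

    package main

    import (
        "log"
        "sync"
        "time"

        "github.com/prometheus/alertmanager/nflog"
    )

    func main() {
        stopc := make(chan struct{})
        var wg sync.WaitGroup

        wg.Add(1) // balanced by the done callback below
        nlog, err := nflog.New(
            nflog.WithRetention(5*24*time.Hour),               // GC entries older than five days
            nflog.WithSnapshot("/tmp/alerting/notifications"), // flush target for each cycle
            nflog.WithMaintenance(15*time.Minute, stopc, wg.Done),
        )
        if err != nil {
            log.Fatal(err)
        }
        _ = nlog

        close(stopc) // ask the maintenance loop to exit...
        wg.Wait()    // ...and wait until it has done so
    }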
+    // Initialize silences.
     am.silences, err = silence.New(silence.Options{
         SnapshotFile: filepath.Join(am.WorkingDirPath(), "silences"),
         Retention:    retentionNotificationsAndSilences,
@@ -135,6 +144,13 @@ func (am *Alertmanager) Init() (err error) {
         return fmt.Errorf("unable to initialize the silencing component of alerting: %w", err)
     }

+    am.wg.Add(1)
+    go func() {
+        am.silences.Maintenance(15*time.Minute, filepath.Join(am.WorkingDirPath(), "silences"), am.stopc)
+        am.wg.Done()
+    }()
+
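Review note: unlike nflog, silence.Silences has no maintenance option on its constructor — Maintenance is a blocking method — so the commit runs it on an explicit goroutine tracked by the same WaitGroup. The 15-minute literal mirrors maintenanceNotificationAndSilences. A standalone sketch against the upstream silence package (paths illustrative):

    package main

    import (
        "log"
        "sync"
        "time"

        "github.com/prometheus/alertmanager/silence"
    )

    func main() {
        silences, err := silence.New(silence.Options{
            SnapshotFile: "/tmp/alerting/silences",
            Retention:    5 * 24 * time.Hour,
        })
        if err != nil {
            log.Fatal(err)
        }

        stopc := make(chan struct{})
        var wg sync.WaitGroup

        // Maintenance blocks: every interval it garbage collects expired
        // silences and snapshots the rest to disk, until stopc is closed.
        wg.Add(1)
        go func() {
            silences.Maintenance(15*time.Minute, "/tmp/alerting/silences", stopc)
            wg.Done()
        }()

        close(stopc)
        wg.Wait()
    }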
+    // Initialize in-memory alerts.
     am.alerts, err = mem.NewAlerts(context.Background(), am.marker, memoryAlertsGCInterval, gokit_log.NewNopLogger())
     if err != nil {
         return fmt.Errorf("unable to initialize the alert provider component of alerting: %w", err)
@@ -152,8 +168,7 @@ func (am *Alertmanager) Run(ctx context.Context) error {
     for {
         select {
         case <-ctx.Done():
-            am.StopAndWait()
-            return nil
+            return am.StopAndWait()
         case <-time.After(pollInterval):
             if err := am.SyncAndApplyConfigFromDatabase(); err != nil {
                 am.logger.Error("unable to sync configuration", "err", err)
@@ -167,7 +182,7 @@ func (am *Alertmanager) AddMigration(mg *migrator.Migrator) {
     alertmanagerConfigurationMigration(mg)
 }

-func (am *Alertmanager) StopAndWait() {
+func (am *Alertmanager) StopAndWait() error {
     if am.dispatcher != nil {
         am.dispatcher.Stop()
     }
@@ -175,7 +190,13 @@ func (am *Alertmanager) StopAndWait() error {
     if am.inhibitor != nil {
         am.inhibitor.Stop()
     }

+    am.alerts.Close()
+
+    close(am.stopc)
     am.wg.Wait()
+
+    return nil
 }

 func (am *Alertmanager) SaveAndApplyConfig(cfg *apimodels.PostableUserConfig) error {
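
Review note: shutdown now follows a fixed order — stop the notification producers, close the alert store, then broadcast stopc and wait for the maintenance loops to write their final snapshots. The error return exists so Run can simply return am.StopAndWait(), even though it is always nil today. A sketch of the ordering with simplified interfaces (not the Grafana types):

    package alerting

    import "sync"

    type stopper interface{ Stop() }
    type closer interface{ Close() }

    func stopAndWait(dispatcher, inhibitor stopper, alerts closer, stopc chan struct{}, wg *sync.WaitGroup) error {
        if dispatcher != nil {
            dispatcher.Stop() // 1. no new notifications are routed
        }
        if inhibitor != nil {
            inhibitor.Stop() // 2. the inhibition loop exits
        }
        alerts.Close() // 3. the in-memory alert store stops its GC goroutine
        close(stopc)   // 4. nflog and silences maintenance loops are told to exit
        wg.Wait()      // 5. both flush a final snapshot before signalling done
        return nil
    }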
@@ -288,6 +309,13 @@ func (am *Alertmanager) applyConfig(cfg *apimodels.PostableUserConfig) error {
     // Now, let's put together our notification pipeline.
     routingStage := make(notify.RoutingStage, len(integrationsMap))

+    if am.inhibitor != nil {
+        am.inhibitor.Stop()
+    }
+    if am.dispatcher != nil {
+        am.dispatcher.Stop()
+    }
+
     am.inhibitor = inhibit.NewInhibitor(am.alerts, cfg.AlertmanagerConfig.InhibitRules, am.marker, gokit_log.NewNopLogger())
     am.silencer = silence.NewSilencer(am.silences, am.marker, gokit_log.NewNopLogger())

@@ -298,7 +326,6 @@ func (am *Alertmanager) applyConfig(cfg *apimodels.PostableUserConfig) error {
         routingStage[name] = notify.MultiStage{silencingStage, inhibitionStage, stage}
     }

-    am.StopAndWait()
     am.route = dispatch.NewRoute(cfg.AlertmanagerConfig.Route, nil)
     am.dispatcher = dispatch.NewDispatcher(am.alerts, am.route, routingStage, am.marker, timeoutFunc, gokit_log.NewNopLogger(), am.dispatcherMetrics)
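
Review note: applyConfig previously called am.StopAndWait() when swapping configurations; under the new stopc semantics that would permanently shut down the silences and notification-log maintenance (and close the alert store) on every config change. Instead, only the two components derived from configuration are stopped and rebuilt. A sketch of the reload pattern (not the Grafana implementation):

    package alerting

    type component interface{ Stop() }

    // pipeline holds the config-derived pieces; long-lived state such as
    // silences, the notification log and the alert store lives elsewhere
    // and survives a reload untouched.
    type pipeline struct {
        dispatcher component
        inhibitor  component
    }

    func (p *pipeline) reload(newDispatcher, newInhibitor component) {
        // Stop the old instances first so two dispatchers never run at once.
        if p.inhibitor != nil {
            p.inhibitor.Stop()
        }
        if p.dispatcher != nil {
            p.dispatcher.Stop()
        }
        p.inhibitor = newInhibitor
        p.dispatcher = newDispatcher
    }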

pkg/services/ngalert/notifier/alertmanager_test.go

@@ -1,16 +1,19 @@
 package notifier

 import (
+    "context"
     "errors"
     "io/ioutil"
     "os"
     "testing"
     "time"

+    gokit_log "github.com/go-kit/kit/log"
     "github.com/go-openapi/strfmt"
     "github.com/prometheus/alertmanager/api/v2/models"
-    "github.com/prometheus/alertmanager/provider"
+    "github.com/prometheus/alertmanager/provider/mem"
     "github.com/prometheus/alertmanager/types"
+    "github.com/prometheus/client_golang/prometheus"
     "github.com/prometheus/common/model"
     "github.com/stretchr/testify/require"
@@ -20,27 +23,26 @@ import (
 )

 func TestAlertmanager_ShouldUseDefaultConfigurationWhenNoConfiguration(t *testing.T) {
-    am := &Alertmanager{
-        Settings: &setting.Cfg{},
-        SQLStore: sqlstore.InitTestDB(t),
-    }
+    am := &Alertmanager{}
+    am.Settings = &setting.Cfg{}
+    am.SQLStore = sqlstore.InitTestDB(t)
     require.NoError(t, am.Init())
     require.NoError(t, am.SyncAndApplyConfigFromDatabase())
     require.NotNil(t, am.config)
 }

 func TestPutAlert(t *testing.T) {
+    am := &Alertmanager{}
+
     dir, err := ioutil.TempDir("", "")
     require.NoError(t, err)
     t.Cleanup(func() {
         require.NoError(t, os.RemoveAll(dir))
     })
-    am := &Alertmanager{
-        Settings: &setting.Cfg{
-            DataPath: dir,
-        },
-    }
+    am.Settings = &setting.Cfg{
+        DataPath: dir,
+    }
     require.NoError(t, am.Init())

     startTime := time.Now()
@@ -259,34 +261,30 @@ func TestPutAlert(t *testing.T) {
     for _, c := range cases {
         t.Run(c.title, func(t *testing.T) {
-            alertProvider := &mockAlertProvider{}
-            am.alerts = alertProvider
+            r := prometheus.NewRegistry()
+            am.marker = types.NewMarker(r)
+            am.alerts, err = mem.NewAlerts(context.Background(), am.marker, 15*time.Minute, gokit_log.NewNopLogger())
+            require.NoError(t, err)
+
+            alerts := []*types.Alert{}
             err := am.PutAlerts(c.postableAlerts)
             if c.expError != nil {
                 require.Error(t, err)
                 require.Equal(t, c.expError, err)
-                require.Equal(t, 0, len(alertProvider.alerts))
+                require.Equal(t, 0, len(alerts))
                 return
             }
             require.NoError(t, err)

+            iter := am.alerts.GetPending()
+            defer iter.Close()
+            for a := range iter.Next() {
+                alerts = append(alerts, a)
+            }
+
             // We take the "now" time from one of the UpdatedAt timestamps.
-            now := alertProvider.alerts[0].UpdatedAt
-            require.Equal(t, c.expAlerts(now), alertProvider.alerts)
+            now := alerts[0].UpdatedAt
+            require.Equal(t, c.expAlerts(now), alerts)
         })
     }
 }
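
Review note: with the hand-rolled mock gone, the test exercises the real in-memory provider and drains it through its iterator — GetPending returns a provider.AlertIterator whose Next() is a channel that closes once all pending alerts have been sent. A standalone sketch of that drain (the GC interval is arbitrary):

    package main

    import (
        "context"
        "fmt"
        "time"

        gokit_log "github.com/go-kit/kit/log"
        "github.com/prometheus/alertmanager/provider/mem"
        "github.com/prometheus/alertmanager/types"
        "github.com/prometheus/client_golang/prometheus"
    )

    func main() {
        marker := types.NewMarker(prometheus.NewRegistry())
        alerts, err := mem.NewAlerts(context.Background(), marker, 15*time.Minute, gokit_log.NewNopLogger())
        if err != nil {
            panic(err)
        }

        iter := alerts.GetPending()
        defer iter.Close()
        got := []*types.Alert{}
        for a := range iter.Next() { // channel closes after the last pending alert
            got = append(got, a)
        }
        fmt.Println("pending alerts:", len(got))
    }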
-type mockAlertProvider struct {
-    alerts []*types.Alert
-}
-
-func (a *mockAlertProvider) Subscribe() provider.AlertIterator  { return nil }
-func (a *mockAlertProvider) GetPending() provider.AlertIterator { return nil }
-func (a *mockAlertProvider) Get(model.Fingerprint) (*types.Alert, error) { return nil, nil }
-
-func (a *mockAlertProvider) Put(alerts ...*types.Alert) error {
-    a.alerts = append(a.alerts, alerts...)
-    return nil
-}