Upgrade Prometheus Alertmanager and small fixes (#32280)

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
This commit is contained in:
Ganesh Vernekar
2021-03-25 17:21:44 +05:30
committed by GitHub
parent 8232b6ebbc
commit 093e5947f4
3 changed files with 21 additions and 10 deletions

View File

@@ -8,6 +8,7 @@ import (
"time"
gokit_log "github.com/go-kit/kit/log"
"github.com/grafana/alerting-api/pkg/api"
"github.com/pkg/errors"
"github.com/prometheus/alertmanager/dispatch"
"github.com/prometheus/alertmanager/nflog"
@@ -20,7 +21,6 @@ import (
"github.com/prometheus/alertmanager/types"
"github.com/prometheus/client_golang/prometheus"
"github.com/grafana/alerting-api/pkg/api"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/registry"
"github.com/grafana/grafana/pkg/services/ngalert/models"
@@ -51,6 +51,8 @@ type Alertmanager struct {
dispatcher *dispatch.Dispatcher
dispatcherWG sync.WaitGroup
stageMetrics *notify.Metrics
reloadConfigMtx sync.Mutex
}
@@ -66,6 +68,7 @@ func (am *Alertmanager) Init() (err error) {
am.logger = log.New("alertmanager")
r := prometheus.NewRegistry()
am.marker = types.NewMarker(r)
am.stageMetrics = notify.NewMetrics(r)
am.Store = store.DBstore{SQLStore: am.SQLStore}
am.notificationLog, err = nflog.New(
@@ -137,7 +140,7 @@ func (am *Alertmanager) SyncAndApplyConfigFromDatabase() error {
if err != nil {
return errors.Wrap(err, "get config from database")
}
return errors.Wrap(am.ApplyConfig(cfg), "reload from config")
return errors.Wrap(am.applyConfig(cfg), "reload from config")
}
func (am *Alertmanager) getConfigFromDatabase() (*api.PostableUserConfig, error) {
@@ -152,8 +155,16 @@ func (am *Alertmanager) getConfigFromDatabase() (*api.PostableUserConfig, error)
}
// ApplyConfig applies a new configuration by re-initializing all components using the configuration provided.
// It acquires reloadConfigMtx and holds it until the call returns, so concurrent calls to
// ApplyConfig are serialized; the actual work is delegated to applyConfig.
func (am *Alertmanager) ApplyConfig(cfg *api.PostableUserConfig) error {
am.reloadConfigMtx.Lock()
defer am.reloadConfigMtx.Unlock()
return am.applyConfig(cfg)
}
// applyConfig applies a new configuration by re-initializing all components using the configuration provided.
// It is not safe to call concurrently.
func (am *Alertmanager) applyConfig(cfg *api.PostableUserConfig) error {
// First, we need to make sure we persist the templates to disk.
paths, _, err := PersistTemplates(cfg, am.WorkingDirPath())
if err != nil {
@@ -176,7 +187,7 @@ func (am *Alertmanager) ApplyConfig(cfg *api.PostableUserConfig) error {
silencingStage := notify.NewMuteStage(silence.NewSilencer(am.silences, am.marker, gokit_log.NewNopLogger()))
for name := range integrationsMap {
stage := createReceiverStage(name, integrationsMap[name], waitFunc, am.notificationLog)
stage := am.createReceiverStage(name, integrationsMap[name], waitFunc, am.notificationLog)
routingStage[name] = notify.MultiStage{silencingStage, stage}
}
@@ -279,7 +290,7 @@ func (am *Alertmanager) ListSilences(matchers []*labels.Matcher) ([]types.Silenc
return active[i].EndsAt.Before(active[j].EndsAt)
})
sort.Slice(pending, func(i int, j int) bool {
return pending[i].StartsAt.Before(pending[j].EndsAt)
return pending[i].EndsAt.Before(pending[j].EndsAt)
})
sort.Slice(expired, func(i int, j int) bool {
return expired[i].EndsAt.After(expired[j].EndsAt)
@@ -300,7 +311,7 @@ func (am *Alertmanager) CreateSilence(silence *types.Silence) {}
// DeleteSilence is currently an empty stub: it accepts a silence and does nothing with it.
func (am *Alertmanager) DeleteSilence(silence *types.Silence) {}
// createReceiverStage creates a pipeline of stages for a receiver.
func createReceiverStage(name string, integrations []notify.Integration, wait func() time.Duration, notificationLog notify.NotificationLog) notify.Stage {
func (am *Alertmanager) createReceiverStage(name string, integrations []notify.Integration, wait func() time.Duration, notificationLog notify.NotificationLog) notify.Stage {
var fs notify.FanoutStage
for i := range integrations {
recv := &nflogpb.Receiver{
@@ -312,7 +323,7 @@ func createReceiverStage(name string, integrations []notify.Integration, wait fu
s = append(s, notify.NewWaitStage(wait))
s = append(s, notify.NewDedupStage(&integrations[i], notificationLog, recv))
//TODO: This probably won't work w/o the metrics
s = append(s, notify.NewRetryStage(integrations[i], name, nil))
s = append(s, notify.NewRetryStage(integrations[i], name, am.stageMetrics))
s = append(s, notify.NewSetNotifiesStage(notificationLog, recv))
fs = append(fs, s)