diff --git a/pkg/services/ngalert/sender/notifier.go b/pkg/services/ngalert/sender/notifier.go new file mode 100644 index 00000000000..19fcf21c690 --- /dev/null +++ b/pkg/services/ngalert/sender/notifier.go @@ -0,0 +1,609 @@ +// THIS FILE IS COPIED FROM UPSTREAM +// +// https://github.com/prometheus/prometheus/blob/d437f0bb6b53ec8594a43b871f92252980b13ddd/notifier/notifier.go +// +// Copyright 2013 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// nolint +package sender + +import ( + "context" + "fmt" + "net" + "net/http" + "net/url" + "path" + "strings" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/go-openapi/strfmt" + "github.com/pkg/errors" + "github.com/prometheus/alertmanager/api/v2/models" + "github.com/prometheus/client_golang/prometheus" + config_util "github.com/prometheus/common/config" + "github.com/prometheus/common/model" + "github.com/prometheus/common/version" + + "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/discovery/targetgroup" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/relabel" +) + +const ( + contentTypeJSON = "application/json" +) + +// String constants for instrumentation. +const ( + namespace = "prometheus" + subsystem = "notifications" + alertmanagerLabel = "alertmanager" +) + +var userAgent = fmt.Sprintf("Prometheus/%s", version.Version) + +// Alert is a generic representation of an alert in the Prometheus eco-system. +type Alert struct { + // Label value pairs for purpose of aggregation, matching, and disposition + // dispatching. This must minimally include an "alertname" label. + Labels labels.Labels `json:"labels"` + + // Extra key/value information which does not define alert identity. + Annotations labels.Labels `json:"annotations"` + + // The known time range for this alert. Both ends are optional. + StartsAt time.Time `json:"startsAt,omitempty"` + EndsAt time.Time `json:"endsAt,omitempty"` + GeneratorURL string `json:"generatorURL,omitempty"` +} + +// Name returns the name of the alert. It is equivalent to the "alertname" label. +func (a *Alert) Name() string { + return a.Labels.Get(labels.AlertName) +} + +// Hash returns a hash over the alert. It is equivalent to the alert labels hash. +func (a *Alert) Hash() uint64 { + return a.Labels.Hash() +} + +func (a *Alert) String() string { + s := fmt.Sprintf("%s[%s]", a.Name(), fmt.Sprintf("%016x", a.Hash())[:7]) + if a.Resolved() { + return s + "[resolved]" + } + return s + "[active]" +} + +// Resolved returns true iff the activity interval ended in the past. +func (a *Alert) Resolved() bool { + return a.ResolvedAt(time.Now()) +} + +// ResolvedAt returns true iff the activity interval ended before +// the given timestamp. +func (a *Alert) ResolvedAt(ts time.Time) bool { + if a.EndsAt.IsZero() { + return false + } + return !a.EndsAt.After(ts) +} + +// Manager is responsible for dispatching alert notifications to an +// alert manager service. +type Manager struct { + queue []*Alert + opts *Options + + metrics *alertMetrics + + more chan struct{} + mtx sync.RWMutex + ctx context.Context + cancel func() + + alertmanagers map[string]*alertmanagerSet + logger log.Logger +} + +// Options are the configurable parameters of a Handler. +type Options struct { + QueueCapacity int + ExternalLabels labels.Labels + RelabelConfigs []*relabel.Config + // Used for sending HTTP requests to the Alertmanager. + Do func(ctx context.Context, client *http.Client, req *http.Request) (*http.Response, error) + + Registerer prometheus.Registerer +} + +type alertMetrics struct { + latency *prometheus.SummaryVec + errors *prometheus.CounterVec + sent *prometheus.CounterVec + dropped prometheus.Counter + queueLength prometheus.GaugeFunc + queueCapacity prometheus.Gauge + alertmanagersDiscovered prometheus.GaugeFunc +} + +func newAlertMetrics(r prometheus.Registerer, queueCap int, queueLen, alertmanagersDiscovered func() float64) *alertMetrics { + m := &alertMetrics{ + latency: prometheus.NewSummaryVec(prometheus.SummaryOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "latency_seconds", + Help: "Latency quantiles for sending alert notifications.", + Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, + }, + []string{alertmanagerLabel}, + ), + errors: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "errors_total", + Help: "Total number of errors sending alert notifications.", + }, + []string{alertmanagerLabel}, + ), + sent: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "sent_total", + Help: "Total number of alerts sent.", + }, + []string{alertmanagerLabel}, + ), + dropped: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "dropped_total", + Help: "Total number of alerts dropped due to errors when sending to Alertmanager.", + }), + queueLength: prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "queue_length", + Help: "The number of alert notifications in the queue.", + }, queueLen), + queueCapacity: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "queue_capacity", + Help: "The capacity of the alert notifications queue.", + }), + alertmanagersDiscovered: prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Name: "prometheus_notifications_alertmanagers_discovered", + Help: "The number of alertmanagers discovered and active.", + }, alertmanagersDiscovered), + } + + m.queueCapacity.Set(float64(queueCap)) + + if r != nil { + r.MustRegister( + m.latency, + m.errors, + m.sent, + m.dropped, + m.queueLength, + m.queueCapacity, + m.alertmanagersDiscovered, + ) + } + + return m +} + +func do(ctx context.Context, client *http.Client, req *http.Request) (*http.Response, error) { + if client == nil { + client = http.DefaultClient + } + return client.Do(req.WithContext(ctx)) +} + +// NewManager is the manager constructor. +func NewManager(o *Options, logger log.Logger) *Manager { + ctx, cancel := context.WithCancel(context.Background()) + + if o.Do == nil { + o.Do = do + } + if logger == nil { + logger = log.NewNopLogger() + } + + n := &Manager{ + queue: make([]*Alert, 0, o.QueueCapacity), + ctx: ctx, + cancel: cancel, + more: make(chan struct{}, 1), + opts: o, + logger: logger, + } + + queueLenFunc := func() float64 { return float64(n.queueLen()) } + alertmanagersDiscoveredFunc := func() float64 { return float64(len(n.Alertmanagers())) } + + n.metrics = newAlertMetrics( + o.Registerer, + o.QueueCapacity, + queueLenFunc, + alertmanagersDiscoveredFunc, + ) + + return n +} + +const maxBatchSize = 64 + +func (n *Manager) queueLen() int { + n.mtx.RLock() + defer n.mtx.RUnlock() + + return len(n.queue) +} + +func (n *Manager) nextBatch() []*Alert { + n.mtx.Lock() + defer n.mtx.Unlock() + + var alerts []*Alert + + if len(n.queue) > maxBatchSize { + alerts = append(make([]*Alert, 0, maxBatchSize), n.queue[:maxBatchSize]...) + n.queue = n.queue[maxBatchSize:] + } else { + alerts = append(make([]*Alert, 0, len(n.queue)), n.queue...) + n.queue = n.queue[:0] + } + + return alerts +} + +// Run dispatches notifications continuously. +func (n *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) { + + for { + select { + case <-n.ctx.Done(): + return + case ts := <-tsets: + n.reload(ts) + case <-n.more: + } + alerts := n.nextBatch() + + if !n.sendAll(alerts...) { + n.metrics.dropped.Add(float64(len(alerts))) + } + // If the queue still has items left, kick off the next iteration. + if n.queueLen() > 0 { + n.setMore() + } + } +} + +func (n *Manager) reload(tgs map[string][]*targetgroup.Group) { + n.mtx.Lock() + defer n.mtx.Unlock() + + for id, tgroup := range tgs { + am, ok := n.alertmanagers[id] + if !ok { + level.Error(n.logger).Log("msg", "couldn't sync alert manager set", "err", fmt.Sprintf("invalid id:%v", id)) + continue + } + am.sync(tgroup) + } +} + +// Send queues the given notification requests for processing. +// Panics if called on a handler that is not running. +func (n *Manager) Send(alerts ...*Alert) { + n.mtx.Lock() + defer n.mtx.Unlock() + + // Attach external labels before relabelling and sending. + for _, a := range alerts { + lb := labels.NewBuilder(a.Labels) + + for _, l := range n.opts.ExternalLabels { + if a.Labels.Get(l.Name) == "" { + lb.Set(l.Name, l.Value) + } + } + + a.Labels = lb.Labels() + } + + alerts = n.relabelAlerts(alerts) + if len(alerts) == 0 { + return + } + + // Queue capacity should be significantly larger than a single alert + // batch could be. + if d := len(alerts) - n.opts.QueueCapacity; d > 0 { + alerts = alerts[d:] + + level.Warn(n.logger).Log("msg", "Alert batch larger than queue capacity, dropping alerts", "num_dropped", d) + n.metrics.dropped.Add(float64(d)) + } + + // If the queue is full, remove the oldest alerts in favor + // of newer ones. + if d := (len(n.queue) + len(alerts)) - n.opts.QueueCapacity; d > 0 { + n.queue = n.queue[d:] + + level.Warn(n.logger).Log("msg", "Alert notification queue full, dropping alerts", "num_dropped", d) + n.metrics.dropped.Add(float64(d)) + } + n.queue = append(n.queue, alerts...) + + // Notify sending goroutine that there are alerts to be processed. + n.setMore() +} + +func (n *Manager) relabelAlerts(alerts []*Alert) []*Alert { + var relabeledAlerts []*Alert + + for _, alert := range alerts { + labels := relabel.Process(alert.Labels, n.opts.RelabelConfigs...) + if labels != nil { + alert.Labels = labels + relabeledAlerts = append(relabeledAlerts, alert) + } + } + return relabeledAlerts +} + +// setMore signals that the alert queue has items. +func (n *Manager) setMore() { + // If we cannot send on the channel, it means the signal already exists + // and has not been consumed yet. + select { + case n.more <- struct{}{}: + default: + } +} + +// Alertmanagers returns a slice of Alertmanager URLs. +func (n *Manager) Alertmanagers() []*url.URL { + n.mtx.RLock() + amSets := n.alertmanagers + n.mtx.RUnlock() + + var res []*url.URL + + for _, ams := range amSets { + ams.mtx.RLock() + for _, am := range ams.ams { + res = append(res, am.url()) + } + ams.mtx.RUnlock() + } + + return res +} + +// DroppedAlertmanagers returns a slice of Alertmanager URLs. +func (n *Manager) DroppedAlertmanagers() []*url.URL { + n.mtx.RLock() + amSets := n.alertmanagers + n.mtx.RUnlock() + + var res []*url.URL + + for _, ams := range amSets { + ams.mtx.RLock() + for _, dam := range ams.droppedAms { + res = append(res, dam.url()) + } + ams.mtx.RUnlock() + } + + return res +} + +func alertsToOpenAPIAlerts(alerts []*Alert) models.PostableAlerts { + openAPIAlerts := models.PostableAlerts{} + for _, a := range alerts { + start := strfmt.DateTime(a.StartsAt) + end := strfmt.DateTime(a.EndsAt) + openAPIAlerts = append(openAPIAlerts, &models.PostableAlert{ + Annotations: labelsToOpenAPILabelSet(a.Annotations), + EndsAt: end, + StartsAt: start, + Alert: models.Alert{ + GeneratorURL: strfmt.URI(a.GeneratorURL), + Labels: labelsToOpenAPILabelSet(a.Labels), + }, + }) + } + + return openAPIAlerts +} + +func labelsToOpenAPILabelSet(modelLabelSet labels.Labels) models.LabelSet { + apiLabelSet := models.LabelSet{} + for _, label := range modelLabelSet { + apiLabelSet[label.Name] = label.Value + } + + return apiLabelSet +} + +// Stop shuts down the notification handler. +func (n *Manager) Stop() { + level.Info(n.logger).Log("msg", "Stopping notification manager...") + n.cancel() +} + +// alertmanager holds Alertmanager endpoint information. +type alertmanager interface { + url() *url.URL +} + +type alertmanagerLabels struct { + labels.Labels +} + +const pathLabel = "__alerts_path__" + +func (a alertmanagerLabels) url() *url.URL { + return &url.URL{ + Scheme: a.Get(model.SchemeLabel), + Host: a.Get(model.AddressLabel), + Path: a.Get(pathLabel), + } +} + +func newAlertmanagerSet(cfg *config.AlertmanagerConfig, logger log.Logger, metrics *alertMetrics) (*alertmanagerSet, error) { + client, err := config_util.NewClientFromConfig(cfg.HTTPClientConfig, "alertmanager") + if err != nil { + return nil, err + } + s := &alertmanagerSet{ + client: client, + cfg: cfg, + logger: logger, + metrics: metrics, + } + return s, nil +} + +// sync extracts a deduplicated set of Alertmanager endpoints from a list +// of target groups definitions. +func (s *alertmanagerSet) sync(tgs []*targetgroup.Group) { + allAms := []alertmanager{} + allDroppedAms := []alertmanager{} + + for _, tg := range tgs { + ams, droppedAms, err := alertmanagerFromGroup(tg, s.cfg) + if err != nil { + level.Error(s.logger).Log("msg", "Creating discovered Alertmanagers failed", "err", err) + continue + } + allAms = append(allAms, ams...) + allDroppedAms = append(allDroppedAms, droppedAms...) + } + + s.mtx.Lock() + defer s.mtx.Unlock() + // Set new Alertmanagers and deduplicate them along their unique URL. + s.ams = []alertmanager{} + s.droppedAms = []alertmanager{} + s.droppedAms = append(s.droppedAms, allDroppedAms...) + seen := map[string]struct{}{} + + for _, am := range allAms { + us := am.url().String() + if _, ok := seen[us]; ok { + continue + } + + // This will initialize the Counters for the AM to 0. + s.metrics.sent.WithLabelValues(us) + s.metrics.errors.WithLabelValues(us) + + seen[us] = struct{}{} + s.ams = append(s.ams, am) + } +} + +func postPath(pre string, v config.AlertmanagerAPIVersion) string { + alertPushEndpoint := fmt.Sprintf("/api/%v/alerts", string(v)) + return path.Join("/", pre, alertPushEndpoint) +} + +// alertmanagerFromGroup extracts a list of alertmanagers from a target group +// and an associated AlertmanagerConfig. +func alertmanagerFromGroup(tg *targetgroup.Group, cfg *config.AlertmanagerConfig) ([]alertmanager, []alertmanager, error) { + var res []alertmanager + var droppedAlertManagers []alertmanager + + for _, tlset := range tg.Targets { + lbls := make([]labels.Label, 0, len(tlset)+2+len(tg.Labels)) + + for ln, lv := range tlset { + lbls = append(lbls, labels.Label{Name: string(ln), Value: string(lv)}) + } + // Set configured scheme as the initial scheme label for overwrite. + lbls = append(lbls, labels.Label{Name: model.SchemeLabel, Value: cfg.Scheme}) + lbls = append(lbls, labels.Label{Name: pathLabel, Value: postPath(cfg.PathPrefix, cfg.APIVersion)}) + + // Combine target labels with target group labels. + for ln, lv := range tg.Labels { + if _, ok := tlset[ln]; !ok { + lbls = append(lbls, labels.Label{Name: string(ln), Value: string(lv)}) + } + } + + lset := relabel.Process(labels.New(lbls...), cfg.RelabelConfigs...) + if lset == nil { + droppedAlertManagers = append(droppedAlertManagers, alertmanagerLabels{lbls}) + continue + } + + lb := labels.NewBuilder(lset) + + // addPort checks whether we should add a default port to the address. + // If the address is not valid, we don't append a port either. + addPort := func(s string) bool { + // If we can split, a port exists and we don't have to add one. + if _, _, err := net.SplitHostPort(s); err == nil { + return false + } + // If adding a port makes it valid, the previous error + // was not due to an invalid address and we can append a port. + _, _, err := net.SplitHostPort(s + ":1234") + return err == nil + } + addr := lset.Get(model.AddressLabel) + // If it's an address with no trailing port, infer it based on the used scheme. + if addPort(addr) { + // Addresses reaching this point are already wrapped in [] if necessary. + switch lset.Get(model.SchemeLabel) { + case "http", "": + addr = addr + ":80" + case "https": + addr = addr + ":443" + default: + return nil, nil, errors.Errorf("invalid scheme: %q", cfg.Scheme) + } + lb.Set(model.AddressLabel, addr) + } + + if err := config.CheckTargetAddress(model.LabelValue(addr)); err != nil { + return nil, nil, err + } + + // Meta labels are deleted after relabelling. Other internal labels propagate to + // the target which decides whether they will be part of their label set. + for _, l := range lset { + if strings.HasPrefix(l.Name, model.MetaLabelPrefix) { + lb.Del(l.Name) + } + } + + res = append(res, alertmanagerLabels{lset}) + } + return res, droppedAlertManagers, nil +} diff --git a/pkg/services/ngalert/sender/notifier_ext.go b/pkg/services/ngalert/sender/notifier_ext.go new file mode 100644 index 00000000000..7b94c45e0cd --- /dev/null +++ b/pkg/services/ngalert/sender/notifier_ext.go @@ -0,0 +1,199 @@ +// This extension file contains all changed functions that would normally be +// in notifier.go. This helps us to keep track of the changes compared +// to upstream. +// Changes are denoted explicitly by a comment with the prefix "Extension:" + +// nolint +package sender + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "net/http" + "sync" + "time" + + "go.uber.org/atomic" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/pkg/errors" + "github.com/prometheus/prometheus/config" +) + +// ApplyConfig updates the status state as the new config requires. +// Extension: add new parameter headers. +func (n *Manager) ApplyConfig(conf *config.Config, headers map[string]map[string]string) error { + n.mtx.Lock() + defer n.mtx.Unlock() + + n.opts.ExternalLabels = conf.GlobalConfig.ExternalLabels + n.opts.RelabelConfigs = conf.AlertingConfig.AlertRelabelConfigs + + amSets := make(map[string]*alertmanagerSet) + + for k, cfg := range conf.AlertingConfig.AlertmanagerConfigs.ToMap() { + ams, err := newAlertmanagerSet(cfg, n.logger, n.metrics) + if err != nil { + return err + } + // Extension: set the headers to the alertmanager set. + if headers, ok := headers[k]; ok { + ams.headers = headers + } + amSets[k] = ams + } + + n.alertmanagers = amSets + + return nil +} + +// alertmanagerSet contains a set of Alertmanagers discovered via a group of service +// discovery definitions that have a common configuration on how alerts should be sent. +type alertmanagerSet struct { + cfg *config.AlertmanagerConfig + client *http.Client + + // Extension: headers that should be used for the http requests to the alertmanagers. + headers map[string]string + + metrics *alertMetrics + + mtx sync.RWMutex + ams []alertmanager + droppedAms []alertmanager + logger log.Logger +} + +// sendAll sends the alerts to all configured Alertmanagers concurrently. +// It returns true if the alerts could be sent successfully to at least one Alertmanager. +func (n *Manager) sendAll(alerts ...*Alert) bool { + if len(alerts) == 0 { + return true + } + + begin := time.Now() + + // v1Payload and v2Payload represent 'alerts' marshaled for Alertmanager API + // v1 or v2. Marshaling happens below. Reference here is for caching between + // for loop iterations. + var v1Payload, v2Payload []byte + + n.mtx.RLock() + amSets := n.alertmanagers + n.mtx.RUnlock() + + var ( + wg sync.WaitGroup + numSuccess atomic.Uint64 + ) + for _, ams := range amSets { + var ( + payload []byte + err error + ) + + ams.mtx.RLock() + + switch ams.cfg.APIVersion { + case config.AlertmanagerAPIVersionV1: + { + if v1Payload == nil { + v1Payload, err = json.Marshal(alerts) + if err != nil { + level.Error(n.logger).Log("msg", "Encoding alerts for Alertmanager API v1 failed", "err", err) + ams.mtx.RUnlock() + return false + } + } + + payload = v1Payload + } + case config.AlertmanagerAPIVersionV2: + { + if v2Payload == nil { + openAPIAlerts := alertsToOpenAPIAlerts(alerts) + + v2Payload, err = json.Marshal(openAPIAlerts) + if err != nil { + level.Error(n.logger).Log("msg", "Encoding alerts for Alertmanager API v2 failed", "err", err) + ams.mtx.RUnlock() + return false + } + } + + payload = v2Payload + } + default: + { + level.Error(n.logger).Log( + "msg", fmt.Sprintf("Invalid Alertmanager API version '%v', expected one of '%v'", ams.cfg.APIVersion, config.SupportedAlertmanagerAPIVersions), + "err", err, + ) + ams.mtx.RUnlock() + return false + } + } + + for _, am := range ams.ams { + wg.Add(1) + + ctx, cancel := context.WithTimeout(n.ctx, time.Duration(ams.cfg.Timeout)) + defer cancel() + + // Extension: added headers parameter. + go func(client *http.Client, url string, headers map[string]string) { + if err := n.sendOne(ctx, client, url, payload, headers); err != nil { + level.Error(n.logger).Log("alertmanager", url, "count", len(alerts), "msg", "Error sending alert", "err", err) + n.metrics.errors.WithLabelValues(url).Inc() + } else { + numSuccess.Inc() + } + n.metrics.latency.WithLabelValues(url).Observe(time.Since(begin).Seconds()) + n.metrics.sent.WithLabelValues(url).Add(float64(len(alerts))) + + wg.Done() + }(ams.client, am.url().String(), ams.headers) + } + + ams.mtx.RUnlock() + } + + wg.Wait() + + return numSuccess.Load() > 0 +} + +// Extension: added headers parameter. +func (n *Manager) sendOne(ctx context.Context, c *http.Client, url string, b []byte, headers map[string]string) error { + req, err := http.NewRequest("POST", url, bytes.NewReader(b)) + if err != nil { + return err + } + req.Header.Set("User-Agent", userAgent) + req.Header.Set("Content-Type", contentTypeJSON) + // Extension: set headers. + for k, v := range headers { + req.Header.Set(k, v) + } + resp, err := n.opts.Do(ctx, c, req) + if err != nil { + return err + } + defer func() { + io.Copy(ioutil.Discard, resp.Body) + resp.Body.Close() + }() + + // Any HTTP status 2xx is OK. + if resp.StatusCode/100 != 2 { + return errors.Errorf("bad response status %s", resp.Status) + } + + return nil +} diff --git a/pkg/services/ngalert/sender/router.go b/pkg/services/ngalert/sender/router.go index ed36e5bd76a..56302f7eadf 100644 --- a/pkg/services/ngalert/sender/router.go +++ b/pkg/services/ngalert/sender/router.go @@ -128,8 +128,12 @@ func (d *AlertsRouter) SyncAndApplyConfigFromDatabase() error { redactedAMs := buildRedactedAMs(d.logger, alertmanagers, cfg.OrgID) d.logger.Debug("Alertmanagers found in the configuration", "alertmanagers", redactedAMs) + var hashes []string + for _, cfg := range alertmanagers { + hashes = append(hashes, cfg.SHA256()) + } // We have a running sender, check if we need to apply a new config. - amHash := asSHA256(alertmanagers) + amHash := asSHA256(hashes) if ok { if d.externalAlertmanagersCfgHash[cfg.OrgID] == amHash { d.logger.Debug("Sender configuration is the same as the one running, no-op", "org", cfg.OrgID, "alertmanagers", redactedAMs) @@ -184,10 +188,10 @@ func (d *AlertsRouter) SyncAndApplyConfigFromDatabase() error { return nil } -func buildRedactedAMs(l log.Logger, alertmanagers []string, ordId int64) []string { +func buildRedactedAMs(l log.Logger, alertmanagers []externalAMcfg, ordId int64) []string { var redactedAMs []string for _, am := range alertmanagers { - parsedAM, err := url.Parse(am) + parsedAM, err := url.Parse(am.amURL) if err != nil { l.Error("Failed to parse alertmanager string", "org", ordId, "error", err) continue @@ -204,8 +208,10 @@ func asSHA256(strings []string) string { return fmt.Sprintf("%x", h.Sum(nil)) } -func (d *AlertsRouter) alertmanagersFromDatasources(orgID int64) ([]string, error) { - var alertmanagers []string +func (d *AlertsRouter) alertmanagersFromDatasources(orgID int64) ([]externalAMcfg, error) { + var ( + alertmanagers []externalAMcfg + ) // We might have alertmanager datasources that are acting as external // alertmanager, let's fetch them. query := &datasources.GetDataSourcesByTypeQuery{ @@ -230,7 +236,20 @@ func (d *AlertsRouter) alertmanagersFromDatasources(orgID int64) ([]string, erro "error", err) continue } - alertmanagers = append(alertmanagers, amURL) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + headers, err := d.datasourceService.CustomHeaders(ctx, ds) + cancel() + if err != nil { + d.logger.Error("Failed to get headers for external alertmanager", + "org", ds.OrgID, + "uid", ds.UID, + "error", err) + continue + } + alertmanagers = append(alertmanagers, externalAMcfg{ + amURL: amURL, + headers: headers, + }) } return alertmanagers, nil } diff --git a/pkg/services/ngalert/sender/router_test.go b/pkg/services/ngalert/sender/router_test.go index 4310f0f3c8b..1447224bd9f 100644 --- a/pkg/services/ngalert/sender/router_test.go +++ b/pkg/services/ngalert/sender/router_test.go @@ -589,7 +589,13 @@ func TestAlertManagers_buildRedactedAMs(t *testing.T) { for _, tt := range tc { t.Run(tt.name, func(t *testing.T) { - require.Equal(t, tt.expected, buildRedactedAMs(&fakeLogger, tt.amUrls, tt.orgId)) + var cfgs []externalAMcfg + for _, url := range tt.amUrls { + cfgs = append(cfgs, externalAMcfg{ + amURL: url, + }) + } + require.Equal(t, tt.expected, buildRedactedAMs(&fakeLogger, cfgs, tt.orgId)) require.Equal(t, tt.errCalls, fakeLogger.ErrorLogs.Calls) require.Equal(t, tt.errLog, fakeLogger.ErrorLogs.Message) }) diff --git a/pkg/services/ngalert/sender/sender.go b/pkg/services/ngalert/sender/sender.go index 43bc1a1b6f6..8e09015a463 100644 --- a/pkg/services/ngalert/sender/sender.go +++ b/pkg/services/ngalert/sender/sender.go @@ -19,7 +19,6 @@ import ( "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/discovery" "github.com/prometheus/prometheus/model/labels" - "github.com/prometheus/prometheus/notifier" "github.com/grafana/grafana/pkg/infra/log" apimodels "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions" @@ -35,12 +34,40 @@ type ExternalAlertmanager struct { logger log.Logger wg sync.WaitGroup - manager *notifier.Manager + manager *Manager sdCancel context.CancelFunc sdManager *discovery.Manager } +type externalAMcfg struct { + amURL string + headers map[string]string +} + +func (cfg *externalAMcfg) SHA256() string { + return asSHA256([]string{cfg.headerString(), cfg.amURL}) +} + +// headersString transforms all the headers in a sorted way as a +// single string so it can be used for hashing and comparing. +func (cfg *externalAMcfg) headerString() string { + var result strings.Builder + + headerKeys := make([]string, 0, len(cfg.headers)) + for key := range cfg.headers { + headerKeys = append(headerKeys, key) + } + + sort.Strings(headerKeys) + + for _, key := range headerKeys { + result.WriteString(fmt.Sprintf("%s:%s", key, cfg.headers[key])) + } + + return result.String() +} + func NewExternalAlertmanagerSender() *ExternalAlertmanager { l := log.New("ngalert.sender.external-alertmanager") sdCtx, sdCancel := context.WithCancel(context.Background()) @@ -49,10 +76,10 @@ func NewExternalAlertmanagerSender() *ExternalAlertmanager { sdCancel: sdCancel, } - s.manager = notifier.NewManager( + s.manager = NewManager( // Injecting a new registry here means these metrics are not exported. // Once we fix the individual Alertmanager metrics we should fix this scenario too. - ¬ifier.Options{QueueCapacity: defaultMaxQueueCapacity, Registerer: prometheus.NewRegistry()}, + &Options{QueueCapacity: defaultMaxQueueCapacity, Registerer: prometheus.NewRegistry()}, s.logger, ) @@ -62,8 +89,8 @@ func NewExternalAlertmanagerSender() *ExternalAlertmanager { } // ApplyConfig syncs a configuration with the sender. -func (s *ExternalAlertmanager) ApplyConfig(orgId, id int64, alertmanagers []string) error { - notifierCfg, err := buildNotifierConfig(alertmanagers) +func (s *ExternalAlertmanager) ApplyConfig(orgId, id int64, alertmanagers []externalAMcfg) error { + notifierCfg, headers, err := buildNotifierConfig(alertmanagers) if err != nil { return err } @@ -71,7 +98,7 @@ func (s *ExternalAlertmanager) ApplyConfig(orgId, id int64, alertmanagers []stri s.logger = s.logger.New("org", orgId, "cfg", id) s.logger.Info("Synchronizing config with external Alertmanager group") - if err := s.manager.ApplyConfig(notifierCfg); err != nil { + if err := s.manager.ApplyConfig(notifierCfg, headers); err != nil { return err } @@ -106,7 +133,7 @@ func (s *ExternalAlertmanager) SendAlerts(alerts apimodels.PostableAlerts) { if len(alerts.PostableAlerts) == 0 { return } - as := make([]*notifier.Alert, 0, len(alerts.PostableAlerts)) + as := make([]*Alert, 0, len(alerts.PostableAlerts)) for _, a := range alerts.PostableAlerts { na := s.alertToNotifierAlert(a) as = append(as, na) @@ -133,12 +160,13 @@ func (s *ExternalAlertmanager) DroppedAlertmanagers() []*url.URL { return s.manager.DroppedAlertmanagers() } -func buildNotifierConfig(alertmanagers []string) (*config.Config, error) { +func buildNotifierConfig(alertmanagers []externalAMcfg) (*config.Config, map[string]map[string]string, error) { amConfigs := make([]*config.AlertmanagerConfig, 0, len(alertmanagers)) - for _, amURL := range alertmanagers { - u, err := url.Parse(amURL) + headers := map[string]map[string]string{} + for i, am := range alertmanagers { + u, err := url.Parse(am.amURL) if err != nil { - return nil, err + return nil, nil, err } sdConfig := discovery.Configs{ @@ -157,6 +185,12 @@ func buildNotifierConfig(alertmanagers []string) (*config.Config, error) { ServiceDiscoveryConfigs: sdConfig, } + if am.headers != nil { + // The key has the same format as the AlertmanagerConfigs.ToMap() would generate + // so we can use it later on when working with the alertmanager config map. + headers[fmt.Sprintf("config-%d", i)] = am.headers + } + // Check the URL for basic authentication information first if u.User != nil { amConfig.HTTPClientConfig.BasicAuth = &common_config.BasicAuth{ @@ -176,12 +210,12 @@ func buildNotifierConfig(alertmanagers []string) (*config.Config, error) { }, } - return notifierConfig, nil + return notifierConfig, headers, nil } -func (s *ExternalAlertmanager) alertToNotifierAlert(alert models.PostableAlert) *notifier.Alert { +func (s *ExternalAlertmanager) alertToNotifierAlert(alert models.PostableAlert) *Alert { // Prometheus alertmanager has stricter rules for annotations/labels than grafana's internal alertmanager, so we sanitize invalid keys. - return ¬ifier.Alert{ + return &Alert{ Labels: s.sanitizeLabelSet(alert.Alert.Labels), Annotations: s.sanitizeLabelSet(alert.Annotations), StartsAt: time.Time(alert.StartsAt),