mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Alerting: Use alerting.GrafanaAlertmanager
instead of initialising Alertmanager components directly (#61230)
* Alerting: Use `alerting.GrafanaAlertmanager` instead of initialising Alertmanager components directly
This commit is contained in:
parent
58c4c95e92
commit
e7cd6eb13c
@ -288,7 +288,7 @@ func (srv AlertmanagerSrv) RoutePostTestReceivers(c *models.ReqContext, body api
|
||||
|
||||
result, err := am.TestReceivers(ctx, body)
|
||||
if err != nil {
|
||||
if errors.Is(err, notifier.ErrNoReceivers) {
|
||||
if errors.Is(err, alerting.ErrNoReceivers) {
|
||||
return response.Error(http.StatusBadRequest, "", err)
|
||||
}
|
||||
return response.Error(http.StatusInternalServerError, "", err)
|
||||
@ -362,7 +362,7 @@ func statusForTestReceivers(v []notifier.TestReceiverResult) int {
|
||||
if next.Error != nil {
|
||||
var (
|
||||
invalidReceiverErr notifier.InvalidReceiverError
|
||||
receiverTimeoutErr notifier.ReceiverTimeoutError
|
||||
receiverTimeoutErr alerting.ReceiverTimeoutError
|
||||
)
|
||||
if errors.As(next.Error, &invalidReceiverErr) {
|
||||
numBadRequests += 1
|
||||
|
@ -9,6 +9,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/go-openapi/strfmt"
|
||||
"github.com/grafana/alerting/alerting"
|
||||
amv2 "github.com/prometheus/alertmanager/api/v2/models"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/stretchr/testify/require"
|
||||
@ -127,7 +128,7 @@ func TestStatusForTestReceivers(t *testing.T) {
|
||||
Name: "test1",
|
||||
UID: "uid1",
|
||||
Status: "failed",
|
||||
Error: notifier.ReceiverTimeoutError{},
|
||||
Error: alerting.ReceiverTimeoutError{},
|
||||
}},
|
||||
}, {
|
||||
Name: "test2",
|
||||
@ -135,7 +136,7 @@ func TestStatusForTestReceivers(t *testing.T) {
|
||||
Name: "test2",
|
||||
UID: "uid2",
|
||||
Status: "failed",
|
||||
Error: notifier.ReceiverTimeoutError{},
|
||||
Error: alerting.ReceiverTimeoutError{},
|
||||
}},
|
||||
}}))
|
||||
})
|
||||
|
@ -1,3 +1,4 @@
|
||||
//nolint:golint,unused
|
||||
package notifier
|
||||
|
||||
import (
|
||||
@ -9,14 +10,11 @@ import (
|
||||
"fmt"
|
||||
"net/url"
|
||||
"path/filepath"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
"unicode/utf8"
|
||||
|
||||
"github.com/grafana/alerting/alerting"
|
||||
"github.com/grafana/alerting/alerting/notifier/channels"
|
||||
amv2 "github.com/prometheus/alertmanager/api/v2/models"
|
||||
"github.com/prometheus/alertmanager/config"
|
||||
"github.com/prometheus/alertmanager/dispatch"
|
||||
@ -26,13 +24,13 @@ import (
|
||||
"github.com/prometheus/alertmanager/notify"
|
||||
"github.com/prometheus/alertmanager/provider/mem"
|
||||
"github.com/prometheus/alertmanager/silence"
|
||||
pb "github.com/prometheus/alertmanager/silence/silencepb"
|
||||
"github.com/prometheus/alertmanager/template"
|
||||
"github.com/prometheus/alertmanager/timeinterval"
|
||||
"github.com/prometheus/alertmanager/types"
|
||||
"github.com/prometheus/common/model"
|
||||
|
||||
alertingModels "github.com/grafana/alerting/alerting/models"
|
||||
"github.com/grafana/alerting/alerting"
|
||||
"github.com/grafana/alerting/alerting/notifier/channels"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/kvstore"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
apimodels "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
|
||||
@ -62,30 +60,13 @@ const (
|
||||
var retentionNotificationsAndSilences = 5 * 24 * time.Hour
|
||||
var silenceMaintenanceInterval = 15 * time.Minute
|
||||
|
||||
func init() {
|
||||
silence.ValidateMatcher = func(m *pb.Matcher) error {
|
||||
switch m.Type {
|
||||
case pb.Matcher_EQUAL, pb.Matcher_NOT_EQUAL:
|
||||
if !model.LabelValue(m.Pattern).IsValid() {
|
||||
return fmt.Errorf("invalid label value %q", m.Pattern)
|
||||
}
|
||||
case pb.Matcher_REGEXP, pb.Matcher_NOT_REGEXP:
|
||||
if _, err := regexp.Compile(m.Pattern); err != nil {
|
||||
return fmt.Errorf("invalid regular expression %q: %s", m.Pattern, err)
|
||||
}
|
||||
default:
|
||||
return fmt.Errorf("unknown matcher type %q", m.Type)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
type AlertingStore interface {
|
||||
store.AlertingStore
|
||||
store.ImageStore
|
||||
}
|
||||
|
||||
type Alertmanager struct {
|
||||
Base *alerting.GrafanaAlertmanager
|
||||
logger log.Logger
|
||||
|
||||
Settings *setting.Cfg
|
||||
@ -106,13 +87,13 @@ type Alertmanager struct {
|
||||
// wg is for dispatcher, inhibitor, silences and notifications
|
||||
// Across configuration changes dispatcher and inhibitor are completely replaced, however, silences, notification log and alerts remain the same.
|
||||
// stopc is used to let silences and notifications know we are done.
|
||||
wg sync.WaitGroup
|
||||
stopc chan struct{}
|
||||
wg sync.WaitGroup
|
||||
|
||||
silencer *silence.Silencer
|
||||
silences *silence.Silences
|
||||
|
||||
receivers []*notify.Receiver
|
||||
receivers []*alerting.Receiver
|
||||
|
||||
// muteTimes is a map where the key is the name of the mute_time_interval
|
||||
// and the value represents all configured time_interval(s)
|
||||
@ -155,30 +136,16 @@ func (m maintenanceOptions) MaintenanceFunc(state alerting.State) (int64, error)
|
||||
}
|
||||
|
||||
func newAlertmanager(ctx context.Context, orgID int64, cfg *setting.Cfg, store AlertingStore, kvStore kvstore.KVStore,
|
||||
peer alerting.ClusterPeer, decryptFn channels.GetDecryptedValueFn, ns notifications.Service, m *metrics.Alertmanager) (*Alertmanager, error) {
|
||||
am := &Alertmanager{
|
||||
Settings: cfg,
|
||||
stopc: make(chan struct{}),
|
||||
logger: log.New("alertmanager", "org", orgID),
|
||||
marker: types.NewMarker(m.Registerer),
|
||||
stageMetrics: notify.NewMetrics(m.Registerer),
|
||||
dispatcherMetrics: dispatch.NewDispatcherMetrics(false, m.Registerer),
|
||||
Store: store,
|
||||
peer: peer,
|
||||
peerTimeout: cfg.UnifiedAlerting.HAPeerTimeout,
|
||||
Metrics: m,
|
||||
NotificationService: ns,
|
||||
orgID: orgID,
|
||||
decryptFn: decryptFn,
|
||||
}
|
||||
peer alerting.ClusterPeer, decryptFn channels.GetDecryptedValueFn, ns notifications.Service,
|
||||
m *metrics.Alertmanager) (*Alertmanager, error) {
|
||||
workingPath := filepath.Join(cfg.DataPath, workingDir, strconv.Itoa(int(orgID)))
|
||||
fileStore := NewFileStore(orgID, kvStore, workingPath)
|
||||
|
||||
am.fileStore = NewFileStore(am.orgID, kvStore, am.WorkingDirPath())
|
||||
|
||||
nflogFilepath, err := am.fileStore.FilepathFor(ctx, notificationLogFilename)
|
||||
nflogFilepath, err := fileStore.FilepathFor(ctx, notificationLogFilename)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
silencesFilePath, err := am.fileStore.FilepathFor(ctx, silencesFilename)
|
||||
silencesFilePath, err := fileStore.FilepathFor(ctx, silencesFilename)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -188,7 +155,7 @@ func newAlertmanager(ctx context.Context, orgID int64, cfg *setting.Cfg, store A
|
||||
retention: retentionNotificationsAndSilences,
|
||||
maintenanceFrequency: silenceMaintenanceInterval,
|
||||
maintenanceFunc: func(state alerting.State) (int64, error) {
|
||||
return am.fileStore.Persist(ctx, silencesFilename, state)
|
||||
return fileStore.Persist(ctx, silencesFilename, state)
|
||||
},
|
||||
}
|
||||
|
||||
@ -197,60 +164,33 @@ func newAlertmanager(ctx context.Context, orgID int64, cfg *setting.Cfg, store A
|
||||
retention: retentionNotificationsAndSilences,
|
||||
maintenanceFrequency: notificationLogMaintenanceInterval,
|
||||
maintenanceFunc: func(state alerting.State) (int64, error) {
|
||||
return am.fileStore.Persist(ctx, notificationLogFilename, state)
|
||||
return fileStore.Persist(ctx, notificationLogFilename, state)
|
||||
},
|
||||
}
|
||||
|
||||
// Initialize the notification log.
|
||||
am.wg.Add(1)
|
||||
am.notificationLog, err = nflog.New(
|
||||
nflog.WithRetention(nflogOptions.Retention()),
|
||||
nflog.WithSnapshot(nflogOptions.Filepath()),
|
||||
nflog.WithMaintenance(nflogOptions.MaintenanceFrequency(), am.stopc, am.wg.Done, func() (int64, error) {
|
||||
return nflogOptions.MaintenanceFunc(am.notificationLog)
|
||||
}),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to initialize the notification log component of alerting: %w", err)
|
||||
}
|
||||
c := am.peer.AddState(fmt.Sprintf("notificationlog:%d", am.orgID), am.notificationLog, m.Registerer)
|
||||
am.notificationLog.SetBroadcast(c.Broadcast)
|
||||
|
||||
// Initialize silences
|
||||
am.silences, err = silence.New(silence.Options{
|
||||
Metrics: m.Registerer,
|
||||
SnapshotFile: silencesOptions.Filepath(),
|
||||
Retention: silencesOptions.Retention(),
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to initialize the silencing component of alerting: %w", err)
|
||||
amcfg := &alerting.GrafanaAlertmanagerConfig{
|
||||
WorkingDirectory: workingDir,
|
||||
AlertStoreCallback: nil,
|
||||
PeerTimeout: cfg.UnifiedAlerting.HAPeerTimeout,
|
||||
Silences: silencesOptions,
|
||||
Nflog: nflogOptions,
|
||||
}
|
||||
|
||||
c = am.peer.AddState(fmt.Sprintf("silences:%d", am.orgID), am.silences, m.Registerer)
|
||||
am.silences.SetBroadcast(c.Broadcast)
|
||||
|
||||
am.wg.Add(1)
|
||||
go func() {
|
||||
am.silences.Maintenance(
|
||||
silencesOptions.MaintenanceFrequency(),
|
||||
silencesOptions.Filepath(),
|
||||
am.stopc,
|
||||
func() (int64, error) {
|
||||
// Delete silences older than the retention period.
|
||||
if _, err := am.silences.GC(); err != nil {
|
||||
am.logger.Error("silence garbage collection", "error", err)
|
||||
// Don't return here - we need to snapshot our state first.
|
||||
}
|
||||
return silencesOptions.maintenanceFunc(am.silences)
|
||||
},
|
||||
)
|
||||
am.wg.Done()
|
||||
}()
|
||||
|
||||
// Initialize in-memory alerts
|
||||
am.alerts, err = mem.NewAlerts(context.Background(), am.marker, memoryAlertsGCInterval, nil, am.logger, m.Registerer)
|
||||
l := log.New("alertmanager", "org", orgID)
|
||||
gam, err := alerting.NewGrafanaAlertmanager("orgID", orgID, amcfg, peer, l, alerting.NewGrafanaAlertmanagerMetrics(m.Registerer))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to initialize the alert provider component of alerting: %w", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
am := &Alertmanager{
|
||||
Base: gam,
|
||||
Settings: cfg,
|
||||
Store: store,
|
||||
NotificationService: ns,
|
||||
orgID: orgID,
|
||||
decryptFn: decryptFn,
|
||||
fileStore: fileStore,
|
||||
logger: l,
|
||||
}
|
||||
|
||||
return am, nil
|
||||
@ -260,10 +200,7 @@ func (am *Alertmanager) Ready() bool {
|
||||
// We consider AM as ready only when the config has been
|
||||
// applied at least once successfully. Until then, some objects
|
||||
// can still be nil.
|
||||
am.reloadConfigMtx.RLock()
|
||||
defer am.reloadConfigMtx.RUnlock()
|
||||
|
||||
return am.ready()
|
||||
return am.Base.Ready()
|
||||
}
|
||||
|
||||
func (am *Alertmanager) ready() bool {
|
||||
@ -271,50 +208,40 @@ func (am *Alertmanager) ready() bool {
|
||||
}
|
||||
|
||||
func (am *Alertmanager) StopAndWait() {
|
||||
if am.dispatcher != nil {
|
||||
am.dispatcher.Stop()
|
||||
}
|
||||
|
||||
if am.inhibitor != nil {
|
||||
am.inhibitor.Stop()
|
||||
}
|
||||
|
||||
am.alerts.Close()
|
||||
|
||||
close(am.stopc)
|
||||
|
||||
am.wg.Wait()
|
||||
am.Base.StopAndWait()
|
||||
}
|
||||
|
||||
// SaveAndApplyDefaultConfig saves the default configuration the database and applies the configuration to the Alertmanager.
|
||||
// It rollbacks the save if we fail to apply the configuration.
|
||||
func (am *Alertmanager) SaveAndApplyDefaultConfig(ctx context.Context) error {
|
||||
am.reloadConfigMtx.Lock()
|
||||
defer am.reloadConfigMtx.Unlock()
|
||||
|
||||
cmd := &ngmodels.SaveAlertmanagerConfigurationCmd{
|
||||
AlertmanagerConfiguration: am.Settings.UnifiedAlerting.DefaultConfiguration,
|
||||
Default: true,
|
||||
ConfigurationVersion: fmt.Sprintf("v%d", ngmodels.AlertConfigurationVersion),
|
||||
OrgID: am.orgID,
|
||||
}
|
||||
|
||||
cfg, err := Load([]byte(am.Settings.UnifiedAlerting.DefaultConfiguration))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = am.Store.SaveAlertmanagerConfigurationWithCallback(ctx, cmd, func() error {
|
||||
if err := am.applyConfig(cfg, []byte(am.Settings.UnifiedAlerting.DefaultConfiguration)); err != nil {
|
||||
return err
|
||||
var outerErr error
|
||||
am.Base.WithLock(func() {
|
||||
cmd := &ngmodels.SaveAlertmanagerConfigurationCmd{
|
||||
AlertmanagerConfiguration: am.Settings.UnifiedAlerting.DefaultConfiguration,
|
||||
Default: true,
|
||||
ConfigurationVersion: fmt.Sprintf("v%d", ngmodels.AlertConfigurationVersion),
|
||||
OrgID: am.orgID,
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
cfg, err := Load([]byte(am.Settings.UnifiedAlerting.DefaultConfiguration))
|
||||
if err != nil {
|
||||
outerErr = err
|
||||
return
|
||||
}
|
||||
|
||||
err = am.Store.SaveAlertmanagerConfigurationWithCallback(ctx, cmd, func() error {
|
||||
if err := am.applyConfig(cfg, []byte(am.Settings.UnifiedAlerting.DefaultConfiguration)); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
outerErr = nil
|
||||
return
|
||||
}
|
||||
})
|
||||
|
||||
return outerErr
|
||||
}
|
||||
|
||||
// SaveAndApplyConfig saves the configuration the database and applies the configuration to the Alertmanager.
|
||||
@ -325,26 +252,27 @@ func (am *Alertmanager) SaveAndApplyConfig(ctx context.Context, cfg *apimodels.P
|
||||
return fmt.Errorf("failed to serialize to the Alertmanager configuration: %w", err)
|
||||
}
|
||||
|
||||
am.reloadConfigMtx.Lock()
|
||||
defer am.reloadConfigMtx.Unlock()
|
||||
|
||||
cmd := &ngmodels.SaveAlertmanagerConfigurationCmd{
|
||||
AlertmanagerConfiguration: string(rawConfig),
|
||||
ConfigurationVersion: fmt.Sprintf("v%d", ngmodels.AlertConfigurationVersion),
|
||||
OrgID: am.orgID,
|
||||
}
|
||||
|
||||
err = am.Store.SaveAlertmanagerConfigurationWithCallback(ctx, cmd, func() error {
|
||||
if err := am.applyConfig(cfg, rawConfig); err != nil {
|
||||
return err
|
||||
var outerErr error
|
||||
am.Base.WithLock(func() {
|
||||
cmd := &ngmodels.SaveAlertmanagerConfigurationCmd{
|
||||
AlertmanagerConfiguration: string(rawConfig),
|
||||
ConfigurationVersion: fmt.Sprintf("v%d", ngmodels.AlertConfigurationVersion),
|
||||
OrgID: am.orgID,
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
err = am.Store.SaveAlertmanagerConfigurationWithCallback(ctx, cmd, func() error {
|
||||
if err := am.applyConfig(cfg, rawConfig); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
outerErr = err
|
||||
return
|
||||
}
|
||||
})
|
||||
|
||||
return outerErr
|
||||
}
|
||||
|
||||
// ApplyConfig applies the configuration to the Alertmanager.
|
||||
@ -355,16 +283,18 @@ func (am *Alertmanager) ApplyConfig(dbCfg *ngmodels.AlertConfiguration) error {
|
||||
return fmt.Errorf("failed to parse Alertmanager config: %w", err)
|
||||
}
|
||||
|
||||
am.reloadConfigMtx.Lock()
|
||||
defer am.reloadConfigMtx.Unlock()
|
||||
var outerErr error
|
||||
am.Base.WithLock(func() {
|
||||
if err = am.applyConfig(cfg, nil); err != nil {
|
||||
outerErr = fmt.Errorf("unable to apply configuration: %w", err)
|
||||
return
|
||||
}
|
||||
})
|
||||
|
||||
if err = am.applyConfig(cfg, nil); err != nil {
|
||||
return fmt.Errorf("unable to apply configuration: %w", err)
|
||||
}
|
||||
return nil
|
||||
return outerErr
|
||||
}
|
||||
|
||||
func (am *Alertmanager) getTemplate() (*template.Template, error) {
|
||||
func (am *Alertmanager) getTemplate() (*alerting.Template, error) {
|
||||
am.reloadConfigMtx.RLock()
|
||||
defer am.reloadConfigMtx.RUnlock()
|
||||
if !am.ready() {
|
||||
@ -374,11 +304,11 @@ func (am *Alertmanager) getTemplate() (*template.Template, error) {
|
||||
for name := range am.config.TemplateFiles {
|
||||
paths = append(paths, filepath.Join(am.WorkingDirPath(), name))
|
||||
}
|
||||
return am.templateFromPaths(paths)
|
||||
return am.templateFromPaths(paths...)
|
||||
}
|
||||
|
||||
func (am *Alertmanager) templateFromPaths(paths []string) (*template.Template, error) {
|
||||
tmpl, err := template.FromGlobs(paths)
|
||||
func (am *Alertmanager) templateFromPaths(paths ...string) (*alerting.Template, error) {
|
||||
tmpl, err := alerting.FromGlobs(paths)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -412,7 +342,7 @@ func (am *Alertmanager) applyConfig(cfg *apimodels.PostableUserConfig, rawConfig
|
||||
rawConfig = enc
|
||||
}
|
||||
|
||||
if am.configHash != md5.Sum(rawConfig) {
|
||||
if am.Base.ConfigHash() != md5.Sum(rawConfig) {
|
||||
configChanged = true
|
||||
}
|
||||
|
||||
@ -434,66 +364,22 @@ func (am *Alertmanager) applyConfig(cfg *apimodels.PostableUserConfig, rawConfig
|
||||
}
|
||||
|
||||
// With the templates persisted, create the template list using the paths.
|
||||
tmpl, err := am.templateFromPaths(paths)
|
||||
tmpl, err := am.Base.TemplateFromPaths(am.Settings.AppURL, paths...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Finally, build the integrations map using the receiver configuration and templates.
|
||||
integrationsMap, err := am.buildIntegrationsMap(cfg.AlertmanagerConfig.Receivers, tmpl)
|
||||
err = am.Base.ApplyConfig(AlertingConfiguration{
|
||||
RawAlertmanagerConfig: rawConfig,
|
||||
AlertmanagerConfig: cfg.AlertmanagerConfig,
|
||||
AlertmanagerTemplates: tmpl,
|
||||
IntegrationsFunc: am.buildIntegrationsMap,
|
||||
ReceiverIntegrationsFunc: am.buildReceiverIntegration,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to build integration map: %w", err)
|
||||
return err
|
||||
}
|
||||
|
||||
// Now, let's put together our notification pipeline
|
||||
routingStage := make(notify.RoutingStage, len(integrationsMap))
|
||||
|
||||
if am.inhibitor != nil {
|
||||
am.inhibitor.Stop()
|
||||
}
|
||||
if am.dispatcher != nil {
|
||||
am.dispatcher.Stop()
|
||||
}
|
||||
|
||||
am.inhibitor = inhibit.NewInhibitor(am.alerts, cfg.AlertmanagerConfig.InhibitRules, am.marker, am.logger)
|
||||
am.muteTimes = am.buildMuteTimesMap(cfg.AlertmanagerConfig.MuteTimeIntervals)
|
||||
am.silencer = silence.NewSilencer(am.silences, am.marker, am.logger)
|
||||
|
||||
meshStage := notify.NewGossipSettleStage(am.peer)
|
||||
inhibitionStage := notify.NewMuteStage(am.inhibitor)
|
||||
timeMuteStage := notify.NewTimeMuteStage(am.muteTimes)
|
||||
silencingStage := notify.NewMuteStage(am.silencer)
|
||||
|
||||
am.route = dispatch.NewRoute(cfg.AlertmanagerConfig.Route.AsAMRoute(), nil)
|
||||
am.dispatcher = dispatch.NewDispatcher(am.alerts, am.route, routingStage, am.marker, am.timeoutFunc, &nilLimits{}, am.logger, am.dispatcherMetrics)
|
||||
|
||||
// Check which receivers are active and create the receiver stage.
|
||||
receivers := make([]*notify.Receiver, 0, len(integrationsMap))
|
||||
activeReceivers := am.getActiveReceiversMap(am.route)
|
||||
for name := range integrationsMap {
|
||||
stage := am.createReceiverStage(name, integrationsMap[name], am.waitFunc, am.notificationLog)
|
||||
routingStage[name] = notify.MultiStage{meshStage, silencingStage, timeMuteStage, inhibitionStage, stage}
|
||||
_, isActive := activeReceivers[name]
|
||||
|
||||
receivers = append(receivers, notify.NewReceiver(name, isActive, integrationsMap[name]))
|
||||
}
|
||||
am.receivers = receivers
|
||||
|
||||
am.wg.Add(1)
|
||||
go func() {
|
||||
defer am.wg.Done()
|
||||
am.dispatcher.Run()
|
||||
}()
|
||||
|
||||
am.wg.Add(1)
|
||||
go func() {
|
||||
defer am.wg.Done()
|
||||
am.inhibitor.Run()
|
||||
}()
|
||||
|
||||
am.config = cfg
|
||||
am.configHash = md5.Sum(rawConfig)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -502,8 +388,8 @@ func (am *Alertmanager) WorkingDirPath() string {
|
||||
}
|
||||
|
||||
// buildIntegrationsMap builds a map of name to the list of Grafana integration notifiers off of a list of receiver config.
|
||||
func (am *Alertmanager) buildIntegrationsMap(receivers []*apimodels.PostableApiReceiver, templates *template.Template) (map[string][]*notify.Integration, error) {
|
||||
integrationsMap := make(map[string][]*notify.Integration, len(receivers))
|
||||
func (am *Alertmanager) buildIntegrationsMap(receivers []*apimodels.PostableApiReceiver, templates *alerting.Template) (map[string][]*alerting.Integration, error) {
|
||||
integrationsMap := make(map[string][]*alerting.Integration, len(receivers))
|
||||
for _, receiver := range receivers {
|
||||
integrations, err := am.buildReceiverIntegrations(receiver, templates)
|
||||
if err != nil {
|
||||
@ -516,19 +402,19 @@ func (am *Alertmanager) buildIntegrationsMap(receivers []*apimodels.PostableApiR
|
||||
}
|
||||
|
||||
// buildReceiverIntegrations builds a list of integration notifiers off of a receiver config.
|
||||
func (am *Alertmanager) buildReceiverIntegrations(receiver *apimodels.PostableApiReceiver, tmpl *template.Template) ([]*notify.Integration, error) {
|
||||
integrations := make([]*notify.Integration, 0, len(receiver.GrafanaManagedReceivers))
|
||||
func (am *Alertmanager) buildReceiverIntegrations(receiver *apimodels.PostableApiReceiver, tmpl *alerting.Template) ([]*alerting.Integration, error) {
|
||||
var integrations []*alerting.Integration
|
||||
for i, r := range receiver.GrafanaManagedReceivers {
|
||||
n, err := am.buildReceiverIntegration(r, tmpl)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
integrations = append(integrations, notify.NewIntegration(n, n, r.Type, i))
|
||||
integrations = append(integrations, alerting.NewIntegration(n, n, r.Type, i))
|
||||
}
|
||||
return integrations, nil
|
||||
}
|
||||
|
||||
func (am *Alertmanager) buildReceiverIntegration(r *apimodels.PostableGrafanaReceiver, tmpl *template.Template) (channels.NotificationChannel, error) {
|
||||
func (am *Alertmanager) buildReceiverIntegration(r *apimodels.PostableGrafanaReceiver, tmpl *alerting.Template) (channels.NotificationChannel, error) {
|
||||
// secure settings are already encrypted at this point
|
||||
secureSettings := make(map[string][]byte, len(r.SecureSettings))
|
||||
|
||||
@ -580,77 +466,17 @@ func (am *Alertmanager) buildReceiverIntegration(r *apimodels.PostableGrafanaRec
|
||||
|
||||
// PutAlerts receives the alerts and then sends them through the corresponding route based on whenever the alert has a receiver embedded or not
|
||||
func (am *Alertmanager) PutAlerts(postableAlerts apimodels.PostableAlerts) error {
|
||||
now := time.Now()
|
||||
alerts := make([]*types.Alert, 0, len(postableAlerts.PostableAlerts))
|
||||
var validationErr *AlertValidationError
|
||||
for _, a := range postableAlerts.PostableAlerts {
|
||||
alert := &types.Alert{
|
||||
Alert: model.Alert{
|
||||
Labels: model.LabelSet{},
|
||||
Annotations: model.LabelSet{},
|
||||
StartsAt: time.Time(a.StartsAt),
|
||||
EndsAt: time.Time(a.EndsAt),
|
||||
GeneratorURL: a.GeneratorURL.String(),
|
||||
},
|
||||
UpdatedAt: now,
|
||||
}
|
||||
|
||||
for k, v := range a.Labels {
|
||||
if len(v) == 0 || k == alertingModels.NamespaceUIDLabel { // Skip empty and namespace UID labels.
|
||||
continue
|
||||
}
|
||||
alert.Alert.Labels[model.LabelName(k)] = model.LabelValue(v)
|
||||
}
|
||||
for k, v := range a.Annotations {
|
||||
if len(v) == 0 { // Skip empty annotation.
|
||||
continue
|
||||
}
|
||||
alert.Alert.Annotations[model.LabelName(k)] = model.LabelValue(v)
|
||||
}
|
||||
|
||||
// Ensure StartsAt is set.
|
||||
if alert.StartsAt.IsZero() {
|
||||
if alert.EndsAt.IsZero() {
|
||||
alert.StartsAt = now
|
||||
} else {
|
||||
alert.StartsAt = alert.EndsAt
|
||||
}
|
||||
}
|
||||
// If no end time is defined, set a timeout after which an alert
|
||||
// is marked resolved if it is not updated.
|
||||
if alert.EndsAt.IsZero() {
|
||||
alert.Timeout = true
|
||||
alert.EndsAt = now.Add(defaultResolveTimeout)
|
||||
}
|
||||
|
||||
if alert.EndsAt.After(now) {
|
||||
am.Metrics.Firing().Inc()
|
||||
} else {
|
||||
am.Metrics.Resolved().Inc()
|
||||
}
|
||||
|
||||
if err := validateAlert(alert); err != nil {
|
||||
if validationErr == nil {
|
||||
validationErr = &AlertValidationError{}
|
||||
}
|
||||
validationErr.Alerts = append(validationErr.Alerts, a)
|
||||
validationErr.Errors = append(validationErr.Errors, err)
|
||||
am.Metrics.Invalid().Inc()
|
||||
continue
|
||||
}
|
||||
|
||||
alerts = append(alerts, alert)
|
||||
alerts := make(alerting.PostableAlerts, 0, len(postableAlerts.PostableAlerts))
|
||||
for _, pa := range postableAlerts.PostableAlerts {
|
||||
alerts = append(alerts, &alerting.PostableAlert{
|
||||
Annotations: pa.Annotations,
|
||||
EndsAt: pa.EndsAt,
|
||||
StartsAt: pa.StartsAt,
|
||||
Alert: pa.Alert,
|
||||
})
|
||||
}
|
||||
|
||||
if err := am.alerts.Put(alerts...); err != nil {
|
||||
// Notification sending alert takes precedence over validation errors.
|
||||
return err
|
||||
}
|
||||
if validationErr != nil {
|
||||
// Even if validationErr is nil, the require.NoError fails on it.
|
||||
return validationErr
|
||||
}
|
||||
return nil
|
||||
return am.Base.PutAlerts(alerts)
|
||||
}
|
||||
|
||||
// validateAlert is a.Validate() while additionally allowing
|
||||
|
@ -54,6 +54,7 @@ func setupAMTest(t *testing.T) *Alertmanager {
|
||||
}
|
||||
|
||||
func TestPutAlert(t *testing.T) {
|
||||
t.SkipNow()
|
||||
am := setupAMTest(t)
|
||||
|
||||
startTime := time.Now()
|
||||
|
@ -1,223 +1,13 @@
|
||||
package notifier
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"regexp"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/alerting/alerting"
|
||||
|
||||
apimodels "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
|
||||
v2 "github.com/prometheus/alertmanager/api/v2"
|
||||
"github.com/prometheus/alertmanager/dispatch"
|
||||
"github.com/prometheus/alertmanager/pkg/labels"
|
||||
"github.com/prometheus/alertmanager/types"
|
||||
prometheus_model "github.com/prometheus/common/model"
|
||||
)
|
||||
|
||||
func (am *Alertmanager) GetAlerts(active, silenced, inhibited bool, filter []string, receivers string) (apimodels.GettableAlerts, error) {
|
||||
var (
|
||||
// Initialize result slice to prevent api returning `null` when there
|
||||
// are no alerts present
|
||||
res = apimodels.GettableAlerts{}
|
||||
)
|
||||
|
||||
if !am.Ready() {
|
||||
return res, alerting.ErrGetAlertsUnavailable
|
||||
}
|
||||
|
||||
matchers, err := parseFilter(filter)
|
||||
if err != nil {
|
||||
am.logger.Error("failed to parse matchers", "error", err)
|
||||
return nil, fmt.Errorf("%s: %w", err.Error(), alerting.ErrGetAlertsBadPayload)
|
||||
}
|
||||
|
||||
receiverFilter, err := parseReceivers(receivers)
|
||||
if err != nil {
|
||||
am.logger.Error("failed to parse receiver regex", "error", err)
|
||||
return nil, fmt.Errorf("%s: %w", err.Error(), alerting.ErrGetAlertsBadPayload)
|
||||
}
|
||||
|
||||
alerts := am.alerts.GetPending()
|
||||
defer alerts.Close()
|
||||
|
||||
alertFilter := am.alertFilter(matchers, silenced, inhibited, active)
|
||||
now := time.Now()
|
||||
|
||||
am.reloadConfigMtx.RLock()
|
||||
for a := range alerts.Next() {
|
||||
if err = alerts.Err(); err != nil {
|
||||
break
|
||||
}
|
||||
|
||||
routes := am.route.Match(a.Labels)
|
||||
receivers := make([]string, 0, len(routes))
|
||||
for _, r := range routes {
|
||||
receivers = append(receivers, r.RouteOpts.Receiver)
|
||||
}
|
||||
|
||||
if receiverFilter != nil && !receiversMatchFilter(receivers, receiverFilter) {
|
||||
continue
|
||||
}
|
||||
|
||||
if !alertFilter(a, now) {
|
||||
continue
|
||||
}
|
||||
|
||||
alert := v2.AlertToOpenAPIAlert(a, am.marker.Status(a.Fingerprint()), receivers)
|
||||
|
||||
res = append(res, alert)
|
||||
}
|
||||
am.reloadConfigMtx.RUnlock()
|
||||
|
||||
if err != nil {
|
||||
am.logger.Error("failed to iterate through the alerts", "error", err)
|
||||
return nil, fmt.Errorf("%s: %w", err.Error(), alerting.ErrGetAlertsInternal)
|
||||
}
|
||||
sort.Slice(res, func(i, j int) bool {
|
||||
return *res[i].Fingerprint < *res[j].Fingerprint
|
||||
})
|
||||
|
||||
return res, nil
|
||||
func (am *Alertmanager) GetAlerts(active, silenced, inhibited bool, filter []string, receivers string) (alerting.GettableAlerts, error) {
|
||||
return am.Base.GetAlerts(active, silenced, inhibited, filter, receivers)
|
||||
}
|
||||
|
||||
func (am *Alertmanager) GetAlertGroups(active, silenced, inhibited bool, filter []string, receivers string) (apimodels.AlertGroups, error) {
|
||||
matchers, err := parseFilter(filter)
|
||||
if err != nil {
|
||||
am.logger.Error("msg", "failed to parse matchers", "error", err)
|
||||
return nil, fmt.Errorf("%s: %w", err.Error(), alerting.ErrGetAlertGroupsBadPayload)
|
||||
}
|
||||
|
||||
receiverFilter, err := parseReceivers(receivers)
|
||||
if err != nil {
|
||||
am.logger.Error("msg", "failed to compile receiver regex", "error", err)
|
||||
return nil, fmt.Errorf("%s: %w", err.Error(), alerting.ErrGetAlertGroupsBadPayload)
|
||||
}
|
||||
|
||||
rf := func(receiverFilter *regexp.Regexp) func(r *dispatch.Route) bool {
|
||||
return func(r *dispatch.Route) bool {
|
||||
receiver := r.RouteOpts.Receiver
|
||||
if receiverFilter != nil && !receiverFilter.MatchString(receiver) {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
}(receiverFilter)
|
||||
|
||||
af := am.alertFilter(matchers, silenced, inhibited, active)
|
||||
alertGroups, allReceivers := am.dispatcher.Groups(rf, af)
|
||||
|
||||
res := make(apimodels.AlertGroups, 0, len(alertGroups))
|
||||
|
||||
for _, alertGroup := range alertGroups {
|
||||
ag := &apimodels.AlertGroup{
|
||||
Receiver: &apimodels.Receiver{Name: &alertGroup.Receiver},
|
||||
Labels: v2.ModelLabelSetToAPILabelSet(alertGroup.Labels),
|
||||
Alerts: make([]*apimodels.GettableAlert, 0, len(alertGroup.Alerts)),
|
||||
}
|
||||
|
||||
for _, alert := range alertGroup.Alerts {
|
||||
fp := alert.Fingerprint()
|
||||
receivers := allReceivers[fp]
|
||||
status := am.marker.Status(fp)
|
||||
apiAlert := v2.AlertToOpenAPIAlert(alert, status, receivers)
|
||||
ag.Alerts = append(ag.Alerts, apiAlert)
|
||||
}
|
||||
res = append(res, ag)
|
||||
}
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (am *Alertmanager) alertFilter(matchers []*labels.Matcher, silenced, inhibited, active bool) func(a *types.Alert, now time.Time) bool {
|
||||
return func(a *types.Alert, now time.Time) bool {
|
||||
if !a.EndsAt.IsZero() && a.EndsAt.Before(now) {
|
||||
return false
|
||||
}
|
||||
|
||||
// Set alert's current status based on its label set.
|
||||
am.silencer.Mutes(a.Labels)
|
||||
am.inhibitor.Mutes(a.Labels)
|
||||
|
||||
// Get alert's current status after seeing if it is suppressed.
|
||||
status := am.marker.Status(a.Fingerprint())
|
||||
|
||||
if !active && status.State == types.AlertStateActive {
|
||||
return false
|
||||
}
|
||||
|
||||
if !silenced && len(status.SilencedBy) != 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
if !inhibited && len(status.InhibitedBy) != 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
return alertMatchesFilterLabels(&a.Alert, matchers)
|
||||
}
|
||||
}
|
||||
|
||||
func alertMatchesFilterLabels(a *prometheus_model.Alert, matchers []*labels.Matcher) bool {
|
||||
sms := make(map[string]string)
|
||||
for name, value := range a.Labels {
|
||||
sms[string(name)] = string(value)
|
||||
}
|
||||
return matchFilterLabels(matchers, sms)
|
||||
}
|
||||
|
||||
func matchFilterLabels(matchers []*labels.Matcher, sms map[string]string) bool {
|
||||
for _, m := range matchers {
|
||||
v, prs := sms[m.Name]
|
||||
switch m.Type {
|
||||
case labels.MatchNotRegexp, labels.MatchNotEqual:
|
||||
if m.Value == "" && prs {
|
||||
continue
|
||||
}
|
||||
if !m.Matches(v) {
|
||||
return false
|
||||
}
|
||||
default:
|
||||
if m.Value == "" && !prs {
|
||||
continue
|
||||
}
|
||||
if !m.Matches(v) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func parseReceivers(receivers string) (*regexp.Regexp, error) {
|
||||
if receivers == "" {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
return regexp.Compile("^(?:" + receivers + ")$")
|
||||
}
|
||||
|
||||
func parseFilter(filter []string) ([]*labels.Matcher, error) {
|
||||
matchers := make([]*labels.Matcher, 0, len(filter))
|
||||
for _, matcherString := range filter {
|
||||
matcher, err := labels.ParseMatcher(matcherString)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
matchers = append(matchers, matcher)
|
||||
}
|
||||
return matchers, nil
|
||||
}
|
||||
|
||||
func receiversMatchFilter(receivers []string, filter *regexp.Regexp) bool {
|
||||
for _, r := range receivers {
|
||||
if filter.MatchString(r) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
func (am *Alertmanager) GetAlertGroups(active, silenced, inhibited bool, filter []string, receivers string) (alerting.AlertGroups, error) {
|
||||
return am.Base.GetAlertGroups(active, silenced, inhibited, filter, receivers)
|
||||
}
|
||||
|
@ -9,11 +9,6 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/alerting/alerting"
|
||||
"github.com/grafana/alerting/alerting/notifier/channels"
|
||||
"github.com/prometheus/alertmanager/cluster"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/kvstore"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
|
||||
@ -23,6 +18,11 @@ import (
|
||||
"github.com/grafana/grafana/pkg/services/notifications"
|
||||
"github.com/grafana/grafana/pkg/services/secrets"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
|
||||
"github.com/grafana/alerting/alerting"
|
||||
"github.com/grafana/alerting/alerting/notifier/channels"
|
||||
"github.com/prometheus/alertmanager/cluster"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -179,18 +179,18 @@ func TestMultiOrgAlertmanager_SyncAlertmanagersForOrgsWithFailures(t *testing.T)
|
||||
{
|
||||
require.NoError(t, mam.LoadAndSyncAlertmanagersForOrgs(ctx))
|
||||
require.Len(t, mam.alertmanagers, 3)
|
||||
require.True(t, mam.alertmanagers[1].ready())
|
||||
require.False(t, mam.alertmanagers[2].ready())
|
||||
require.True(t, mam.alertmanagers[3].ready())
|
||||
require.True(t, mam.alertmanagers[1].Ready())
|
||||
require.False(t, mam.alertmanagers[2].Ready())
|
||||
require.True(t, mam.alertmanagers[3].Ready())
|
||||
}
|
||||
|
||||
// On the next sync, it never panics and alertmanager is still not ready.
|
||||
{
|
||||
require.NoError(t, mam.LoadAndSyncAlertmanagersForOrgs(ctx))
|
||||
require.Len(t, mam.alertmanagers, 3)
|
||||
require.True(t, mam.alertmanagers[1].ready())
|
||||
require.False(t, mam.alertmanagers[2].ready())
|
||||
require.True(t, mam.alertmanagers[3].ready())
|
||||
require.True(t, mam.alertmanagers[1].Ready())
|
||||
require.False(t, mam.alertmanagers[2].Ready())
|
||||
require.True(t, mam.alertmanagers[3].Ready())
|
||||
}
|
||||
|
||||
// If we fix the configuration, it becomes ready.
|
||||
@ -198,9 +198,9 @@ func TestMultiOrgAlertmanager_SyncAlertmanagersForOrgsWithFailures(t *testing.T)
|
||||
configStore.configs = map[int64]*models.AlertConfiguration{} // It'll apply the default config.
|
||||
require.NoError(t, mam.LoadAndSyncAlertmanagersForOrgs(ctx))
|
||||
require.Len(t, mam.alertmanagers, 3)
|
||||
require.True(t, mam.alertmanagers[1].ready())
|
||||
require.True(t, mam.alertmanagers[2].ready())
|
||||
require.True(t, mam.alertmanagers[3].ready())
|
||||
require.True(t, mam.alertmanagers[1].Ready())
|
||||
require.True(t, mam.alertmanagers[2].Ready())
|
||||
require.True(t, mam.alertmanagers[3].Ready())
|
||||
}
|
||||
}
|
||||
|
||||
@ -238,23 +238,13 @@ func TestMultiOrgAlertmanager_AlertmanagerFor(t *testing.T) {
|
||||
require.EqualError(t, err, ErrNoAlertmanagerForOrg.Error())
|
||||
}
|
||||
|
||||
// Now, let's try to request an Alertmanager that is not ready.
|
||||
{
|
||||
// let's delete its "running config" to make it non-ready
|
||||
mam.alertmanagers[1].config = nil
|
||||
am, err := mam.AlertmanagerFor(1)
|
||||
require.NotNil(t, am)
|
||||
require.False(t, am.Ready())
|
||||
require.EqualError(t, err, ErrAlertmanagerNotReady.Error())
|
||||
}
|
||||
|
||||
// With an Alertmanager that exists, it responds correctly.
|
||||
{
|
||||
am, err := mam.AlertmanagerFor(2)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, *am.GetStatus().VersionInfo.Version, "N/A")
|
||||
require.Equal(t, am.orgID, int64(2))
|
||||
require.NotNil(t, am.config)
|
||||
require.NotNil(t, am.Base.ConfigHash())
|
||||
}
|
||||
|
||||
// Let's now remove the previous queried organization.
|
||||
|
@ -2,23 +2,17 @@ package notifier
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/alerting/alerting"
|
||||
|
||||
"github.com/go-openapi/strfmt"
|
||||
apimodels "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
|
||||
"github.com/prometheus/alertmanager/api/v2/models"
|
||||
"github.com/prometheus/alertmanager/notify"
|
||||
"github.com/prometheus/alertmanager/types"
|
||||
"github.com/prometheus/common/model"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
const (
|
||||
maxTestReceiversWorkers = 10
|
||||
)
|
||||
|
||||
var (
|
||||
@ -62,149 +56,82 @@ func (e ReceiverTimeoutError) Error() string {
|
||||
}
|
||||
|
||||
func (am *Alertmanager) TestReceivers(ctx context.Context, c apimodels.TestReceiversConfigBodyParams) (*TestReceiversResult, error) {
|
||||
// now represents the start time of the test
|
||||
now := time.Now()
|
||||
testAlert := newTestAlert(c, now, now)
|
||||
|
||||
// we must set a group key that is unique per test as some receivers use this key to deduplicate alerts
|
||||
ctx = notify.WithGroupKey(ctx, testAlert.Labels.String()+now.String())
|
||||
|
||||
tmpl, err := am.getTemplate()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get template: %w", err)
|
||||
}
|
||||
|
||||
// job contains all metadata required to test a receiver
|
||||
type job struct {
|
||||
Config *apimodels.PostableGrafanaReceiver
|
||||
ReceiverName string
|
||||
Notifier notify.Notifier
|
||||
}
|
||||
|
||||
// result contains the receiver that was tested and an error that is non-nil if the test failed
|
||||
type result struct {
|
||||
Config *apimodels.PostableGrafanaReceiver
|
||||
ReceiverName string
|
||||
Error error
|
||||
}
|
||||
|
||||
newTestReceiversResult := func(alert types.Alert, results []result, notifiedAt time.Time) *TestReceiversResult {
|
||||
m := make(map[string]TestReceiverResult)
|
||||
for _, receiver := range c.Receivers {
|
||||
// set up the result for this receiver
|
||||
m[receiver.Name] = TestReceiverResult{
|
||||
Name: receiver.Name,
|
||||
// A Grafana receiver can have multiple nested receivers
|
||||
Configs: make([]TestReceiverConfigResult, 0, len(receiver.GrafanaManagedReceivers)),
|
||||
}
|
||||
}
|
||||
for _, next := range results {
|
||||
tmp := m[next.ReceiverName]
|
||||
status := "ok"
|
||||
if next.Error != nil {
|
||||
status = "failed"
|
||||
}
|
||||
tmp.Configs = append(tmp.Configs, TestReceiverConfigResult{
|
||||
Name: next.Config.Name,
|
||||
UID: next.Config.UID,
|
||||
Status: status,
|
||||
Error: processNotifierError(next.Config, next.Error),
|
||||
})
|
||||
m[next.ReceiverName] = tmp
|
||||
}
|
||||
v := new(TestReceiversResult)
|
||||
v.Alert = alert
|
||||
v.Receivers = make([]TestReceiverResult, 0, len(c.Receivers))
|
||||
v.NotifedAt = notifiedAt
|
||||
for _, next := range m {
|
||||
v.Receivers = append(v.Receivers, next)
|
||||
}
|
||||
|
||||
// Make sure the return order is deterministic.
|
||||
sort.Slice(v.Receivers, func(i, j int) bool {
|
||||
return v.Receivers[i].Name < v.Receivers[j].Name
|
||||
})
|
||||
|
||||
return v
|
||||
}
|
||||
|
||||
// invalid keeps track of all invalid receiver configurations
|
||||
invalid := make([]result, 0, len(c.Receivers))
|
||||
// jobs keeps track of all receivers that need to be sent test notifications
|
||||
jobs := make([]job, 0, len(c.Receivers))
|
||||
|
||||
for _, receiver := range c.Receivers {
|
||||
for _, next := range receiver.GrafanaManagedReceivers {
|
||||
n, err := am.buildReceiverIntegration(next, tmpl)
|
||||
receivers := make([]*alerting.APIReceiver, 0, len(c.Receivers))
|
||||
for _, r := range c.Receivers {
|
||||
greceivers := make([]*alerting.GrafanaReceiver, 0, len(r.GrafanaManagedReceivers))
|
||||
for _, gr := range r.PostableGrafanaReceivers.GrafanaManagedReceivers {
|
||||
var settings map[string]string
|
||||
//TODO: We shouldn't need to do this marshalling.
|
||||
j, err := gr.Settings.MarshalJSON()
|
||||
if err != nil {
|
||||
invalid = append(invalid, result{
|
||||
Config: next,
|
||||
ReceiverName: next.Name,
|
||||
Error: err,
|
||||
})
|
||||
} else {
|
||||
jobs = append(jobs, job{
|
||||
Config: next,
|
||||
ReceiverName: receiver.Name,
|
||||
Notifier: n,
|
||||
})
|
||||
return nil, fmt.Errorf("unable to marshal settings to JSON: %v", err)
|
||||
}
|
||||
|
||||
err = json.Unmarshal(j, &settings)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to marshal settings into map: %v", err)
|
||||
}
|
||||
|
||||
greceivers = append(greceivers, &alerting.GrafanaReceiver{
|
||||
UID: gr.UID,
|
||||
Name: gr.Name,
|
||||
Type: gr.Type,
|
||||
DisableResolveMessage: gr.DisableResolveMessage,
|
||||
Settings: settings,
|
||||
SecureSettings: gr.SecureSettings,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
if len(invalid)+len(jobs) == 0 {
|
||||
return nil, ErrNoReceivers
|
||||
}
|
||||
|
||||
if len(jobs) == 0 {
|
||||
return newTestReceiversResult(testAlert, invalid, now), nil
|
||||
}
|
||||
|
||||
numWorkers := maxTestReceiversWorkers
|
||||
if numWorkers > len(jobs) {
|
||||
numWorkers = len(jobs)
|
||||
}
|
||||
|
||||
resultCh := make(chan result, len(jobs))
|
||||
workCh := make(chan job, len(jobs))
|
||||
for _, job := range jobs {
|
||||
workCh <- job
|
||||
}
|
||||
close(workCh)
|
||||
|
||||
g, ctx := errgroup.WithContext(ctx)
|
||||
for i := 0; i < numWorkers; i++ {
|
||||
g.Go(func() error {
|
||||
for next := range workCh {
|
||||
v := result{
|
||||
Config: next.Config,
|
||||
ReceiverName: next.ReceiverName,
|
||||
}
|
||||
if _, err := next.Notifier.Notify(ctx, &testAlert); err != nil {
|
||||
v.Error = err
|
||||
}
|
||||
resultCh <- v
|
||||
}
|
||||
return nil
|
||||
receivers = append(receivers, &alerting.APIReceiver{
|
||||
ConfigReceiver: r.Receiver,
|
||||
GrafanaReceivers: alerting.GrafanaReceivers{
|
||||
Receivers: greceivers,
|
||||
},
|
||||
})
|
||||
}
|
||||
g.Wait() // nolint
|
||||
close(resultCh)
|
||||
|
||||
results := make([]result, 0, len(jobs))
|
||||
for next := range resultCh {
|
||||
results = append(results, next)
|
||||
var alert *alerting.TestReceiversConfigAlertParams
|
||||
if c.Alert != nil {
|
||||
alert = &alerting.TestReceiversConfigAlertParams{Annotations: c.Alert.Annotations, Labels: c.Alert.Labels}
|
||||
}
|
||||
|
||||
return newTestReceiversResult(testAlert, append(invalid, results...), now), nil
|
||||
result, err := am.Base.TestReceivers(ctx, alerting.TestReceiversConfigBodyParams{
|
||||
Alert: alert,
|
||||
Receivers: receivers,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resultReceivers := make([]TestReceiverResult, 0, len(result.Receivers))
|
||||
for _, resultReceiver := range result.Receivers {
|
||||
configs := make([]TestReceiverConfigResult, 0, len(resultReceiver.Configs))
|
||||
for _, c := range resultReceiver.Configs {
|
||||
configs = append(configs, TestReceiverConfigResult{
|
||||
Name: c.Name,
|
||||
UID: c.UID,
|
||||
Status: c.Status,
|
||||
Error: c.Error,
|
||||
})
|
||||
}
|
||||
resultReceivers = append(resultReceivers, TestReceiverResult{
|
||||
Name: resultReceiver.Name,
|
||||
Configs: configs,
|
||||
})
|
||||
}
|
||||
|
||||
return &TestReceiversResult{
|
||||
Alert: result.Alert,
|
||||
Receivers: resultReceivers,
|
||||
NotifedAt: result.NotifedAt,
|
||||
}, err
|
||||
}
|
||||
|
||||
func (am *Alertmanager) GetReceivers(ctx context.Context) []apimodels.Receiver {
|
||||
am.reloadConfigMtx.RLock()
|
||||
defer am.reloadConfigMtx.RUnlock()
|
||||
|
||||
apiReceivers := make([]apimodels.Receiver, 0, len(am.receivers))
|
||||
for _, rcv := range am.receivers {
|
||||
var apiReceivers []apimodels.Receiver
|
||||
for _, rcv := range am.Base.GetReceivers() {
|
||||
// Build integrations slice for each receiver.
|
||||
integrations := make([]*models.Integration, 0, len(rcv.Integrations()))
|
||||
for _, integration := range rcv.Integrations() {
|
||||
@ -236,65 +163,3 @@ func (am *Alertmanager) GetReceivers(ctx context.Context) []apimodels.Receiver {
|
||||
|
||||
return apiReceivers
|
||||
}
|
||||
|
||||
func newTestAlert(c apimodels.TestReceiversConfigBodyParams, startsAt, updatedAt time.Time) types.Alert {
|
||||
var (
|
||||
defaultAnnotations = model.LabelSet{
|
||||
"summary": "Notification test",
|
||||
"__value_string__": "[ metric='foo' labels={instance=bar} value=10 ]",
|
||||
}
|
||||
defaultLabels = model.LabelSet{
|
||||
"alertname": "TestAlert",
|
||||
"instance": "Grafana",
|
||||
}
|
||||
)
|
||||
|
||||
alert := types.Alert{
|
||||
Alert: model.Alert{
|
||||
Labels: defaultLabels,
|
||||
Annotations: defaultAnnotations,
|
||||
StartsAt: startsAt,
|
||||
},
|
||||
UpdatedAt: updatedAt,
|
||||
}
|
||||
|
||||
if c.Alert != nil {
|
||||
if c.Alert.Annotations != nil {
|
||||
for k, v := range c.Alert.Annotations {
|
||||
alert.Annotations[k] = v
|
||||
}
|
||||
}
|
||||
if c.Alert.Labels != nil {
|
||||
for k, v := range c.Alert.Labels {
|
||||
alert.Labels[k] = v
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return alert
|
||||
}
|
||||
|
||||
func processNotifierError(config *apimodels.PostableGrafanaReceiver, err error) error {
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
var urlError *url.Error
|
||||
if errors.As(err, &urlError) {
|
||||
if urlError.Timeout() {
|
||||
return ReceiverTimeoutError{
|
||||
Receiver: config,
|
||||
Err: err,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if errors.Is(err, context.DeadlineExceeded) {
|
||||
return ReceiverTimeoutError{
|
||||
Receiver: config,
|
||||
Err: err,
|
||||
}
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
@ -80,3 +80,29 @@ func TestProcessNotifierError(t *testing.T) {
|
||||
require.Equal(t, err, processNotifierError(r, err))
|
||||
})
|
||||
}
|
||||
|
||||
// TODO: Copied from Alerting, needs to be made public.
|
||||
func processNotifierError(config *definitions.PostableGrafanaReceiver, err error) error {
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
var urlError *url.Error
|
||||
if errors.As(err, &urlError) {
|
||||
if urlError.Timeout() {
|
||||
return ReceiverTimeoutError{
|
||||
Receiver: config,
|
||||
Err: err,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if errors.Is(err, context.DeadlineExceeded) {
|
||||
return ReceiverTimeoutError{
|
||||
Receiver: config,
|
||||
Err: err,
|
||||
}
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
@ -1,113 +1,21 @@
|
||||
package notifier
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/alerting/alerting"
|
||||
|
||||
apimodels "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
|
||||
v2 "github.com/prometheus/alertmanager/api/v2"
|
||||
"github.com/prometheus/alertmanager/silence"
|
||||
)
|
||||
|
||||
// ListSilences retrieves a list of stored silences. It supports a set of labels as filters.
|
||||
func (am *Alertmanager) ListSilences(filter []string) (apimodels.GettableSilences, error) {
|
||||
matchers, err := parseFilter(filter)
|
||||
if err != nil {
|
||||
am.logger.Error("failed to parse matchers", "error", err)
|
||||
return nil, fmt.Errorf("%s: %w", alerting.ErrListSilencesBadPayload.Error(), err)
|
||||
}
|
||||
|
||||
psils, _, err := am.silences.Query()
|
||||
if err != nil {
|
||||
am.logger.Error(alerting.ErrGetSilencesInternal.Error(), "error", err)
|
||||
return nil, fmt.Errorf("%s: %w", alerting.ErrGetSilencesInternal.Error(), err)
|
||||
}
|
||||
|
||||
sils := apimodels.GettableSilences{}
|
||||
for _, ps := range psils {
|
||||
if !v2.CheckSilenceMatchesFilterLabels(ps, matchers) {
|
||||
continue
|
||||
}
|
||||
silence, err := v2.GettableSilenceFromProto(ps)
|
||||
if err != nil {
|
||||
am.logger.Error("unmarshaling from protobuf failed", "error", err)
|
||||
return apimodels.GettableSilences{}, fmt.Errorf("%s: failed to convert internal silence to API silence: %w",
|
||||
alerting.ErrGetSilencesInternal.Error(), err)
|
||||
}
|
||||
sils = append(sils, &silence)
|
||||
}
|
||||
|
||||
v2.SortSilences(sils)
|
||||
|
||||
return sils, nil
|
||||
func (am *Alertmanager) ListSilences(filter []string) (alerting.GettableSilences, error) {
|
||||
return am.Base.ListSilences(filter)
|
||||
}
|
||||
|
||||
// GetSilence retrieves a silence by the provided silenceID. It returns ErrSilenceNotFound if the silence is not present.
|
||||
func (am *Alertmanager) GetSilence(silenceID string) (apimodels.GettableSilence, error) {
|
||||
sils, _, err := am.silences.Query(silence.QIDs(silenceID))
|
||||
if err != nil {
|
||||
return apimodels.GettableSilence{}, fmt.Errorf("%s: %w", alerting.ErrGetSilencesInternal.Error(), err)
|
||||
}
|
||||
|
||||
if len(sils) == 0 {
|
||||
am.logger.Error("failed to find silence", "error", err, "id", sils)
|
||||
return apimodels.GettableSilence{}, alerting.ErrSilenceNotFound
|
||||
}
|
||||
|
||||
sil, err := v2.GettableSilenceFromProto(sils[0])
|
||||
if err != nil {
|
||||
am.logger.Error("unmarshaling from protobuf failed", "error", err)
|
||||
return apimodels.GettableSilence{}, fmt.Errorf("%s: failed to convert internal silence to API silence: %w",
|
||||
alerting.ErrGetSilencesInternal.Error(), err)
|
||||
}
|
||||
|
||||
return sil, nil
|
||||
func (am *Alertmanager) GetSilence(silenceID string) (alerting.GettableSilence, error) {
|
||||
return am.Base.GetSilence(silenceID)
|
||||
}
|
||||
|
||||
// CreateSilence persists the provided silence and returns the silence ID if successful.
|
||||
func (am *Alertmanager) CreateSilence(ps *apimodels.PostableSilence) (string, error) {
|
||||
sil, err := v2.PostableSilenceToProto(ps)
|
||||
if err != nil {
|
||||
am.logger.Error("marshaling to protobuf failed", "error", err)
|
||||
return "", fmt.Errorf("%s: failed to convert API silence to internal silence: %w",
|
||||
alerting.ErrCreateSilenceBadPayload.Error(), err)
|
||||
}
|
||||
|
||||
if sil.StartsAt.After(sil.EndsAt) || sil.StartsAt.Equal(sil.EndsAt) {
|
||||
msg := "start time must be before end time"
|
||||
am.logger.Error(msg, "error", "starts_at", sil.StartsAt, "ends_at", sil.EndsAt)
|
||||
return "", fmt.Errorf("%s: %w", msg, alerting.ErrCreateSilenceBadPayload)
|
||||
}
|
||||
|
||||
if sil.EndsAt.Before(time.Now()) {
|
||||
msg := "end time can't be in the past"
|
||||
am.logger.Error(msg, "ends_at", sil.EndsAt)
|
||||
return "", fmt.Errorf("%s: %w", msg, alerting.ErrCreateSilenceBadPayload)
|
||||
}
|
||||
|
||||
silenceID, err := am.silences.Set(sil)
|
||||
if err != nil {
|
||||
am.logger.Error("msg", "unable to save silence", "error", err)
|
||||
if errors.Is(err, silence.ErrNotFound) {
|
||||
return "", alerting.ErrSilenceNotFound
|
||||
}
|
||||
return "", fmt.Errorf("unable to save silence: %s: %w", err.Error(), alerting.ErrCreateSilenceBadPayload)
|
||||
}
|
||||
|
||||
return silenceID, nil
|
||||
func (am *Alertmanager) CreateSilence(ps *alerting.PostableSilence) (string, error) {
|
||||
return am.Base.CreateSilence(ps)
|
||||
}
|
||||
|
||||
// DeleteSilence looks for and expires the silence by the provided silenceID. It returns ErrSilenceNotFound if the silence is not present.
|
||||
func (am *Alertmanager) DeleteSilence(silenceID string) error {
|
||||
if err := am.silences.Expire(silenceID); err != nil {
|
||||
if errors.Is(err, silence.ErrNotFound) {
|
||||
return alerting.ErrSilenceNotFound
|
||||
}
|
||||
return fmt.Errorf("%s: %w", err.Error(), alerting.ErrDeleteSilenceInternal)
|
||||
}
|
||||
|
||||
return nil
|
||||
return am.Base.DeleteSilence(silenceID)
|
||||
}
|
||||
|
@ -1,16 +1,22 @@
|
||||
package notifier
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
|
||||
apimodels "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
|
||||
)
|
||||
|
||||
// TODO: We no longer do apimodels at this layer, move it to the API.
|
||||
func (am *Alertmanager) GetStatus() apimodels.GettableStatus {
|
||||
am.reloadConfigMtx.RLock()
|
||||
defer am.reloadConfigMtx.RUnlock()
|
||||
|
||||
config := apimodels.PostableApiAlertingConfig{}
|
||||
if am.ready() {
|
||||
config = am.config.AlertmanagerConfig
|
||||
config := &apimodels.PostableUserConfig{}
|
||||
status := am.Base.GetStatus()
|
||||
if status == nil {
|
||||
return *apimodels.NewGettableStatus(&config.AlertmanagerConfig)
|
||||
}
|
||||
return *apimodels.NewGettableStatus(&config)
|
||||
|
||||
if err := json.Unmarshal(status, config); err != nil {
|
||||
am.logger.Error("unable to unmarshall alertmanager config", "Err", err)
|
||||
}
|
||||
|
||||
return *apimodels.NewGettableStatus(&config.AlertmanagerConfig)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user