Mirror of https://github.com/grafana/grafana.git (synced 2025-02-14 09:33:34 -06:00)
* Alerting: Refactor & fix unified alerting metrics structure. Fixes and refactors the metrics structure for the ngalert service. Each component now has its own metrics struct that includes just the metrics it uses. Additionally, the configuration metrics have been fixed and new metrics added to determine whether we have discovered and started all the necessary configurations of an instance. This allows us to alert on `grafana_alerting_discovered_configurations - grafana_alerting_active_configurations != 0` to know whether an Alertmanager instance did not start successfully.
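To illustrate the per-component metrics pattern described above, here is a minimal, hypothetical Go sketch (the struct, constructor, and field names are assumptions chosen for illustration, not necessarily the identifiers used in the actual change) of a component owning just the two configuration gauges and registering them against a shared prometheus.Registerer:

// Hypothetical sketch of a component-scoped metrics struct: it holds only the
// gauges this component uses and registers them on construction.
package metrics

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

type MultiOrgAlertmanagerMetrics struct {
	DiscoveredConfigurations prometheus.Gauge
	ActiveConfigurations     prometheus.Gauge
}

func NewMultiOrgAlertmanagerMetrics(r prometheus.Registerer) *MultiOrgAlertmanagerMetrics {
	return &MultiOrgAlertmanagerMetrics{
		// Exposed as grafana_alerting_discovered_configurations.
		DiscoveredConfigurations: promauto.With(r).NewGauge(prometheus.GaugeOpts{
			Namespace: "grafana",
			Subsystem: "alerting",
			Name:      "discovered_configurations",
			Help:      "The number of organizations with Alertmanager configurations discovered.",
		}),
		// Exposed as grafana_alerting_active_configurations.
		ActiveConfigurations: promauto.With(r).NewGauge(prometheus.GaugeOpts{
			Namespace: "grafana",
			Subsystem: "alerting",
			Name:      "active_configurations",
			Help:      "The number of Alertmanager configurations started successfully.",
		}),
	}
}

With both gauges populated, the quoted expression (discovered minus active not equal to zero) flags instances where at least one discovered configuration failed to start.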
724 lines
22 KiB
Go
package notifier

import (
	"context"
	"crypto/md5"
	"encoding/base64"
	"encoding/json"
	"errors"
	"fmt"
	"net/url"
	"path/filepath"
	"strconv"
	"sync"
	"time"

	gokit_log "github.com/go-kit/kit/log"
	amv2 "github.com/prometheus/alertmanager/api/v2/models"
	"github.com/prometheus/alertmanager/dispatch"
	"github.com/prometheus/alertmanager/inhibit"
	"github.com/prometheus/alertmanager/nflog"
	"github.com/prometheus/alertmanager/nflog/nflogpb"
	"github.com/prometheus/alertmanager/notify"
	"github.com/prometheus/alertmanager/provider/mem"
	"github.com/prometheus/alertmanager/silence"
	"github.com/prometheus/alertmanager/template"
	"github.com/prometheus/alertmanager/types"
	"github.com/prometheus/common/model"

	"github.com/grafana/grafana/pkg/components/securejsondata"
	"github.com/grafana/grafana/pkg/infra/kvstore"
	"github.com/grafana/grafana/pkg/infra/log"
	apimodels "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
	"github.com/grafana/grafana/pkg/services/ngalert/logging"
	"github.com/grafana/grafana/pkg/services/ngalert/metrics"
	ngmodels "github.com/grafana/grafana/pkg/services/ngalert/models"
	"github.com/grafana/grafana/pkg/services/ngalert/notifier/channels"
	"github.com/grafana/grafana/pkg/services/ngalert/store"
	"github.com/grafana/grafana/pkg/setting"
)

const (
	notificationLogFilename = "notifications"
	silencesFilename        = "silences"

	workingDir = "alerting"
	// How long should we keep silences and notification entries on disk after they've served their purpose.
	retentionNotificationsAndSilences = 5 * 24 * time.Hour
	// maintenanceNotificationAndSilences is how often we flush and garbage-collect notifications and silences.
	maintenanceNotificationAndSilences = 15 * time.Minute
	// defaultResolveTimeout is the default timeout used for resolving an alert
	// if the end time is not specified.
	defaultResolveTimeout = 5 * time.Minute
	// memoryAlertsGCInterval is the interval at which we'll remove resolved alerts from memory.
	memoryAlertsGCInterval = 30 * time.Minute
	// To start, the Alertmanager needs at least one route defined.
	// TODO: we should move this to Grafana settings and define this as the default.
	alertmanagerDefaultConfiguration = `
{
	"alertmanager_config": {
		"route": {
			"receiver": "grafana-default-email"
		},
		"receivers": [{
			"name": "grafana-default-email",
			"grafana_managed_receiver_configs": [{
				"uid": "",
				"name": "email receiver",
				"type": "email",
				"isDefault": true,
				"settings": {
					"addresses": "<example@email.com>"
				}
			}]
		}]
	}
}
`
)

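// Alertmanager is the embedded Alertmanager for a single organization. It holds the
// notification pipeline components (silences, notification log, dispatcher, inhibitor)
// together with the configuration currently applied to them.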
type Alertmanager struct {
	logger      log.Logger
	gokitLogger gokit_log.Logger

	Settings  *setting.Cfg
	Store     store.AlertingStore
	fileStore *FileStore
	Metrics   *metrics.Alertmanager

	notificationLog *nflog.Log
	marker          types.Marker
	alerts          *mem.Alerts
	route           *dispatch.Route

	dispatcher *dispatch.Dispatcher
	inhibitor  *inhibit.Inhibitor
	// wg is for the dispatcher, inhibitor, silences and notification log.
	// Across configuration changes the dispatcher and inhibitor are completely replaced;
	// the silences, notification log and alerts remain the same.
	// stopc is used to let the silences and notification log know we are done.
	wg    sync.WaitGroup
	stopc chan struct{}

	silencer *silence.Silencer
	silences *silence.Silences

	stageMetrics      *notify.Metrics
	dispatcherMetrics *dispatch.DispatcherMetrics

	reloadConfigMtx sync.RWMutex
	config          *apimodels.PostableUserConfig
	configHash      [16]byte
	orgID           int64
}

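// newAlertmanager wires up the org-scoped Alertmanager: it creates the file store, notification
// log, silences and in-memory alert provider, but does not apply any configuration yet; that
// happens via SaveAndApplyConfig, SaveAndApplyDefaultConfig or SyncAndApplyConfigFromDatabase.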
func newAlertmanager(orgID int64, cfg *setting.Cfg, store store.AlertingStore, kvStore kvstore.KVStore, m *metrics.Alertmanager) (*Alertmanager, error) {
	am := &Alertmanager{
		Settings:          cfg,
		stopc:             make(chan struct{}),
		logger:            log.New("alertmanager", "org", orgID),
		marker:            types.NewMarker(m.Registerer),
		stageMetrics:      notify.NewMetrics(m.Registerer),
		dispatcherMetrics: dispatch.NewDispatcherMetrics(false, m.Registerer),
		Store:             store,
		Metrics:           m,
		orgID:             orgID,
	}

	am.gokitLogger = gokit_log.NewLogfmtLogger(logging.NewWrapper(am.logger))
	am.fileStore = NewFileStore(am.orgID, kvStore, am.WorkingDirPath())

	nflogFilepath, err := am.fileStore.FilepathFor(context.TODO(), notificationLogFilename)
	if err != nil {
		return nil, err
	}
	silencesFilePath, err := am.fileStore.FilepathFor(context.TODO(), silencesFilename)
	if err != nil {
		return nil, err
	}

	// Initialize the notification log
	am.wg.Add(1)
	am.notificationLog, err = nflog.New(
		nflog.WithRetention(retentionNotificationsAndSilences),
		nflog.WithSnapshot(nflogFilepath),
		nflog.WithMaintenance(maintenanceNotificationAndSilences, am.stopc, am.wg.Done, func() (int64, error) {
			return am.fileStore.Persist(context.TODO(), notificationLogFilename, am.notificationLog)
		}),
	)
	if err != nil {
		return nil, fmt.Errorf("unable to initialize the notification log component of alerting: %w", err)
	}
	// Initialize silences
	am.silences, err = silence.New(silence.Options{
		Metrics:      m.Registerer,
		SnapshotFile: silencesFilePath,
		Retention:    retentionNotificationsAndSilences,
	})
	if err != nil {
		return nil, fmt.Errorf("unable to initialize the silencing component of alerting: %w", err)
	}

	am.wg.Add(1)
	go func() {
		am.silences.Maintenance(15*time.Minute, silencesFilePath, am.stopc, func() (int64, error) {
			return am.fileStore.Persist(context.TODO(), silencesFilename, am.silences)
		})
		am.wg.Done()
	}()

	// Initialize in-memory alerts
	am.alerts, err = mem.NewAlerts(context.Background(), am.marker, memoryAlertsGCInterval, nil, am.gokitLogger)
	if err != nil {
		return nil, fmt.Errorf("unable to initialize the alert provider component of alerting: %w", err)
	}

	return am, nil
}

func (am *Alertmanager) Ready() bool {
	// We consider AM as ready only when the config has been
	// applied at least once successfully. Until then, some objects
	// can still be nil.
	am.reloadConfigMtx.RLock()
	defer am.reloadConfigMtx.RUnlock()

	return am.ready()
}

func (am *Alertmanager) ready() bool {
	return am.config != nil
}

func (am *Alertmanager) StopAndWait() {
	if am.dispatcher != nil {
		am.dispatcher.Stop()
	}

	if am.inhibitor != nil {
		am.inhibitor.Stop()
	}

	am.alerts.Close()

	close(am.stopc)

	am.wg.Wait()
}

// SaveAndApplyDefaultConfig saves the default configuration to the database and applies it to the Alertmanager.
// It rolls back the save if applying the configuration fails.
func (am *Alertmanager) SaveAndApplyDefaultConfig() error {
	am.reloadConfigMtx.Lock()
	defer am.reloadConfigMtx.Unlock()

	cmd := &ngmodels.SaveAlertmanagerConfigurationCmd{
		AlertmanagerConfiguration: alertmanagerDefaultConfiguration,
		Default:                   true,
		ConfigurationVersion:      fmt.Sprintf("v%d", ngmodels.AlertConfigurationVersion),
		OrgID:                     am.orgID,
	}

	cfg, err := Load([]byte(alertmanagerDefaultConfiguration))
	if err != nil {
		return err
	}

	err = am.Store.SaveAlertmanagerConfigurationWithCallback(cmd, func() error {
		if err := am.applyConfig(cfg, []byte(alertmanagerDefaultConfiguration)); err != nil {
			return err
		}
		return nil
	})
	if err != nil {
		return err
	}

	return nil
}

// SaveAndApplyConfig saves the configuration to the database and applies it to the Alertmanager.
// It rolls back the save if applying the configuration fails.
func (am *Alertmanager) SaveAndApplyConfig(cfg *apimodels.PostableUserConfig) error {
	rawConfig, err := json.Marshal(&cfg)
	if err != nil {
		return fmt.Errorf("failed to serialize the Alertmanager configuration: %w", err)
	}

	am.reloadConfigMtx.Lock()
	defer am.reloadConfigMtx.Unlock()

	cmd := &ngmodels.SaveAlertmanagerConfigurationCmd{
		AlertmanagerConfiguration: string(rawConfig),
		ConfigurationVersion:      fmt.Sprintf("v%d", ngmodels.AlertConfigurationVersion),
		OrgID:                     am.orgID,
	}

	err = am.Store.SaveAlertmanagerConfigurationWithCallback(cmd, func() error {
		if err := am.applyConfig(cfg, rawConfig); err != nil {
			return err
		}
		return nil
	})
	if err != nil {
		return err
	}

	return nil
}

// SyncAndApplyConfigFromDatabase picks the latest config from the database and restarts
// the components with the new config.
func (am *Alertmanager) SyncAndApplyConfigFromDatabase() error {
	am.reloadConfigMtx.Lock()
	defer am.reloadConfigMtx.Unlock()

	// First, let's get the configuration we need from the database.
	q := &ngmodels.GetLatestAlertmanagerConfigurationQuery{OrgID: am.orgID}
	if err := am.Store.GetLatestAlertmanagerConfiguration(q); err != nil {
		// If there's no configuration in the database, let's use the default configuration.
		if errors.Is(err, store.ErrNoAlertmanagerConfiguration) {
			// First, let's save it to the database. We don't need to use a transaction here as we'll always succeed.
			am.logger.Info("no Alertmanager configuration found, saving and applying a default")
			savecmd := &ngmodels.SaveAlertmanagerConfigurationCmd{
				AlertmanagerConfiguration: alertmanagerDefaultConfiguration,
				Default:                   true,
				ConfigurationVersion:      fmt.Sprintf("v%d", ngmodels.AlertConfigurationVersion),
				OrgID:                     am.orgID,
			}
			if err := am.Store.SaveAlertmanagerConfiguration(savecmd); err != nil {
				return err
			}

			q.Result = &ngmodels.AlertConfiguration{AlertmanagerConfiguration: alertmanagerDefaultConfiguration, Default: true}
		} else {
			return fmt.Errorf("unable to get Alertmanager configuration from the database: %w", err)
		}
	}

	cfg, err := Load([]byte(q.Result.AlertmanagerConfiguration))
	if err != nil {
		return err
	}

	if err := am.applyConfig(cfg, nil); err != nil {
		return fmt.Errorf("unable to reload configuration: %w", err)
	}

	return nil
}

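// getTemplate returns the notification templates built from the template files of the currently applied configuration.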
func (am *Alertmanager) getTemplate() (*template.Template, error) {
	am.reloadConfigMtx.RLock()
	defer am.reloadConfigMtx.RUnlock()
	if !am.ready() {
		return nil, errors.New("alertmanager is not initialized")
	}
	paths := make([]string, 0, len(am.config.TemplateFiles))
	for name := range am.config.TemplateFiles {
		paths = append(paths, filepath.Join(am.WorkingDirPath(), name))
	}
	return am.templateFromPaths(paths...)
}

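// templateFromPaths parses the given template files and sets Grafana's AppURL as the external URL used when expanding templates.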
func (am *Alertmanager) templateFromPaths(paths ...string) (*template.Template, error) {
	tmpl, err := template.FromGlobs(paths...)
	if err != nil {
		return nil, err
	}
	externalURL, err := url.Parse(am.Settings.AppURL)
	if err != nil {
		return nil, err
	}
	tmpl.ExternalURL = externalURL
	return tmpl, nil
}

// applyConfig applies a new configuration by re-initializing all components using the configuration provided.
// It is not safe to call concurrently.
func (am *Alertmanager) applyConfig(cfg *apimodels.PostableUserConfig, rawConfig []byte) (err error) {
	// First, let's make sure this config is not already loaded.
	var configChanged bool
	if rawConfig == nil {
		enc, err := json.Marshal(cfg.AlertmanagerConfig)
		if err != nil {
			// In theory, this should never happen.
			return err
		}
		rawConfig = enc
	}

	if am.configHash != md5.Sum(rawConfig) {
		configChanged = true
	}

	if cfg.TemplateFiles == nil {
		cfg.TemplateFiles = map[string]string{}
	}
	cfg.TemplateFiles["__default__.tmpl"] = channels.DefaultTemplateString

	// Next, we need to make sure we persist the templates to disk.
	paths, templatesChanged, err := PersistTemplates(cfg, am.WorkingDirPath())
	if err != nil {
		return err
	}

	// If neither the configuration nor the templates have changed, we've got nothing to do.
	if !configChanged && !templatesChanged {
		am.logger.Debug("neither config nor templates have changed, skipping configuration sync.")
		return nil
	}

	// With the templates persisted, create the template list using the paths.
	tmpl, err := am.templateFromPaths(paths...)
	if err != nil {
		return err
	}

	// Finally, build the integrations map using the receiver configuration and templates.
	integrationsMap, err := am.buildIntegrationsMap(cfg.AlertmanagerConfig.Receivers, tmpl)
	if err != nil {
		return fmt.Errorf("failed to build integration map: %w", err)
	}
	// Now, let's put together our notification pipeline.
	routingStage := make(notify.RoutingStage, len(integrationsMap))

	if am.inhibitor != nil {
		am.inhibitor.Stop()
	}
	if am.dispatcher != nil {
		am.dispatcher.Stop()
	}

	am.inhibitor = inhibit.NewInhibitor(am.alerts, cfg.AlertmanagerConfig.InhibitRules, am.marker, am.gokitLogger)
	am.silencer = silence.NewSilencer(am.silences, am.marker, am.gokitLogger)

	inhibitionStage := notify.NewMuteStage(am.inhibitor)
	silencingStage := notify.NewMuteStage(am.silencer)
	for name := range integrationsMap {
		stage := am.createReceiverStage(name, integrationsMap[name], waitFunc, am.notificationLog)
		routingStage[name] = notify.MultiStage{silencingStage, inhibitionStage, stage}
	}

	am.route = dispatch.NewRoute(cfg.AlertmanagerConfig.Route, nil)
	am.dispatcher = dispatch.NewDispatcher(am.alerts, am.route, routingStage, am.marker, timeoutFunc, &nilLimits{}, am.gokitLogger, am.dispatcherMetrics)

	am.wg.Add(1)
	go func() {
		defer am.wg.Done()
		am.dispatcher.Run()
	}()

	am.wg.Add(1)
	go func() {
		defer am.wg.Done()
		am.inhibitor.Run()
	}()

	am.config = cfg
	am.configHash = md5.Sum(rawConfig)

	return nil
}

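// WorkingDirPath returns the per-organization working directory where the Alertmanager
// keeps its on-disk state (notification log, silences and templates).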
func (am *Alertmanager) WorkingDirPath() string {
	return filepath.Join(am.Settings.DataPath, workingDir, strconv.Itoa(int(am.orgID)))
}

// buildIntegrationsMap builds a map of receiver name to the list of Grafana integration notifiers from a list of receiver configs.
func (am *Alertmanager) buildIntegrationsMap(receivers []*apimodels.PostableApiReceiver, templates *template.Template) (map[string][]notify.Integration, error) {
	integrationsMap := make(map[string][]notify.Integration, len(receivers))
	for _, receiver := range receivers {
		integrations, err := am.buildReceiverIntegrations(receiver, templates)
		if err != nil {
			return nil, err
		}
		integrationsMap[receiver.Name] = integrations
	}

	return integrationsMap, nil
}

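// NotificationChannel is the interface implemented by Grafana-managed notifiers: it combines sending
// notifications (notify.Notifier) with reporting whether resolved notifications should be sent (notify.ResolvedSender).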
type NotificationChannel interface {
	notify.Notifier
	notify.ResolvedSender
}

// buildReceiverIntegrations builds a list of integration notifiers from a receiver config.
func (am *Alertmanager) buildReceiverIntegrations(receiver *apimodels.PostableApiReceiver, tmpl *template.Template) ([]notify.Integration, error) {
	var integrations []notify.Integration
	for i, r := range receiver.GrafanaManagedReceivers {
		n, err := am.buildReceiverIntegration(r, tmpl)
		if err != nil {
			return nil, err
		}
		integrations = append(integrations, notify.NewIntegration(n, n, r.Type, i))
	}
	return integrations, nil
}

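// buildReceiverIntegration decodes the receiver's base64-encoded secure settings and instantiates
// the notification channel matching the receiver type, wrapping any failure in an InvalidReceiverError.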
func (am *Alertmanager) buildReceiverIntegration(r *apimodels.PostableGrafanaReceiver, tmpl *template.Template) (NotificationChannel, error) {
	// Secure settings are already encrypted at this point.
	secureSettings := securejsondata.SecureJsonData(make(map[string][]byte, len(r.SecureSettings)))

	for k, v := range r.SecureSettings {
		d, err := base64.StdEncoding.DecodeString(v)
		if err != nil {
			return nil, InvalidReceiverError{
				Receiver: r,
				Err:      errors.New("failed to decode secure setting"),
			}
		}
		secureSettings[k] = d
	}

	var (
		cfg = &channels.NotificationChannelConfig{
			UID:                   r.UID,
			Name:                  r.Name,
			Type:                  r.Type,
			DisableResolveMessage: r.DisableResolveMessage,
			Settings:              r.Settings,
			SecureSettings:        secureSettings,
		}
		n   NotificationChannel
		err error
	)
	switch r.Type {
	case "email":
		n, err = channels.NewEmailNotifier(cfg, tmpl) // Email notifier already has a default template.
	case "pagerduty":
		n, err = channels.NewPagerdutyNotifier(cfg, tmpl)
	case "pushover":
		n, err = channels.NewPushoverNotifier(cfg, tmpl)
	case "slack":
		n, err = channels.NewSlackNotifier(cfg, tmpl)
	case "telegram":
		n, err = channels.NewTelegramNotifier(cfg, tmpl)
	case "victorops":
		n, err = channels.NewVictoropsNotifier(cfg, tmpl)
	case "teams":
		n, err = channels.NewTeamsNotifier(cfg, tmpl)
	case "dingding":
		n, err = channels.NewDingDingNotifier(cfg, tmpl)
	case "kafka":
		n, err = channels.NewKafkaNotifier(cfg, tmpl)
	case "webhook":
		n, err = channels.NewWebHookNotifier(cfg, tmpl)
	case "sensugo":
		n, err = channels.NewSensuGoNotifier(cfg, tmpl)
	case "discord":
		n, err = channels.NewDiscordNotifier(cfg, tmpl)
	case "googlechat":
		n, err = channels.NewGoogleChatNotifier(cfg, tmpl)
	case "LINE":
		n, err = channels.NewLineNotifier(cfg, tmpl)
	case "threema":
		n, err = channels.NewThreemaNotifier(cfg, tmpl)
	case "opsgenie":
		n, err = channels.NewOpsgenieNotifier(cfg, tmpl)
	case "prometheus-alertmanager":
		n, err = channels.NewAlertmanagerNotifier(cfg, tmpl)
	default:
		return nil, InvalidReceiverError{
			Receiver: r,
			Err:      fmt.Errorf("notifier %s is not supported", r.Type),
		}
	}

	if err != nil {
		return nil, InvalidReceiverError{
			Receiver: r,
			Err:      err,
		}
	}

	return n, nil
}

// PutAlerts receives the alerts and then sends them through the corresponding route based on whether or not the alert has a receiver embedded.
func (am *Alertmanager) PutAlerts(postableAlerts apimodels.PostableAlerts) error {
	now := time.Now()
	alerts := make([]*types.Alert, 0, len(postableAlerts.PostableAlerts))
	var validationErr *AlertValidationError
	for _, a := range postableAlerts.PostableAlerts {
		alert := &types.Alert{
			Alert: model.Alert{
				Labels:       model.LabelSet{},
				Annotations:  model.LabelSet{},
				StartsAt:     time.Time(a.StartsAt),
				EndsAt:       time.Time(a.EndsAt),
				GeneratorURL: a.GeneratorURL.String(),
			},
			UpdatedAt: now,
		}

		for k, v := range a.Labels {
			if len(v) == 0 || k == ngmodels.NamespaceUIDLabel { // Skip empty and namespace UID labels.
				continue
			}
			alert.Alert.Labels[model.LabelName(k)] = model.LabelValue(v)
		}
		for k, v := range a.Annotations {
			if len(v) == 0 { // Skip empty annotations.
				continue
			}
			alert.Alert.Annotations[model.LabelName(k)] = model.LabelValue(v)
		}

		// Ensure StartsAt is set.
		if alert.StartsAt.IsZero() {
			if alert.EndsAt.IsZero() {
				alert.StartsAt = now
			} else {
				alert.StartsAt = alert.EndsAt
			}
		}
		// If no end time is defined, set a timeout after which an alert
		// is marked resolved if it is not updated.
		if alert.EndsAt.IsZero() {
			alert.Timeout = true
			alert.EndsAt = now.Add(defaultResolveTimeout)
		}

		if alert.EndsAt.After(now) {
			am.Metrics.Firing().Inc()
		} else {
			am.Metrics.Resolved().Inc()
		}

		if err := validateAlert(alert); err != nil {
			if validationErr == nil {
				validationErr = &AlertValidationError{}
			}
			validationErr.Alerts = append(validationErr.Alerts, a)
			validationErr.Errors = append(validationErr.Errors, err)
			am.Metrics.Invalid().Inc()
			continue
		}

		alerts = append(alerts, alert)
	}

	if err := am.alerts.Put(alerts...); err != nil {
		// A failure to put the alerts into the provider takes precedence over validation errors.
		return err
	}
	if validationErr != nil {
		// Only return validationErr when it is non-nil: a nil *AlertValidationError wrapped in the
		// error interface is not nil, so callers (e.g. require.NoError in tests) would fail on it.
		return validationErr
	}
	return nil
}

// validateAlert is a.Validate() while additionally allowing
// spaces in label and annotation names.
func validateAlert(a *types.Alert) error {
	if a.StartsAt.IsZero() {
		return fmt.Errorf("start time missing")
	}
	if !a.EndsAt.IsZero() && a.EndsAt.Before(a.StartsAt) {
		return fmt.Errorf("start time must be before end time")
	}
	if err := validateLabelSet(a.Labels); err != nil {
		return fmt.Errorf("invalid label set: %s", err)
	}
	if len(a.Labels) == 0 {
		return fmt.Errorf("at least one label pair required")
	}
	if err := validateLabelSet(a.Annotations); err != nil {
		return fmt.Errorf("invalid annotations: %s", err)
	}
	return nil
}

// validateLabelSet is ls.Validate() while additionally allowing
// spaces in label names.
func validateLabelSet(ls model.LabelSet) error {
	for ln, lv := range ls {
		if !isValidLabelName(ln) {
			return fmt.Errorf("invalid name %q", ln)
		}
		if !lv.IsValid() {
			return fmt.Errorf("invalid value %q", lv)
		}
	}
	return nil
}

// isValidLabelName is ln.IsValid() while additionally allowing spaces.
// The regex for the Prometheus data model is ^[a-zA-Z_][a-zA-Z0-9_]*$,
// while we follow ^[a-zA-Z_][a-zA-Z0-9_ ]*$.
func isValidLabelName(ln model.LabelName) bool {
	if len(ln) == 0 {
		return false
	}
	for i, b := range ln {
		if !((b >= 'a' && b <= 'z') ||
			(b >= 'A' && b <= 'Z') ||
			b == '_' ||
			(i > 0 && (b == ' ' || (b >= '0' && b <= '9')))) {
			return false
		}
	}
	return true
}

// AlertValidationError is the error capturing the validation errors
// faced on the alerts.
type AlertValidationError struct {
	Alerts []amv2.PostableAlert
	Errors []error // Errors[i] refers to Alerts[i].
}

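// Error implements the error interface by joining the individual validation errors with a semicolon.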
func (e AlertValidationError) Error() string {
	errMsg := ""
	if len(e.Errors) != 0 {
		errMsg = e.Errors[0].Error()
		for _, e := range e.Errors[1:] {
			errMsg += ";" + e.Error()
		}
	}
	return errMsg
}

// createReceiverStage creates a pipeline of stages for a receiver.
func (am *Alertmanager) createReceiverStage(name string, integrations []notify.Integration, wait func() time.Duration, notificationLog notify.NotificationLog) notify.Stage {
	var fs notify.FanoutStage
	for i := range integrations {
		recv := &nflogpb.Receiver{
			GroupName:   name,
			Integration: integrations[i].Name(),
			Idx:         uint32(integrations[i].Index()),
		}
		var s notify.MultiStage
		s = append(s, notify.NewWaitStage(wait))
		s = append(s, notify.NewDedupStage(&integrations[i], notificationLog, recv))
		s = append(s, notify.NewRetryStage(integrations[i], name, am.stageMetrics))
		s = append(s, notify.NewSetNotifiesStage(notificationLog, recv))

		fs = append(fs, s)
	}
	return fs
}

func waitFunc() time.Duration {
	// When it's a single instance, we don't need an additional wait; the routing policies have their own group wait.
	// We need a >0 wait when there are peers to sync the notification state with, as a 0 wait in that case can result
	// in duplicate notifications being sent.
	// TODO: we have setting.AlertingNotificationTimeout in legacy settings. Either use that or a separate set of config
	// for clustering with an intuitive name, like "PeerTimeout".
	return 0
}

func timeoutFunc(d time.Duration) time.Duration {
	// TODO: what does MinTimeout mean here?
	if d < notify.MinTimeout {
		d = notify.MinTimeout
	}
	return d + waitFunc()
}

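// nilLimits enforces no limits on the dispatcher: returning 0 from MaxNumberOfAggregationGroups
// leaves the number of aggregation groups unbounded.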
type nilLimits struct{}

func (n nilLimits) MaxNumberOfAggregationGroups() int { return 0 }