AlertingNG: Add alert provider and basic structure with dispatcher, silences and delivery stages (#31833)

* AlertingNG: Add alert provider

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Add unit tests

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Alertmanager WIP

* Merge alertmanager into notifier

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Fixes for PR 31833 (#31990)

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Use alertmanager from upgrad-uuid temporarily to unblock

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Fix lint

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

Co-authored-by: Josue Abreu <josue@grafana.com>
This commit is contained in:
Ganesh Vernekar
2021-03-16 15:14:52 +05:30
committed by GitHub
parent e9402a56ba
commit ecbc98ba5d
6 changed files with 551 additions and 12 deletions

View File

@@ -0,0 +1,108 @@
package notifier
import (
"context"
"time"
"github.com/go-kit/kit/log"
"github.com/prometheus/alertmanager/api/v2/models"
"github.com/prometheus/alertmanager/notify"
"github.com/prometheus/alertmanager/provider"
"github.com/prometheus/alertmanager/provider/mem"
"github.com/prometheus/alertmanager/types"
"github.com/prometheus/common/model"
)
type PostableAlert struct {
models.PostableAlert
// List of receiver names to sent alert to
Receivers []string `json:"receivers"`
}
type AlertProvider struct {
provider.Alerts
logger log.Logger
// TODO(codesome): This stage is temporary to get code out quickly.
// Eventually, the alerts meant directly for receivers and not routing
// will be stored in memory and provided via an iterator, for example
// GetPendingLegacy() AlertIterator, and the external code will use this
// iterator to send to the stage.
stage notify.Stage
}
func NewAlertProvider(s notify.Stage, m types.Marker, l log.Logger) (*AlertProvider, error) {
alerts, err := mem.NewAlerts(context.Background(), m, 30*time.Minute, l)
if err != nil {
return nil, err
}
return &AlertProvider{
Alerts: alerts,
stage: s,
logger: l,
}, nil
}
func (ap *AlertProvider) PutPostableAlert(alerts ...*PostableAlert) error {
var alertsWithReceivers []*PostableAlert
var alertsWithoutReceivers []*types.Alert
for _, a := range alerts {
if len(a.Receivers) > 0 {
alertsWithReceivers = append(alertsWithReceivers, a)
} else {
alertsWithoutReceivers = append(alertsWithoutReceivers, alertForDelivery(a))
}
}
// Without receiver names, alerts go through routing.
if err := ap.Alerts.Put(alertsWithoutReceivers...); err != nil {
return err
}
if len(alertsWithReceivers) == 0 {
return nil
}
// Group alerts with receivers based on the receiver names.
groupedAlerts := make(map[string][]*types.Alert)
for _, a := range alertsWithReceivers {
for _, recv := range a.Receivers {
groupedAlerts[recv] = append(groupedAlerts[recv], alertForDelivery(a))
}
}
for recv, alerts := range groupedAlerts {
ctx := notify.WithReceiverName(context.Background(), recv)
_, _, err := ap.stage.Exec(ctx, ap.logger, alerts...)
if err != nil {
return err
}
}
return nil
}
func alertForDelivery(a *PostableAlert) *types.Alert {
lbls := model.LabelSet{}
annotations := model.LabelSet{}
for k, v := range a.Labels {
lbls[model.LabelName(k)] = model.LabelValue(v)
}
for k, v := range a.Annotations {
annotations[model.LabelName(k)] = model.LabelValue(v)
}
return &types.Alert{
Alert: model.Alert{
Labels: lbls,
Annotations: annotations,
StartsAt: time.Time(a.StartsAt),
EndsAt: time.Time(a.EndsAt),
GeneratorURL: a.GeneratorURL.String(),
},
UpdatedAt: time.Time{}, // TODO(codesome) what should this be?
Timeout: false, // TODO(codesome).
}
}

View File

@@ -0,0 +1,129 @@
package notifier
import (
"context"
"errors"
"testing"
"github.com/go-kit/kit/log"
"github.com/prometheus/alertmanager/api/v2/models"
"github.com/prometheus/alertmanager/notify"
"github.com/prometheus/alertmanager/provider"
"github.com/prometheus/alertmanager/types"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
)
func TestAlertProvider_PutPostableAlert(t *testing.T) {
marker := types.NewMarker(prometheus.DefaultRegisterer)
stage := &mockStage{alerts: make(map[string][]*types.Alert)}
provider := &mockAlertProvider{}
ap, err := NewAlertProvider(stage, marker, log.NewNopLogger())
require.NoError(t, err)
ap.Alerts = provider
postableAlerts := []*PostableAlert{
{
// Goes through routing since no receiver.
PostableAlert: models.PostableAlert{
Annotations: models.LabelSet{"msg": "AlertOne annotation"},
Alert: models.Alert{
Labels: models.LabelSet{"alertname": "AlertOne"},
},
},
}, {
// Goes directly through notification pipeling since there is receiver.
PostableAlert: models.PostableAlert{
Annotations: models.LabelSet{"msg": "AlertTwo annotation"},
Alert: models.Alert{
Labels: models.LabelSet{"alertname": "AlertTwo"},
},
},
Receivers: []string{"recv1", "recv2"},
}, {
// Goes directly through notification pipeling since there is receiver.
PostableAlert: models.PostableAlert{
Annotations: models.LabelSet{"msg": "AlertThree annotation"},
Alert: models.Alert{
Labels: models.LabelSet{"alertname": "AlertThree"},
},
},
Receivers: []string{"recv2", "recv3"},
},
}
require.NoError(t, ap.PutPostableAlert(postableAlerts...))
// Alerts that should be sent for routing.
expProviderAlerts := []*types.Alert{
{
Alert: model.Alert{
Annotations: model.LabelSet{"msg": "AlertOne annotation"},
Labels: model.LabelSet{"alertname": "AlertOne"},
},
},
}
require.Equal(t, expProviderAlerts, provider.alerts)
// Alerts that should go directly to the notification pipeline.
expPipelineAlerts := map[string][]*types.Alert{
"recv1": {
{
Alert: model.Alert{
Annotations: model.LabelSet{"msg": "AlertTwo annotation"},
Labels: model.LabelSet{"alertname": "AlertTwo"},
},
},
},
"recv2": {
{
Alert: model.Alert{
Annotations: model.LabelSet{"msg": "AlertTwo annotation"},
Labels: model.LabelSet{"alertname": "AlertTwo"},
},
}, {
Alert: model.Alert{
Annotations: model.LabelSet{"msg": "AlertThree annotation"},
Labels: model.LabelSet{"alertname": "AlertThree"},
},
},
},
"recv3": {
{
Alert: model.Alert{
Annotations: model.LabelSet{"msg": "AlertThree annotation"},
Labels: model.LabelSet{"alertname": "AlertThree"},
},
},
},
}
require.Equal(t, expPipelineAlerts, stage.alerts)
}
type mockAlertProvider struct {
alerts []*types.Alert
}
func (a *mockAlertProvider) Subscribe() provider.AlertIterator { return nil }
func (a *mockAlertProvider) GetPending() provider.AlertIterator { return nil }
func (a *mockAlertProvider) Get(model.Fingerprint) (*types.Alert, error) { return nil, nil }
func (a *mockAlertProvider) Put(alerts ...*types.Alert) error {
a.alerts = append(a.alerts, alerts...)
return nil
}
type mockStage struct {
alerts map[string][]*types.Alert
}
func (s *mockStage) Exec(ctx context.Context, _ log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
recv, ok := notify.ReceiverName(ctx)
if !ok {
return ctx, nil, errors.New("receiver name not found")
}
s.alerts[recv] = append(s.alerts[recv], alerts...)
return ctx, nil, nil
}

View File

@@ -0,0 +1,275 @@
package notifier
import (
"context"
"path/filepath"
"sort"
"sync"
"time"
gokit_log "github.com/go-kit/kit/log"
"github.com/pkg/errors"
"github.com/prometheus/alertmanager/config"
"github.com/prometheus/alertmanager/dispatch"
"github.com/prometheus/alertmanager/nflog"
"github.com/prometheus/alertmanager/nflog/nflogpb"
"github.com/prometheus/alertmanager/notify"
"github.com/prometheus/alertmanager/pkg/labels"
"github.com/prometheus/alertmanager/silence"
"github.com/prometheus/alertmanager/silence/silencepb"
"github.com/prometheus/alertmanager/types"
"github.com/prometheus/client_golang/prometheus"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/registry"
"github.com/grafana/grafana/pkg/setting"
)
type Alertmanager struct {
logger log.Logger
// notificationLog keeps tracks of which notifications we've fired already.
notificationLog *nflog.Log
// silences keeps the track of which notifications we should not fire due to user configuration.
silences *silence.Silences
marker types.Marker
alerts *AlertProvider
dispatcher *dispatch.Dispatcher
wg sync.WaitGroup
}
type WithReceiverStage struct {
}
func (s *WithReceiverStage) Exec(ctx context.Context, l gokit_log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
//TODO: Alerts with a receiver should be handled here.
return ctx, nil, nil
}
func init() {
registry.RegisterService(&Alertmanager{})
}
func (am *Alertmanager) IsDisabled() bool {
return !setting.AlertingEnabled || !setting.ExecuteAlerts
}
func (am *Alertmanager) Init() error {
am.logger = log.New("alertmanager")
return nil
}
func (am *Alertmanager) Run(ctx context.Context) error {
//TODO: Speak with David Parrot wrt to the marker, we'll probably need our own.
am.marker = types.NewMarker(prometheus.DefaultRegisterer)
var err error
am.alerts, err = NewAlertProvider(&WithReceiverStage{}, am.marker, gokit_log.NewNopLogger())
if err != nil {
return errors.Wrap(err, "failed to initialize alerting storage component")
}
am.silences, err = silence.New(silence.Options{
SnapshotFile: filepath.Join("dir", "silences"), //TODO: This is a setting
Retention: time.Hour * 24, //TODO: This is also a setting
})
if err != nil {
return errors.Wrap(err, "unable to initialize the silencing component of alerting")
}
am.notificationLog, err = nflog.New(
nflog.WithRetention(time.Hour*24), //TODO: This is a setting.
nflog.WithSnapshot(filepath.Join("dir", "notifications")), //TODO: This should be a setting
)
if err != nil {
return errors.Wrap(err, "unable to initialize the notification log component of alerting")
}
{
// Now, let's put together our notification pipeline
receivers := buildIntegrationsMap()
routingStage := make(notify.RoutingStage, len(receivers))
silencingStage := notify.NewMuteStage(silence.NewSilencer(am.silences, am.marker, gokit_log.NewNopLogger()))
//TODO: We need to unify these receivers
for name := range receivers {
stage := createReceiverStage(name, receivers[name], waitFunc, am.notificationLog)
routingStage[name] = notify.MultiStage{silencingStage, stage}
}
am.dispatcher = dispatch.NewDispatcher(am.alerts, BuildRoutingConfiguration(), routingStage, am.marker, timeoutFunc, gokit_log.NewNopLogger(), nil)
}
am.wg.Add(1)
go am.dispatcher.Run()
return nil
}
// CreateAlerts receives the alerts and then sends them through the corresponding route based on whenever the alert has a receiver embedded or not
func (am *Alertmanager) CreateAlerts(alerts ...*PostableAlert) error {
return am.alerts.PutPostableAlert(alerts...)
}
func (am *Alertmanager) ListSilences(matchers []*labels.Matcher) ([]types.Silence, error) {
pbsilences, _, err := am.silences.Query()
if err != nil {
return nil, errors.Wrap(err, "unable to query for the list of silences")
}
r := []types.Silence{}
for _, pbs := range pbsilences {
s, err := silenceFromProto(pbs)
if err != nil {
return nil, errors.Wrap(err, "unable to marshal silence")
}
sms := make(map[string]string)
for _, m := range s.Matchers {
sms[m.Name] = m.Value
}
if !matchFilterLabels(matchers, sms) {
continue
}
r = append(r, *s)
}
var active, pending, expired []types.Silence
for _, s := range r {
switch s.Status.State {
case types.SilenceStateActive:
active = append(active, s)
case types.SilenceStatePending:
pending = append(pending, s)
case types.SilenceStateExpired:
expired = append(expired, s)
}
}
sort.Slice(active, func(i int, j int) bool {
return active[i].EndsAt.Before(active[j].EndsAt)
})
sort.Slice(pending, func(i int, j int) bool {
return pending[i].StartsAt.Before(pending[j].EndsAt)
})
sort.Slice(expired, func(i int, j int) bool {
return expired[i].EndsAt.After(expired[j].EndsAt)
})
// Initialize silences explicitly to an empty list (instead of nil)
// So that it does not get converted to "null" in JSON.
silences := []types.Silence{}
silences = append(silences, active...)
silences = append(silences, pending...)
silences = append(silences, expired...)
return silences, nil
}
func (am *Alertmanager) GetSilence(silence *types.Silence) {}
func (am *Alertmanager) CreateSilence(silence *types.Silence) {}
func (am *Alertmanager) DeleteSilence(silence *types.Silence) {}
// createReceiverStage creates a pipeline of stages for a receiver.
func createReceiverStage(name string, integrations []notify.Integration, wait func() time.Duration, notificationLog notify.NotificationLog) notify.Stage {
var fs notify.FanoutStage
for i := range integrations {
recv := &nflogpb.Receiver{
GroupName: name,
Integration: integrations[i].Name(),
Idx: uint32(integrations[i].Index()),
}
var s notify.MultiStage
s = append(s, notify.NewWaitStage(wait))
s = append(s, notify.NewDedupStage(&integrations[i], notificationLog, recv))
//TODO: This probably won't work w/o the metrics
s = append(s, notify.NewRetryStage(integrations[i], name, nil))
s = append(s, notify.NewSetNotifiesStage(notificationLog, recv))
fs = append(fs, s)
}
return fs
}
// BuildRoutingConfiguration produces an alertmanager-based routing configuration.
func BuildRoutingConfiguration() *dispatch.Route {
var cfg *config.Config
return dispatch.NewRoute(cfg.Route, nil)
}
func buildIntegrationsMap() map[string][]notify.Integration {
return map[string][]notify.Integration{}
}
func waitFunc() time.Duration {
return setting.AlertingNotificationTimeout
}
func timeoutFunc(d time.Duration) time.Duration {
//TODO: What does MinTimeout means here?
if d < notify.MinTimeout {
d = notify.MinTimeout
}
return d + waitFunc()
}
// copied from the Alertmanager
func silenceFromProto(s *silencepb.Silence) (*types.Silence, error) {
sil := &types.Silence{
ID: s.Id,
StartsAt: s.StartsAt,
EndsAt: s.EndsAt,
UpdatedAt: s.UpdatedAt,
Status: types.SilenceStatus{
State: types.CalcSilenceState(s.StartsAt, s.EndsAt),
},
Comment: s.Comment,
CreatedBy: s.CreatedBy,
}
for _, m := range s.Matchers {
var t labels.MatchType
switch m.Type {
case silencepb.Matcher_EQUAL:
t = labels.MatchEqual
case silencepb.Matcher_REGEXP:
t = labels.MatchRegexp
case silencepb.Matcher_NOT_EQUAL:
t = labels.MatchNotEqual
case silencepb.Matcher_NOT_REGEXP:
t = labels.MatchNotRegexp
}
matcher, err := labels.NewMatcher(t, m.Name, m.Pattern)
if err != nil {
return nil, err
}
sil.Matchers = append(sil.Matchers, matcher)
}
return sil, nil
}
func matchFilterLabels(matchers []*labels.Matcher, sms map[string]string) bool {
for _, m := range matchers {
v, prs := sms[m.Name]
switch m.Type {
case labels.MatchNotRegexp, labels.MatchNotEqual:
if m.Value == "" && prs {
continue
}
if !m.Matches(v) {
return false
}
default:
if m.Value == "" && !prs {
continue
}
if !m.Matches(v) {
return false
}
}
}
return true
}

View File

@@ -0,0 +1 @@
package notifier