AlertingNG: Remove the receivers field from postable alerts (#33068)

* AlertingNG: Remove the receivers field from postable alerts and update tests

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>

* Fix review comments

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
This commit is contained in:
Ganesh Vernekar 2021-04-19 12:28:44 +05:30 committed by GitHub
parent d62601e664
commit 6271777ec6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 101 additions and 181 deletions

View File

@ -2,101 +2,41 @@ package notifier
import ( import (
"context" "context"
"sync"
"time" "time"
gokit_log "github.com/go-kit/kit/log" gokit_log "github.com/go-kit/kit/log"
apimodels "github.com/grafana/alerting-api/pkg/api"
"github.com/prometheus/alertmanager/api/v2/models" "github.com/prometheus/alertmanager/api/v2/models"
"github.com/prometheus/alertmanager/notify"
"github.com/prometheus/alertmanager/provider" "github.com/prometheus/alertmanager/provider"
"github.com/prometheus/alertmanager/provider/mem" "github.com/prometheus/alertmanager/provider/mem"
"github.com/prometheus/alertmanager/types" "github.com/prometheus/alertmanager/types"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
) )
type PostableAlert struct {
models.PostableAlert
// List of receiver names to sent alert to
Receivers []string `json:"receivers"`
}
type AlertProvider struct { type AlertProvider struct {
provider.Alerts provider.Alerts
// TODO(codesome): This stage is temporary to get code out quickly.
// Eventually, the alerts meant directly for receivers and not routing
// will be stored in memory and provided via an iterator, for example
// GetPendingLegacy() AlertIterator, and the external code will use this
// iterator to send to the stage.
stage notify.Stage
stageMtx sync.Mutex
} }
// NewAlertProvider returns AlertProvider that also supports legacy alerts via PutPostableAlert. // NewAlertProvider returns AlertProvider that provides a method to translate
// The notify.Stage should be of the type notify.RoutingStage or something similar that takes // Grafana alerts to Prometheus Alertmanager alerts before passing it ahead.
// notification channel name from the context. func NewAlertProvider(m types.Marker) (*AlertProvider, error) {
func NewAlertProvider(s notify.Stage, m types.Marker) (*AlertProvider, error) {
alerts, err := mem.NewAlerts(context.Background(), m, 30*time.Minute, gokit_log.NewNopLogger()) alerts, err := mem.NewAlerts(context.Background(), m, 30*time.Minute, gokit_log.NewNopLogger())
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &AlertProvider{ return &AlertProvider{Alerts: alerts}, nil
Alerts: alerts,
stage: s,
}, nil
} }
func (ap *AlertProvider) PutPostableAlert(alerts ...*PostableAlert) error { func (ap *AlertProvider) PutPostableAlert(postableAlerts apimodels.PostableAlerts) error {
var alertsWithReceivers []*PostableAlert alerts := make([]*types.Alert, 0, len(postableAlerts.PostableAlerts))
var alertsWithoutReceivers []*types.Alert for _, a := range postableAlerts.PostableAlerts {
for _, a := range alerts { alerts = append(alerts, alertForDelivery(a))
if len(a.Receivers) > 0 {
alertsWithReceivers = append(alertsWithReceivers, a)
} else {
alertsWithoutReceivers = append(alertsWithoutReceivers, alertForDelivery(a))
} }
return ap.Alerts.Put(alerts...)
} }
// Without receiver names, alerts go through routing. func alertForDelivery(a models.PostableAlert) *types.Alert {
if err := ap.Alerts.Put(alertsWithoutReceivers...); err != nil {
return err
}
if len(alertsWithReceivers) == 0 || ap.stage == nil {
return nil
}
// Group alerts with receivers based on the receiver names.
groupedAlerts := make(map[string][]*types.Alert)
for _, a := range alertsWithReceivers {
for _, recv := range a.Receivers {
groupedAlerts[recv] = append(groupedAlerts[recv], alertForDelivery(a))
}
}
for recv, alerts := range groupedAlerts {
ap.stageMtx.Lock()
ctx := notify.WithReceiverName(context.Background(), recv)
_, _, err := ap.stage.Exec(ctx, gokit_log.NewNopLogger(), alerts...)
ap.stageMtx.Unlock()
if err != nil {
return err
}
}
return nil
}
func (ap *AlertProvider) SetStage(s notify.Stage) {
ap.stageMtx.Lock()
defer ap.stageMtx.Unlock()
ap.stage = s
}
func alertForDelivery(a *PostableAlert) *types.Alert {
lbls := model.LabelSet{} lbls := model.LabelSet{}
annotations := model.LabelSet{} annotations := model.LabelSet{}
for k, v := range a.Labels { for k, v := range a.Labels {

View File

@ -1,13 +1,12 @@
package notifier package notifier
import ( import (
"context"
"errors"
"testing" "testing"
"time"
"github.com/go-kit/kit/log" "github.com/go-openapi/strfmt"
apimodels "github.com/grafana/alerting-api/pkg/api"
"github.com/prometheus/alertmanager/api/v2/models" "github.com/prometheus/alertmanager/api/v2/models"
"github.com/prometheus/alertmanager/notify"
"github.com/prometheus/alertmanager/provider" "github.com/prometheus/alertmanager/provider"
"github.com/prometheus/alertmanager/types" "github.com/prometheus/alertmanager/types"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
@ -15,91 +14,89 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
func TestAlertProvider_PutPostableAlert(t *testing.T) { func TestAlertProvider(t *testing.T) {
marker := types.NewMarker(prometheus.DefaultRegisterer) marker := types.NewMarker(prometheus.DefaultRegisterer)
stage := &mockStage{alerts: make(map[string][]*types.Alert)} alertProvider := &mockAlertProvider{}
provider := &mockAlertProvider{}
ap, err := NewAlertProvider(stage, marker) ap, err := NewAlertProvider(marker)
require.NoError(t, err) require.NoError(t, err)
ap.Alerts = provider ap.Alerts = alertProvider
postableAlerts := []*PostableAlert{ startTime := time.Now()
{ endTime := startTime.Add(2 * time.Hour)
// Goes through routing since no receiver. postableAlerts := apimodels.PostableAlerts{
PostableAlert: models.PostableAlert{ PostableAlerts: []models.PostableAlert{
Annotations: models.LabelSet{"msg": "AlertOne annotation"}, { // Start and end set.
Annotations: models.LabelSet{"msg": "Alert1 annotation"},
Alert: models.Alert{ Alert: models.Alert{
Labels: models.LabelSet{"alertname": "AlertOne"}, Labels: models.LabelSet{"alertname": "Alert1"},
GeneratorURL: "http://localhost/url1",
}, },
}, StartsAt: strfmt.DateTime(startTime),
}, { EndsAt: strfmt.DateTime(endTime),
// Goes directly through notification pipeling since there is receiver. }, { // Only end is set.
PostableAlert: models.PostableAlert{ Annotations: models.LabelSet{"msg": "Alert2 annotation"},
Annotations: models.LabelSet{"msg": "AlertTwo annotation"},
Alert: models.Alert{ Alert: models.Alert{
Labels: models.LabelSet{"alertname": "AlertTwo"}, Labels: models.LabelSet{"alertname": "Alert2"},
GeneratorURL: "http://localhost/url2",
}, },
}, StartsAt: strfmt.DateTime{},
Receivers: []string{"recv1", "recv2"}, EndsAt: strfmt.DateTime(endTime),
}, { }, { // Only start is set.
// Goes directly through notification pipeling since there is receiver. Annotations: models.LabelSet{"msg": "Alert3 annotation"},
PostableAlert: models.PostableAlert{
Annotations: models.LabelSet{"msg": "AlertThree annotation"},
Alert: models.Alert{ Alert: models.Alert{
Labels: models.LabelSet{"alertname": "AlertThree"}, Labels: models.LabelSet{"alertname": "Alert3"},
GeneratorURL: "http://localhost/url3",
}, },
StartsAt: strfmt.DateTime(startTime),
EndsAt: strfmt.DateTime{},
}, { // Both start and end are not set.
Annotations: models.LabelSet{"msg": "Alert4 annotation"},
Alert: models.Alert{
Labels: models.LabelSet{"alertname": "Alert4"},
GeneratorURL: "http://localhost/url4",
},
StartsAt: strfmt.DateTime{},
EndsAt: strfmt.DateTime{},
}, },
Receivers: []string{"recv2", "recv3"},
}, },
} }
require.NoError(t, ap.PutPostableAlert(postableAlerts...)) require.NoError(t, ap.PutPostableAlert(postableAlerts))
// Alerts that should be sent for routing. // Alerts that should be sent for routing.
expProviderAlerts := []*types.Alert{ expProviderAlerts := []*types.Alert{
{ {
Alert: model.Alert{ Alert: model.Alert{
Annotations: model.LabelSet{"msg": "AlertOne annotation"}, Annotations: model.LabelSet{"msg": "Alert1 annotation"},
Labels: model.LabelSet{"alertname": "AlertOne"}, Labels: model.LabelSet{"alertname": "Alert1"},
}, StartsAt: startTime,
}, EndsAt: endTime,
} GeneratorURL: "http://localhost/url1",
require.Equal(t, expProviderAlerts, provider.alerts)
// Alerts that should go directly to the notification pipeline.
expPipelineAlerts := map[string][]*types.Alert{
"recv1": {
{
Alert: model.Alert{
Annotations: model.LabelSet{"msg": "AlertTwo annotation"},
Labels: model.LabelSet{"alertname": "AlertTwo"},
},
},
},
"recv2": {
{
Alert: model.Alert{
Annotations: model.LabelSet{"msg": "AlertTwo annotation"},
Labels: model.LabelSet{"alertname": "AlertTwo"},
}, },
}, { }, {
Alert: model.Alert{ Alert: model.Alert{
Annotations: model.LabelSet{"msg": "AlertThree annotation"}, Annotations: model.LabelSet{"msg": "Alert2 annotation"},
Labels: model.LabelSet{"alertname": "AlertThree"}, Labels: model.LabelSet{"alertname": "Alert2"},
EndsAt: endTime,
GeneratorURL: "http://localhost/url2",
}, },
}, }, {
},
"recv3": {
{
Alert: model.Alert{ Alert: model.Alert{
Annotations: model.LabelSet{"msg": "AlertThree annotation"}, Annotations: model.LabelSet{"msg": "Alert3 annotation"},
Labels: model.LabelSet{"alertname": "AlertThree"}, Labels: model.LabelSet{"alertname": "Alert3"},
StartsAt: startTime,
GeneratorURL: "http://localhost/url3",
}, },
}, {
Alert: model.Alert{
Annotations: model.LabelSet{"msg": "Alert4 annotation"},
Labels: model.LabelSet{"alertname": "Alert4"},
GeneratorURL: "http://localhost/url4",
}, },
}, },
} }
require.Equal(t, expPipelineAlerts, stage.alerts) require.Equal(t, expProviderAlerts, alertProvider.alerts)
} }
type mockAlertProvider struct { type mockAlertProvider struct {
@ -114,16 +111,3 @@ func (a *mockAlertProvider) Put(alerts ...*types.Alert) error {
a.alerts = append(a.alerts, alerts...) a.alerts = append(a.alerts, alerts...)
return nil return nil
} }
type mockStage struct {
alerts map[string][]*types.Alert
}
func (s *mockStage) Exec(ctx context.Context, _ log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
recv, ok := notify.ReceiverName(ctx)
if !ok {
return ctx, nil, errors.New("receiver name not found")
}
s.alerts[recv] = append(s.alerts[recv], alerts...)
return ctx, nil, nil
}

View File

@ -123,7 +123,7 @@ func (am *Alertmanager) Init() (err error) {
return errors.Wrap(err, "unable to initialize the silencing component of alerting") return errors.Wrap(err, "unable to initialize the silencing component of alerting")
} }
am.alerts, err = NewAlertProvider(nil, am.marker) am.alerts, err = NewAlertProvider(am.marker)
if err != nil { if err != nil {
return errors.Wrap(err, "unable to initialize the alert provider component of alerting") return errors.Wrap(err, "unable to initialize the alert provider component of alerting")
} }
@ -267,8 +267,6 @@ func (am *Alertmanager) applyConfig(cfg *apimodels.PostableUserConfig) error {
routingStage[name] = notify.MultiStage{silencingStage, stage} routingStage[name] = notify.MultiStage{silencingStage, stage}
} }
am.alerts.SetStage(routingStage)
am.StopAndWait() am.StopAndWait()
am.route = dispatch.NewRoute(cfg.AlertmanagerConfig.Route, nil) am.route = dispatch.NewRoute(cfg.AlertmanagerConfig.Route, nil)
am.dispatcher = dispatch.NewDispatcher(am.alerts, am.route, routingStage, am.marker, timeoutFunc, gokit_log.NewNopLogger(), am.dispatcherMetrics) am.dispatcher = dispatch.NewDispatcher(am.alerts, am.route, routingStage, am.marker, timeoutFunc, gokit_log.NewNopLogger(), am.dispatcherMetrics)
@ -347,8 +345,8 @@ func (am *Alertmanager) buildReceiverIntegrations(receiver *apimodels.PostableAp
} }
// PutAlerts receives the alerts and then sends them through the corresponding route based on whenever the alert has a receiver embedded or not // PutAlerts receives the alerts and then sends them through the corresponding route based on whenever the alert has a receiver embedded or not
func (am *Alertmanager) PutAlerts(alerts ...*PostableAlert) error { func (am *Alertmanager) PutAlerts(alerts apimodels.PostableAlerts) error {
return am.alerts.PutPostableAlert(alerts...) return am.alerts.PutPostableAlert(alerts)
} }
// createReceiverStage creates a pipeline of stages for a receiver. // createReceiverStage creates a pipeline of stages for a receiver.

View File

@ -2,25 +2,25 @@ package schedule
import ( import (
"github.com/go-openapi/strfmt" "github.com/go-openapi/strfmt"
"github.com/grafana/grafana/pkg/services/ngalert/eval" apimodels "github.com/grafana/alerting-api/pkg/api"
"github.com/grafana/grafana/pkg/services/ngalert/notifier"
"github.com/grafana/grafana/pkg/services/ngalert/state"
"github.com/prometheus/alertmanager/api/v2/models" "github.com/prometheus/alertmanager/api/v2/models"
"github.com/grafana/grafana/pkg/services/ngalert/eval"
"github.com/grafana/grafana/pkg/services/ngalert/state"
) )
func FromAlertStateToPostableAlerts(firingStates []state.AlertState) []*notifier.PostableAlert { func FromAlertStateToPostableAlerts(firingStates []state.AlertState) apimodels.PostableAlerts {
alerts := make([]*notifier.PostableAlert, 0, len(firingStates)) alerts := apimodels.PostableAlerts{PostableAlerts: make([]models.PostableAlert, 0, len(firingStates))}
for _, alertState := range firingStates { for _, alertState := range firingStates {
if alertState.State == eval.Alerting { if alertState.State == eval.Alerting {
alerts = append(alerts, &notifier.PostableAlert{ alerts.PostableAlerts = append(alerts.PostableAlerts, models.PostableAlert{
PostableAlert: models.PostableAlert{
Annotations: alertState.Annotations, Annotations: alertState.Annotations,
StartsAt: strfmt.DateTime(alertState.StartsAt), StartsAt: strfmt.DateTime(alertState.StartsAt),
EndsAt: strfmt.DateTime(alertState.EndsAt), EndsAt: strfmt.DateTime(alertState.EndsAt),
Alert: models.Alert{ Alert: models.Alert{
Labels: models.LabelSet(alertState.Labels), Labels: models.LabelSet(alertState.Labels),
}, },
},
}) })
} }
} }

View File

@ -6,18 +6,16 @@ import (
"sync" "sync"
"time" "time"
"golang.org/x/sync/errgroup"
"github.com/benbjohnson/clock" "github.com/benbjohnson/clock"
apimodels "github.com/grafana/alerting-api/pkg/api"
"github.com/grafana/grafana/pkg/services/ngalert/models" "golang.org/x/sync/errgroup"
"github.com/grafana/grafana/pkg/services/ngalert/notifier"
"github.com/grafana/grafana/pkg/services/ngalert/state"
"github.com/grafana/grafana/pkg/services/ngalert/store"
"github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/alerting" "github.com/grafana/grafana/pkg/services/alerting"
"github.com/grafana/grafana/pkg/services/ngalert/eval" "github.com/grafana/grafana/pkg/services/ngalert/eval"
"github.com/grafana/grafana/pkg/services/ngalert/models"
"github.com/grafana/grafana/pkg/services/ngalert/state"
"github.com/grafana/grafana/pkg/services/ngalert/store"
"github.com/grafana/grafana/pkg/tsdb" "github.com/grafana/grafana/pkg/tsdb"
) )
@ -83,10 +81,10 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key models.AlertRul
processedStates := stateTracker.ProcessEvalResults(alertRule, results) processedStates := stateTracker.ProcessEvalResults(alertRule, results)
sch.saveAlertStates(processedStates) sch.saveAlertStates(processedStates)
alerts := FromAlertStateToPostableAlerts(processedStates) alerts := FromAlertStateToPostableAlerts(processedStates)
sch.log.Debug("sending alerts to notifier", "count", len(alerts)) sch.log.Debug("sending alerts to notifier", "count", len(alerts.PostableAlerts))
err = sch.sendAlerts(alerts) err = sch.sendAlerts(alerts)
if err != nil { if err != nil {
sch.log.Error("failed to put alerts in the notifier", "count", len(alerts), "err", err) sch.log.Error("failed to put alerts in the notifier", "count", len(alerts.PostableAlerts), "err", err)
} }
return nil return nil
} }
@ -118,7 +116,7 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key models.AlertRul
// Notifier handles the delivery of alert notifications to the end user // Notifier handles the delivery of alert notifications to the end user
type Notifier interface { type Notifier interface {
PutAlerts(alerts ...*notifier.PostableAlert) error PutAlerts(alerts apimodels.PostableAlerts) error
} }
type schedule struct { type schedule struct {
@ -314,8 +312,8 @@ func (sch *schedule) Ticker(grafanaCtx context.Context, stateTracker *state.Stat
} }
} }
func (sch *schedule) sendAlerts(alerts []*notifier.PostableAlert) error { func (sch *schedule) sendAlerts(alerts apimodels.PostableAlerts) error {
return sch.notifier.PutAlerts(alerts...) return sch.notifier.PutAlerts(alerts)
} }
func (sch *schedule) saveAlertStates(states []state.AlertState) { func (sch *schedule) saveAlertStates(states []state.AlertState) {