From 01add144b86ca42e848c6b15373fd70924bbc84b Mon Sep 17 00:00:00 2001 From: Santiago Date: Wed, 25 Oct 2023 11:52:48 +0200 Subject: [PATCH] Alerting: Send alerts to the remote Alertmanager (#77034) * Alerting: Rename remote.ExternalAlertmanager to remote.Alertmanager * Alerting: Send alerts to the remote Alertmanager * add ticker to readiness check, add tests * use options when creating a new sender.ExternalAlertmanager * unexport defaultMaxQueueCapacity * delete unused defaultConfig field * add debug log line when sending alerts to the remote alertmanager * move and refactor readiness check * update tests to not include defaultConfig --- pkg/services/ngalert/remote/alertmanager.go | 127 ++++++++++++------ .../ngalert/remote/alertmanager_test.go | 80 ++++------- pkg/services/ngalert/sender/router.go | 14 +- pkg/services/ngalert/sender/router_test.go | 6 +- pkg/services/ngalert/sender/sender.go | 46 ++++--- 5 files changed, 153 insertions(+), 120 deletions(-) diff --git a/pkg/services/ngalert/remote/alertmanager.go b/pkg/services/ngalert/remote/alertmanager.go index 52ca4d6730d..073400a333d 100644 --- a/pkg/services/ngalert/remote/alertmanager.go +++ b/pkg/services/ngalert/remote/alertmanager.go @@ -7,14 +7,15 @@ import ( "net/http" "net/url" "strings" + "time" httptransport "github.com/go-openapi/runtime/client" "github.com/go-openapi/strfmt" - alertingNotify "github.com/grafana/alerting/notify" "github.com/grafana/grafana/pkg/infra/log" apimodels "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions" "github.com/grafana/grafana/pkg/services/ngalert/models" "github.com/grafana/grafana/pkg/services/ngalert/notifier" + "github.com/grafana/grafana/pkg/services/ngalert/sender" amclient "github.com/prometheus/alertmanager/api/v2/client" amalert "github.com/prometheus/alertmanager/api/v2/client/alert" amalertgroup "github.com/prometheus/alertmanager/api/v2/client/alertgroup" @@ -22,21 +23,24 @@ import ( amsilence 
"github.com/prometheus/alertmanager/api/v2/client/silence" ) +const readyPath = "/-/ready" + type Alertmanager struct { - log log.Logger - url string - tenantID string - orgID int64 - amClient *amclient.AlertmanagerAPI - httpClient *http.Client - defaultConfig string + log log.Logger + orgID int64 + tenantID string + url string + + amClient *amclient.AlertmanagerAPI + httpClient *http.Client + ready bool + sender *sender.ExternalAlertmanager } type AlertmanagerConfig struct { URL string TenantID string BasicAuthPassword string - DefaultConfig string } func NewAlertmanager(cfg AlertmanagerConfig, orgID int64) (*Alertmanager, error) { @@ -56,28 +60,83 @@ func NewAlertmanager(cfg AlertmanagerConfig, orgID int64) (*Alertmanager, error) if err != nil { return nil, err } - u = u.JoinPath(amclient.DefaultBasePath) + u = u.JoinPath(amclient.DefaultBasePath) transport := httptransport.NewWithClient(u.Host, u.Path, []string{u.Scheme}, &client) - _, err = notifier.Load([]byte(cfg.DefaultConfig)) + // Using our client with custom headers and basic auth credentials. 
+ doFunc := func(ctx context.Context, _ *http.Client, req *http.Request) (*http.Response, error) { + return client.Do(req.WithContext(ctx)) + } + s := sender.NewExternalAlertmanagerSender(sender.WithDoFunc(doFunc)) + s.Run() + + err = s.ApplyConfig(orgID, 0, []sender.ExternalAMcfg{{ + URL: cfg.URL, + }}) if err != nil { return nil, err } return &Alertmanager{ - amClient: amclient.New(transport, nil), - httpClient: &client, - log: log.New("ngalert.notifier.external-alertmanager"), - url: cfg.URL, - tenantID: cfg.TenantID, - orgID: orgID, - defaultConfig: cfg.DefaultConfig, + amClient: amclient.New(transport, nil), + httpClient: &client, + log: log.New("ngalert.remote.alertmanager"), + sender: s, + orgID: orgID, + tenantID: cfg.TenantID, + url: cfg.URL, }, nil } func (am *Alertmanager) ApplyConfig(ctx context.Context, config *models.AlertConfiguration) error { - return nil + if am.ready { + return nil + } + + return am.checkReadiness(ctx) +} + +func (am *Alertmanager) checkReadiness(ctx context.Context) error { + readyURL := strings.TrimSuffix(am.url, "/") + readyPath + req, err := http.NewRequestWithContext(ctx, http.MethodGet, readyURL, nil) + if err != nil { + return fmt.Errorf("error creating readiness request: %w", err) + } + + res, err := am.httpClient.Do(req) + if err != nil { + return fmt.Errorf("error performing readiness check: %w", err) + } + + defer func() { + if err := res.Body.Close(); err != nil { + am.log.Warn("Error closing response body", "err", err) + } + }() + + if res.StatusCode != http.StatusOK { + return fmt.Errorf("%w, status code: %d", notifier.ErrAlertmanagerNotReady, res.StatusCode) + } + + // Wait for active senders. 
+ var attempts int + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + attempts++ + if len(am.sender.Alertmanagers()) > 0 { + am.log.Debug("Alertmanager readiness check successful", "attempts", attempts) + am.ready = true + return nil + } + case <-time.After(10 * time.Second): + return notifier.ErrAlertmanagerNotReady + } + } } func (am *Alertmanager) SaveAndApplyConfig(ctx context.Context, cfg *apimodels.PostableUserConfig) error { @@ -195,29 +254,10 @@ func (am *Alertmanager) GetAlertGroups(ctx context.Context, active, silenced, in return res.Payload, nil } -// TODO: implement PutAlerts in a way that is similar to what Prometheus does. -// This current implementation is only good for testing methods that retrieve alerts from the remote Alertmanager. -// More details in issue https://github.com/grafana/grafana/issues/76692 -func (am *Alertmanager) PutAlerts(ctx context.Context, postableAlerts apimodels.PostableAlerts) error { - defer func() { - if r := recover(); r != nil { - am.log.Error("Panic while putting alerts", "err", r) - } - }() - - alerts := make(alertingNotify.PostableAlerts, 0, len(postableAlerts.PostableAlerts)) - for _, pa := range postableAlerts.PostableAlerts { - alerts = append(alerts, &alertingNotify.PostableAlert{ - Annotations: pa.Annotations, - EndsAt: pa.EndsAt, - StartsAt: pa.StartsAt, - Alert: pa.Alert, - }) - } - - params := amalert.NewPostAlertsParamsWithContext(ctx).WithAlerts(alerts) - _, err := am.amClient.Alert.PostAlerts(params) - return err +func (am *Alertmanager) PutAlerts(ctx context.Context, alerts apimodels.PostableAlerts) error { + am.log.Debug("Sending alerts to a remote alertmanager", "url", am.url, "alerts", len(alerts.PostableAlerts)) + am.sender.SendAlerts(alerts) + return nil } func (am *Alertmanager) GetStatus() apimodels.GettableStatus { @@ -247,10 +287,11 @@ func (am *Alertmanager) TestTemplate(ctx context.Context, c apimodels.TestTempla } func (am 
*Alertmanager) StopAndWait() { + am.sender.Stop() } func (am *Alertmanager) Ready() bool { - return false + return am.ready } func (am *Alertmanager) FileStore() *notifier.FileStore { diff --git a/pkg/services/ngalert/remote/alertmanager_test.go b/pkg/services/ngalert/remote/alertmanager_test.go index 741d7a8ea76..822c2c6f05a 100644 --- a/pkg/services/ngalert/remote/alertmanager_test.go +++ b/pkg/services/ngalert/remote/alertmanager_test.go @@ -14,57 +14,32 @@ import ( "github.com/stretchr/testify/require" ) -const ( - validConfig = `{"template_files":{},"alertmanager_config":{"route":{"receiver":"grafana-default-email","group_by":["grafana_folder","alertname"]},"templates":null,"receivers":[{"name":"grafana-default-email","grafana_managed_receiver_configs":[{"uid":"","name":"some other name","type":"email","disableResolveMessage":false,"settings":{"addresses":"\u003cexample@email.com\u003e"},"secureSettings":null}]}]}}` - - // Valid config for Cloud AM, no `grafana_managed_receievers` field. - upstreamConfig = `{"template_files": {}, "alertmanager_config": "{\"global\": {\"smtp_from\": \"test@test.com\"}, \"route\": {\"receiver\": \"discord\"}, \"receivers\": [{\"name\": \"discord\", \"discord_configs\": [{\"webhook_url\": \"http://localhost:1234\"}]}]}"}` -) +// Valid config for Cloud AM, no `grafana_managed_receievers` field. 
+const upstreamConfig = `{"template_files": {}, "alertmanager_config": "{\"global\": {\"smtp_from\": \"test@test.com\"}, \"route\": {\"receiver\": \"discord\"}, \"receivers\": [{\"name\": \"discord\", \"discord_configs\": [{\"webhook_url\": \"http://localhost:1234\"}]}]}"}` func TestNewAlertmanager(t *testing.T) { tests := []struct { - name string - url string - tenantID string - password string - orgID int64 - defaultConfig string - expErr string + name string + url string + tenantID string + password string + orgID int64 + expErr string }{ { - name: "empty URL", - url: "", - tenantID: "1234", - password: "test", - defaultConfig: validConfig, - orgID: 1, - expErr: "empty URL for tenant 1234", + name: "empty URL", + url: "", + tenantID: "1234", + password: "test", + orgID: 1, + expErr: "empty URL for tenant 1234", }, { - name: "empty default config", - url: "http://localhost:8080", - tenantID: "1234", - defaultConfig: "", - password: "test", - orgID: 1, - expErr: "unable to parse Alertmanager configuration: unexpected end of JSON input", - }, - { - name: "invalid default config", - url: "http://localhost:8080", - tenantID: "1234", - defaultConfig: `{"invalid": true}`, - password: "test", - orgID: 1, - expErr: "unable to parse Alertmanager configuration: no route provided in config", - }, - { - name: "valid parameters", - url: "http://localhost:8080", - tenantID: "1234", - defaultConfig: validConfig, - password: "test", - orgID: 1, + name: "valid parameters", + url: "http://localhost:8080", + tenantID: "1234", + password: "test", + orgID: 1, }, } @@ -74,7 +49,6 @@ func TestNewAlertmanager(t *testing.T) { URL: test.url, TenantID: test.tenantID, BasicAuthPassword: test.password, - DefaultConfig: test.defaultConfig, } am, err := NewAlertmanager(cfg, test.orgID) if test.expErr != "" { @@ -85,7 +59,6 @@ func TestNewAlertmanager(t *testing.T) { require.NoError(tt, err) require.Equal(tt, am.tenantID, test.tenantID) require.Equal(tt, am.url, test.url) - require.Equal(tt, 
am.defaultConfig, test.defaultConfig) require.Equal(tt, am.OrgID(), test.orgID) require.NotNil(tt, am.amClient) require.NotNil(tt, am.httpClient) @@ -109,7 +82,6 @@ func TestIntegrationRemoteAlertmanagerSilences(t *testing.T) { URL: amURL + "/alertmanager", TenantID: tenantID, BasicAuthPassword: password, - DefaultConfig: validConfig, } am, err := NewAlertmanager(cfg, 1) require.NoError(t, err) @@ -189,11 +161,14 @@ func TestIntegrationRemoteAlertmanagerAlerts(t *testing.T) { URL: amURL + "/alertmanager", TenantID: tenantID, BasicAuthPassword: password, - DefaultConfig: validConfig, } am, err := NewAlertmanager(cfg, 1) require.NoError(t, err) + // Wait until the Alertmanager is ready to send alerts. + require.NoError(t, am.checkReadiness(context.Background())) + require.True(t, am.Ready()) + // We should have no alerts and no groups at first. alerts, err := am.GetAlerts(context.Background(), true, true, true, []string{}, "") require.NoError(t, err) @@ -214,9 +189,11 @@ func TestIntegrationRemoteAlertmanagerAlerts(t *testing.T) { require.NoError(t, err) // We should have two alerts and one group now. 
- alerts, err = am.GetAlerts(context.Background(), true, true, true, []string{}, "") - require.NoError(t, err) - require.Equal(t, 2, len(alerts)) + require.Eventually(t, func() bool { + alerts, err = am.GetAlerts(context.Background(), true, true, true, []string{}, "") + require.NoError(t, err) + return len(alerts) == 2 + }, 16*time.Second, 1*time.Second) alertGroups, err = am.GetAlertGroups(context.Background(), true, true, true, []string{}, "") require.NoError(t, err) @@ -245,7 +222,6 @@ func TestIntegrationRemoteAlertmanagerReceivers(t *testing.T) { URL: amURL + "/alertmanager", TenantID: tenantID, BasicAuthPassword: password, - DefaultConfig: validConfig, } am, err := NewAlertmanager(cfg, 1) diff --git a/pkg/services/ngalert/sender/router.go b/pkg/services/ngalert/sender/router.go index cce4af1399b..6cac839f3cb 100644 --- a/pkg/services/ngalert/sender/router.go +++ b/pkg/services/ngalert/sender/router.go @@ -188,10 +188,10 @@ func (d *AlertsRouter) SyncAndApplyConfigFromDatabase() error { return nil } -func buildRedactedAMs(l log.Logger, alertmanagers []externalAMcfg, ordId int64) []string { +func buildRedactedAMs(l log.Logger, alertmanagers []ExternalAMcfg, ordId int64) []string { var redactedAMs []string for _, am := range alertmanagers { - parsedAM, err := url.Parse(am.amURL) + parsedAM, err := url.Parse(am.URL) if err != nil { l.Error("Failed to parse alertmanager string", "org", ordId, "error", err) continue @@ -208,9 +208,9 @@ func asSHA256(strings []string) string { return fmt.Sprintf("%x", h.Sum(nil)) } -func (d *AlertsRouter) alertmanagersFromDatasources(orgID int64) ([]externalAMcfg, error) { +func (d *AlertsRouter) alertmanagersFromDatasources(orgID int64) ([]ExternalAMcfg, error) { var ( - alertmanagers []externalAMcfg + alertmanagers []ExternalAMcfg ) // We might have alertmanager datasources that are acting as external // alertmanager, let's fetch them. 
@@ -246,9 +246,9 @@ func (d *AlertsRouter) alertmanagersFromDatasources(orgID int64) ([]externalAMcf "error", err) continue } - alertmanagers = append(alertmanagers, externalAMcfg{ - amURL: amURL, - headers: headers, + alertmanagers = append(alertmanagers, ExternalAMcfg{ + URL: amURL, + Headers: headers, }) } return alertmanagers, nil diff --git a/pkg/services/ngalert/sender/router_test.go b/pkg/services/ngalert/sender/router_test.go index 5fb627ca09d..e9cfafeed8f 100644 --- a/pkg/services/ngalert/sender/router_test.go +++ b/pkg/services/ngalert/sender/router_test.go @@ -590,10 +590,10 @@ func TestAlertManagers_buildRedactedAMs(t *testing.T) { for _, tt := range tc { t.Run(tt.name, func(t *testing.T) { - var cfgs []externalAMcfg + var cfgs []ExternalAMcfg for _, url := range tt.amUrls { - cfgs = append(cfgs, externalAMcfg{ - amURL: url, + cfgs = append(cfgs, ExternalAMcfg{ + URL: url, }) } require.Equal(t, tt.expected, buildRedactedAMs(&fakeLogger, cfgs, tt.orgId)) diff --git a/pkg/services/ngalert/sender/sender.go b/pkg/services/ngalert/sender/sender.go index 8e09015a463..eef29402c05 100644 --- a/pkg/services/ngalert/sender/sender.go +++ b/pkg/services/ngalert/sender/sender.go @@ -5,6 +5,7 @@ import ( "crypto/md5" "errors" "fmt" + "net/http" "net/url" "sort" "strings" @@ -40,35 +41,46 @@ type ExternalAlertmanager struct { sdManager *discovery.Manager } -type externalAMcfg struct { - amURL string - headers map[string]string +type ExternalAMcfg struct { + URL string + Headers map[string]string } -func (cfg *externalAMcfg) SHA256() string { - return asSHA256([]string{cfg.headerString(), cfg.amURL}) +type Option func(*ExternalAlertmanager) + +type doFunc func(context.Context, *http.Client, *http.Request) (*http.Response, error) + +// WithDoFunc receives a function to use when making HTTP requests from the Manager. 
+func WithDoFunc(doFunc doFunc) Option { + return func(s *ExternalAlertmanager) { + s.manager.opts.Do = doFunc + } +} + +func (cfg *ExternalAMcfg) SHA256() string { + return asSHA256([]string{cfg.headerString(), cfg.URL}) } // headersString transforms all the headers in a sorted way as a // single string so it can be used for hashing and comparing. -func (cfg *externalAMcfg) headerString() string { +func (cfg *ExternalAMcfg) headerString() string { var result strings.Builder - headerKeys := make([]string, 0, len(cfg.headers)) - for key := range cfg.headers { + headerKeys := make([]string, 0, len(cfg.Headers)) + for key := range cfg.Headers { headerKeys = append(headerKeys, key) } sort.Strings(headerKeys) for _, key := range headerKeys { - result.WriteString(fmt.Sprintf("%s:%s", key, cfg.headers[key])) + result.WriteString(fmt.Sprintf("%s:%s", key, cfg.Headers[key])) } return result.String() } -func NewExternalAlertmanagerSender() *ExternalAlertmanager { +func NewExternalAlertmanagerSender(opts ...Option) *ExternalAlertmanager { l := log.New("ngalert.sender.external-alertmanager") sdCtx, sdCancel := context.WithCancel(context.Background()) s := &ExternalAlertmanager{ @@ -85,11 +97,15 @@ func NewExternalAlertmanagerSender() *ExternalAlertmanager { s.sdManager = discovery.NewManager(sdCtx, s.logger) + for _, opt := range opts { + opt(s) + } + return s } // ApplyConfig syncs a configuration with the sender. 
-func (s *ExternalAlertmanager) ApplyConfig(orgId, id int64, alertmanagers []externalAMcfg) error { +func (s *ExternalAlertmanager) ApplyConfig(orgId, id int64, alertmanagers []ExternalAMcfg) error { notifierCfg, headers, err := buildNotifierConfig(alertmanagers) if err != nil { return err @@ -160,11 +176,11 @@ func (s *ExternalAlertmanager) DroppedAlertmanagers() []*url.URL { return s.manager.DroppedAlertmanagers() } -func buildNotifierConfig(alertmanagers []externalAMcfg) (*config.Config, map[string]map[string]string, error) { +func buildNotifierConfig(alertmanagers []ExternalAMcfg) (*config.Config, map[string]map[string]string, error) { amConfigs := make([]*config.AlertmanagerConfig, 0, len(alertmanagers)) headers := map[string]map[string]string{} for i, am := range alertmanagers { - u, err := url.Parse(am.amURL) + u, err := url.Parse(am.URL) if err != nil { return nil, nil, err } @@ -185,10 +201,10 @@ func buildNotifierConfig(alertmanagers []externalAMcfg) (*config.Config, map[str ServiceDiscoveryConfigs: sdConfig, } - if am.headers != nil { + if am.Headers != nil { // The key has the same format as the AlertmanagerConfigs.ToMap() would generate // so we can use it later on when working with the alertmanager config map. - headers[fmt.Sprintf("config-%d", i)] = am.headers + headers[fmt.Sprintf("config-%d", i)] = am.Headers } // Check the URL for basic authentication information first