Alerting: Send alerts to the remote Alertmanager (#77034)

* Alerting: Rename remote.ExternalAlertmanager to remote.Alertmanager

* Alerting: Send alerts to the remote Alertmanager

* add ticker to readiness check, add tests

* use options when creating a new sender.ExternalAlertmanager

* unexport defaultMaxQueueCapacity

* delete unused defaultConfig field

* add debug log line when sending alerts to the remote alertmanager

* move and refactor readiness check

* update tests to not include defaultConfig
This commit is contained in:
Santiago
2023-10-25 11:52:48 +02:00
committed by GitHub
parent 37dbf037de
commit 01add144b8
5 changed files with 153 additions and 120 deletions

View File

@@ -7,14 +7,15 @@ import (
"net/http"
"net/url"
"strings"
"time"
httptransport "github.com/go-openapi/runtime/client"
"github.com/go-openapi/strfmt"
alertingNotify "github.com/grafana/alerting/notify"
"github.com/grafana/grafana/pkg/infra/log"
apimodels "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
"github.com/grafana/grafana/pkg/services/ngalert/models"
"github.com/grafana/grafana/pkg/services/ngalert/notifier"
"github.com/grafana/grafana/pkg/services/ngalert/sender"
amclient "github.com/prometheus/alertmanager/api/v2/client"
amalert "github.com/prometheus/alertmanager/api/v2/client/alert"
amalertgroup "github.com/prometheus/alertmanager/api/v2/client/alertgroup"
@@ -22,21 +23,24 @@ import (
amsilence "github.com/prometheus/alertmanager/api/v2/client/silence"
)
// readyPath is the path appended to the Alertmanager URL (after trimming a
// trailing slash) to build the readiness-check request URL.
const readyPath = "/-/ready"
// Alertmanager holds the state used to talk to a remote Alertmanager: a logger,
// the org and tenant identifiers, the target URL, the generated API client, the
// raw HTTP client, a readiness flag and the sender used to deliver alerts.
//
// NOTE(review): the field list below contains repeated names (log, url,
// tenantID, orgID, amClient, httpClient). This looks like a diff rendered
// without +/- markers, where the first group is the removed layout and the
// second group the added one — confirm against the real file.
type Alertmanager struct {
log log.Logger
url string
tenantID string
orgID int64
amClient *amclient.AlertmanagerAPI
httpClient *http.Client
defaultConfig string
log log.Logger
orgID int64
tenantID string
url string
amClient *amclient.AlertmanagerAPI
httpClient *http.Client
ready bool
sender *sender.ExternalAlertmanager
}
// AlertmanagerConfig carries the settings passed to NewAlertmanager.
//
// NOTE(review): DefaultConfig appears to be a field removed by this commit
// (the tests stop setting it) — confirm against the real file.
type AlertmanagerConfig struct {
URL string
TenantID string
BasicAuthPassword string
DefaultConfig string
}
func NewAlertmanager(cfg AlertmanagerConfig, orgID int64) (*Alertmanager, error) {
@@ -56,28 +60,83 @@ func NewAlertmanager(cfg AlertmanagerConfig, orgID int64) (*Alertmanager, error)
if err != nil {
return nil, err
}
u = u.JoinPath(amclient.DefaultBasePath)
u = u.JoinPath(amclient.DefaultBasePath)
transport := httptransport.NewWithClient(u.Host, u.Path, []string{u.Scheme}, &client)
_, err = notifier.Load([]byte(cfg.DefaultConfig))
// Using our client with custom headers and basic auth credentials.
doFunc := func(ctx context.Context, _ *http.Client, req *http.Request) (*http.Response, error) {
return client.Do(req.WithContext(ctx))
}
s := sender.NewExternalAlertmanagerSender(sender.WithDoFunc(doFunc))
s.Run()
err = s.ApplyConfig(orgID, 0, []sender.ExternalAMcfg{{
URL: cfg.URL,
}})
if err != nil {
return nil, err
}
return &Alertmanager{
amClient: amclient.New(transport, nil),
httpClient: &client,
log: log.New("ngalert.notifier.external-alertmanager"),
url: cfg.URL,
tenantID: cfg.TenantID,
orgID: orgID,
defaultConfig: cfg.DefaultConfig,
amClient: amclient.New(transport, nil),
httpClient: &client,
log: log.New("ngalert.remote.alertmanager"),
sender: s,
orgID: orgID,
tenantID: cfg.TenantID,
url: cfg.URL,
}, nil
}
// ApplyConfig does not read config in the lines shown here. After the leading
// return, the body is a no-op when am.ready is already set and otherwise
// delegates to checkReadiness.
//
// NOTE(review): the leading `return nil` makes the rest of the body
// unreachable as written. It looks like the removed line of a diff rendered
// without +/- markers — confirm against the real file.
func (am *Alertmanager) ApplyConfig(ctx context.Context, config *models.AlertConfiguration) error {
return nil
if am.ready {
return nil
}
return am.checkReadiness(ctx)
}
// checkReadiness verifies that the remote Alertmanager can be used.
//
// It sends a GET request to am.url (trailing slash trimmed) + readyPath and
// requires a 200 OK response. It then polls every 100ms until
// am.sender.Alertmanagers() is non-empty, and sets am.ready on success.
//
// It returns an error wrapping notifier.ErrAlertmanagerNotReady when the
// endpoint answers with a non-200 status, notifier.ErrAlertmanagerNotReady when
// no sender shows up within 10 seconds, and a wrapped context error when ctx is
// done while waiting.
func (am *Alertmanager) checkReadiness(ctx context.Context) error {
	readyURL := strings.TrimSuffix(am.url, "/") + readyPath
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, readyURL, nil)
	if err != nil {
		return fmt.Errorf("error creating readiness request: %w", err)
	}

	res, err := am.httpClient.Do(req)
	if err != nil {
		return fmt.Errorf("error performing readiness check: %w", err)
	}
	defer func() {
		if err := res.Body.Close(); err != nil {
			am.log.Warn("Error closing response body", "err", err)
		}
	}()

	if res.StatusCode != http.StatusOK {
		return fmt.Errorf("%w, status code: %d", notifier.ErrAlertmanagerNotReady, res.StatusCode)
	}

	// Wait for active senders.
	//
	// The deadline timer is created once, outside the loop. Calling time.After
	// inside the select would start a fresh 10s timer on every iteration, and
	// because the ticker fires every 100ms that timer could never expire.
	deadline := time.NewTimer(10 * time.Second)
	defer deadline.Stop()

	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	var attempts int
	for {
		select {
		case <-ticker.C:
			attempts++
			if len(am.sender.Alertmanagers()) > 0 {
				am.log.Debug("Alertmanager readiness check successful", "attempts", attempts)
				am.ready = true
				return nil
			}
		case <-deadline.C:
			return notifier.ErrAlertmanagerNotReady
		case <-ctx.Done():
			// Stop polling as soon as the caller gives up.
			return fmt.Errorf("error waiting for active senders: %w", ctx.Err())
		}
	}
}
func (am *Alertmanager) SaveAndApplyConfig(ctx context.Context, cfg *apimodels.PostableUserConfig) error {
@@ -195,29 +254,10 @@ func (am *Alertmanager) GetAlertGroups(ctx context.Context, active, silenced, in
return res.Payload, nil
}
// TODO: implement PutAlerts in a way that is similar to what Prometheus does.
// This current implementation is only good for testing methods that retrieve alerts from the remote Alertmanager.
// More details in issue https://github.com/grafana/grafana/issues/76692
func (am *Alertmanager) PutAlerts(ctx context.Context, postableAlerts apimodels.PostableAlerts) error {
defer func() {
if r := recover(); r != nil {
am.log.Error("Panic while putting alerts", "err", r)
}
}()
alerts := make(alertingNotify.PostableAlerts, 0, len(postableAlerts.PostableAlerts))
for _, pa := range postableAlerts.PostableAlerts {
alerts = append(alerts, &alertingNotify.PostableAlert{
Annotations: pa.Annotations,
EndsAt: pa.EndsAt,
StartsAt: pa.StartsAt,
Alert: pa.Alert,
})
}
params := amalert.NewPostAlertsParamsWithContext(ctx).WithAlerts(alerts)
_, err := am.amClient.Alert.PostAlerts(params)
return err
// PutAlerts logs the target URL and the number of alerts at debug level, hands
// the alerts to am.sender.SendAlerts and always returns nil. ctx is not used.
func (am *Alertmanager) PutAlerts(ctx context.Context, alerts apimodels.PostableAlerts) error {
	count := len(alerts.PostableAlerts)
	am.log.Debug(
		"Sending alerts to a remote alertmanager",
		"url", am.url,
		"alerts", count,
	)

	am.sender.SendAlerts(alerts)

	return nil
}
func (am *Alertmanager) GetStatus() apimodels.GettableStatus {
@@ -247,10 +287,11 @@ func (am *Alertmanager) TestTemplate(ctx context.Context, c apimodels.TestTempla
}
// StopAndWait stops the alert sender by calling am.sender.Stop().
func (am *Alertmanager) StopAndWait() {
am.sender.Stop()
}
// Ready reports whether the Alertmanager is ready.
//
// NOTE(review): as written, the leading `return false` makes `return am.ready`
// unreachable. It looks like the removed line of a diff rendered without +/-
// markers, with `return am.ready` as the added one — confirm against the real
// file.
func (am *Alertmanager) Ready() bool {
return false
return am.ready
}
func (am *Alertmanager) FileStore() *notifier.FileStore {

View File

@@ -14,57 +14,32 @@ import (
"github.com/stretchr/testify/require"
)
const (
validConfig = `{"template_files":{},"alertmanager_config":{"route":{"receiver":"grafana-default-email","group_by":["grafana_folder","alertname"]},"templates":null,"receivers":[{"name":"grafana-default-email","grafana_managed_receiver_configs":[{"uid":"","name":"some other name","type":"email","disableResolveMessage":false,"settings":{"addresses":"\u003cexample@email.com\u003e"},"secureSettings":null}]}]}}`
// Valid config for Cloud AM, no `grafana_managed_receiver_configs` field.
upstreamConfig = `{"template_files": {}, "alertmanager_config": "{\"global\": {\"smtp_from\": \"test@test.com\"}, \"route\": {\"receiver\": \"discord\"}, \"receivers\": [{\"name\": \"discord\", \"discord_configs\": [{\"webhook_url\": \"http://localhost:1234\"}]}]}"}`
)
// Valid config for Cloud AM, no `grafana_managed_receiver_configs` field.
const upstreamConfig = `{"template_files": {}, "alertmanager_config": "{\"global\": {\"smtp_from\": \"test@test.com\"}, \"route\": {\"receiver\": \"discord\"}, \"receivers\": [{\"name\": \"discord\", \"discord_configs\": [{\"webhook_url\": \"http://localhost:1234\"}]}]}"}`
func TestNewAlertmanager(t *testing.T) {
tests := []struct {
name string
url string
tenantID string
password string
orgID int64
defaultConfig string
expErr string
name string
url string
tenantID string
password string
orgID int64
expErr string
}{
{
name: "empty URL",
url: "",
tenantID: "1234",
password: "test",
defaultConfig: validConfig,
orgID: 1,
expErr: "empty URL for tenant 1234",
name: "empty URL",
url: "",
tenantID: "1234",
password: "test",
orgID: 1,
expErr: "empty URL for tenant 1234",
},
{
name: "empty default config",
url: "http://localhost:8080",
tenantID: "1234",
defaultConfig: "",
password: "test",
orgID: 1,
expErr: "unable to parse Alertmanager configuration: unexpected end of JSON input",
},
{
name: "invalid default config",
url: "http://localhost:8080",
tenantID: "1234",
defaultConfig: `{"invalid": true}`,
password: "test",
orgID: 1,
expErr: "unable to parse Alertmanager configuration: no route provided in config",
},
{
name: "valid parameters",
url: "http://localhost:8080",
tenantID: "1234",
defaultConfig: validConfig,
password: "test",
orgID: 1,
name: "valid parameters",
url: "http://localhost:8080",
tenantID: "1234",
password: "test",
orgID: 1,
},
}
@@ -74,7 +49,6 @@ func TestNewAlertmanager(t *testing.T) {
URL: test.url,
TenantID: test.tenantID,
BasicAuthPassword: test.password,
DefaultConfig: test.defaultConfig,
}
am, err := NewAlertmanager(cfg, test.orgID)
if test.expErr != "" {
@@ -85,7 +59,6 @@ func TestNewAlertmanager(t *testing.T) {
require.NoError(tt, err)
require.Equal(tt, am.tenantID, test.tenantID)
require.Equal(tt, am.url, test.url)
require.Equal(tt, am.defaultConfig, test.defaultConfig)
require.Equal(tt, am.OrgID(), test.orgID)
require.NotNil(tt, am.amClient)
require.NotNil(tt, am.httpClient)
@@ -109,7 +82,6 @@ func TestIntegrationRemoteAlertmanagerSilences(t *testing.T) {
URL: amURL + "/alertmanager",
TenantID: tenantID,
BasicAuthPassword: password,
DefaultConfig: validConfig,
}
am, err := NewAlertmanager(cfg, 1)
require.NoError(t, err)
@@ -189,11 +161,14 @@ func TestIntegrationRemoteAlertmanagerAlerts(t *testing.T) {
URL: amURL + "/alertmanager",
TenantID: tenantID,
BasicAuthPassword: password,
DefaultConfig: validConfig,
}
am, err := NewAlertmanager(cfg, 1)
require.NoError(t, err)
// Wait until the Alertmanager is ready to send alerts.
require.NoError(t, am.checkReadiness(context.Background()))
require.True(t, am.Ready())
// We should have no alerts and no groups at first.
alerts, err := am.GetAlerts(context.Background(), true, true, true, []string{}, "")
require.NoError(t, err)
@@ -214,9 +189,11 @@ func TestIntegrationRemoteAlertmanagerAlerts(t *testing.T) {
require.NoError(t, err)
// We should have two alerts and one group now.
alerts, err = am.GetAlerts(context.Background(), true, true, true, []string{}, "")
require.NoError(t, err)
require.Equal(t, 2, len(alerts))
require.Eventually(t, func() bool {
alerts, err = am.GetAlerts(context.Background(), true, true, true, []string{}, "")
require.NoError(t, err)
return len(alerts) == 2
}, 16*time.Second, 1*time.Second)
alertGroups, err = am.GetAlertGroups(context.Background(), true, true, true, []string{}, "")
require.NoError(t, err)
@@ -245,7 +222,6 @@ func TestIntegrationRemoteAlertmanagerReceivers(t *testing.T) {
URL: amURL + "/alertmanager",
TenantID: tenantID,
BasicAuthPassword: password,
DefaultConfig: validConfig,
}
am, err := NewAlertmanager(cfg, 1)