Alerting: Move ExternalAlertmanager to its own package (#76854)

* Alerting: Move `ExternalAlertmanager` to its own package

We'll avoid import cycles when using components from other packages. In addition to that, I've created an `Options` approach for the multiorg alertmanger to allow us to override how per tenant alertmanagers are created.

* switch things around

* address review comments

* fix references and warnings
This commit is contained in:
gotjosh
2023-10-20 13:08:13 +01:00
committed by GitHub
parent ee484e3bbe
commit 866acbd5ac
4 changed files with 81 additions and 47 deletions

View File

@@ -1,316 +0,0 @@
package notifier
import (
"context"
"fmt"
"io"
"net/http"
"net/url"
"strings"
httptransport "github.com/go-openapi/runtime/client"
"github.com/go-openapi/strfmt"
alertingNotify "github.com/grafana/alerting/notify"
"github.com/grafana/grafana/pkg/infra/log"
apimodels "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
"github.com/grafana/grafana/pkg/services/ngalert/models"
amclient "github.com/prometheus/alertmanager/api/v2/client"
amalert "github.com/prometheus/alertmanager/api/v2/client/alert"
amalertgroup "github.com/prometheus/alertmanager/api/v2/client/alertgroup"
amreceiver "github.com/prometheus/alertmanager/api/v2/client/receiver"
amsilence "github.com/prometheus/alertmanager/api/v2/client/silence"
)
type externalAlertmanager struct {
log log.Logger
url string
tenantID string
orgID int64
amClient *amclient.AlertmanagerAPI
httpClient *http.Client
defaultConfig string
}
type externalAlertmanagerConfig struct {
URL string
TenantID string
BasicAuthPassword string
DefaultConfig string
}
func newExternalAlertmanager(cfg externalAlertmanagerConfig, orgID int64) (*externalAlertmanager, error) {
client := http.Client{
Transport: &roundTripper{
tenantID: cfg.TenantID,
basicAuthPassword: cfg.BasicAuthPassword,
next: http.DefaultTransport,
},
}
if cfg.URL == "" {
return nil, fmt.Errorf("empty URL for tenant %s", cfg.TenantID)
}
u, err := url.Parse(cfg.URL)
if err != nil {
return nil, err
}
u = u.JoinPath(amclient.DefaultBasePath)
transport := httptransport.NewWithClient(u.Host, u.Path, []string{u.Scheme}, &client)
_, err = Load([]byte(cfg.DefaultConfig))
if err != nil {
return nil, err
}
return &externalAlertmanager{
amClient: amclient.New(transport, nil),
httpClient: &client,
log: log.New("ngalert.notifier.external-alertmanager"),
url: cfg.URL,
tenantID: cfg.TenantID,
orgID: orgID,
defaultConfig: cfg.DefaultConfig,
}, nil
}
func (am *externalAlertmanager) ApplyConfig(ctx context.Context, config *models.AlertConfiguration) error {
return nil
}
func (am *externalAlertmanager) SaveAndApplyConfig(ctx context.Context, cfg *apimodels.PostableUserConfig) error {
return nil
}
func (am *externalAlertmanager) SaveAndApplyDefaultConfig(ctx context.Context) error {
return nil
}
func (am *externalAlertmanager) CreateSilence(ctx context.Context, silence *apimodels.PostableSilence) (string, error) {
defer func() {
if r := recover(); r != nil {
am.log.Error("Panic while creating silence", "err", r)
}
}()
params := amsilence.NewPostSilencesParamsWithContext(ctx).WithSilence(silence)
res, err := am.amClient.Silence.PostSilences(params)
if err != nil {
return "", err
}
return res.Payload.SilenceID, nil
}
func (am *externalAlertmanager) DeleteSilence(ctx context.Context, silenceID string) error {
defer func() {
if r := recover(); r != nil {
am.log.Error("Panic while deleting silence", "err", r)
}
}()
params := amsilence.NewDeleteSilenceParamsWithContext(ctx).WithSilenceID(strfmt.UUID(silenceID))
_, err := am.amClient.Silence.DeleteSilence(params)
if err != nil {
return err
}
return nil
}
func (am *externalAlertmanager) GetSilence(ctx context.Context, silenceID string) (apimodels.GettableSilence, error) {
defer func() {
if r := recover(); r != nil {
am.log.Error("Panic while getting silence", "err", r)
}
}()
params := amsilence.NewGetSilenceParamsWithContext(ctx).WithSilenceID(strfmt.UUID(silenceID))
res, err := am.amClient.Silence.GetSilence(params)
if err != nil {
return apimodels.GettableSilence{}, err
}
return *res.Payload, nil
}
func (am *externalAlertmanager) ListSilences(ctx context.Context, filter []string) (apimodels.GettableSilences, error) {
defer func() {
if r := recover(); r != nil {
am.log.Error("Panic while listing silences", "err", r)
}
}()
params := amsilence.NewGetSilencesParamsWithContext(ctx).WithFilter(filter)
res, err := am.amClient.Silence.GetSilences(params)
if err != nil {
return apimodels.GettableSilences{}, err
}
return res.Payload, nil
}
func (am *externalAlertmanager) GetAlerts(ctx context.Context, active, silenced, inhibited bool, filter []string, receiver string) (apimodels.GettableAlerts, error) {
defer func() {
if r := recover(); r != nil {
am.log.Error("Panic while getting alerts", "err", r)
}
}()
params := amalert.NewGetAlertsParamsWithContext(ctx).
WithActive(&active).
WithSilenced(&silenced).
WithInhibited(&inhibited).
WithFilter(filter).
WithReceiver(&receiver)
res, err := am.amClient.Alert.GetAlerts(params)
if err != nil {
return apimodels.GettableAlerts{}, err
}
return res.Payload, nil
}
func (am *externalAlertmanager) GetAlertGroups(ctx context.Context, active, silenced, inhibited bool, filter []string, receiver string) (apimodels.AlertGroups, error) {
defer func() {
if r := recover(); r != nil {
am.log.Error("Panic while getting alert groups", "err", r)
}
}()
params := amalertgroup.NewGetAlertGroupsParamsWithContext(ctx).
WithActive(&active).
WithSilenced(&silenced).
WithInhibited(&inhibited).
WithFilter(filter).
WithReceiver(&receiver)
res, err := am.amClient.Alertgroup.GetAlertGroups(params)
if err != nil {
return apimodels.AlertGroups{}, err
}
return res.Payload, nil
}
// TODO: implement PutAlerts in a way that is similar to what Prometheus does.
// This current implementation is only good for testing methods that retrieve alerts from the remote Alertmanager.
// More details in issue https://github.com/grafana/grafana/issues/76692
func (am *externalAlertmanager) PutAlerts(ctx context.Context, postableAlerts apimodels.PostableAlerts) error {
defer func() {
if r := recover(); r != nil {
am.log.Error("Panic while putting alerts", "err", r)
}
}()
alerts := make(alertingNotify.PostableAlerts, 0, len(postableAlerts.PostableAlerts))
for _, pa := range postableAlerts.PostableAlerts {
alerts = append(alerts, &alertingNotify.PostableAlert{
Annotations: pa.Annotations,
EndsAt: pa.EndsAt,
StartsAt: pa.StartsAt,
Alert: pa.Alert,
})
}
params := amalert.NewPostAlertsParamsWithContext(ctx).WithAlerts(alerts)
_, err := am.amClient.Alert.PostAlerts(params)
return err
}
func (am *externalAlertmanager) GetStatus() apimodels.GettableStatus {
return apimodels.GettableStatus{}
}
func (am *externalAlertmanager) GetReceivers(ctx context.Context) ([]apimodels.Receiver, error) {
params := amreceiver.NewGetReceiversParamsWithContext(ctx)
res, err := am.amClient.Receiver.GetReceivers(params)
if err != nil {
return []apimodels.Receiver{}, err
}
var rcvs []apimodels.Receiver
for _, rcv := range res.Payload {
rcvs = append(rcvs, *rcv)
}
return rcvs, nil
}
func (am *externalAlertmanager) TestReceivers(ctx context.Context, c apimodels.TestReceiversConfigBodyParams) (*TestReceiversResult, error) {
return &TestReceiversResult{}, nil
}
func (am *externalAlertmanager) TestTemplate(ctx context.Context, c apimodels.TestTemplatesConfigBodyParams) (*TestTemplatesResults, error) {
return &TestTemplatesResults{}, nil
}
func (am *externalAlertmanager) StopAndWait() {
}
func (am *externalAlertmanager) Ready() bool {
return false
}
func (am *externalAlertmanager) FileStore() *FileStore {
return &FileStore{}
}
func (am *externalAlertmanager) OrgID() int64 {
return am.orgID
}
func (am *externalAlertmanager) ConfigHash() [16]byte {
return [16]byte{}
}
type roundTripper struct {
tenantID string
basicAuthPassword string
next http.RoundTripper
}
// RoundTrip implements the http.RoundTripper interface
// while adding the `X-Scope-OrgID` header and basic auth credentials.
func (r *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
req.Header.Set("X-Scope-OrgID", r.tenantID)
if r.tenantID != "" && r.basicAuthPassword != "" {
req.SetBasicAuth(r.tenantID, r.basicAuthPassword)
}
return r.next.RoundTrip(req)
}
// TODO: change implementation, this is only useful for testing other methods.
func (am *externalAlertmanager) postConfig(ctx context.Context, rawConfig string) error {
url := strings.TrimSuffix(am.url, "/alertmanager") + "/api/v1/alerts"
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, strings.NewReader(rawConfig))
if err != nil {
return fmt.Errorf("error creating request: %v", err)
}
res, err := am.httpClient.Do(req)
if err != nil {
return err
}
if res.StatusCode == http.StatusNotFound {
return fmt.Errorf("config not found")
}
defer func() {
if err := res.Body.Close(); err != nil {
am.log.Warn("Error while closing body", "err", err)
}
}()
_, err = io.ReadAll(res.Body)
if err != nil {
return fmt.Errorf("error reading request response: %w", err)
}
if res.StatusCode != http.StatusCreated {
return fmt.Errorf("setting config failed with status code %d", res.StatusCode)
}
return nil
}

View File

@@ -1,304 +0,0 @@
package notifier
import (
"context"
"math/rand"
"os"
"testing"
"time"
"github.com/go-openapi/strfmt"
apimodels "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
"github.com/grafana/grafana/pkg/util"
amv2 "github.com/prometheus/alertmanager/api/v2/models"
"github.com/stretchr/testify/require"
)
const (
validConfig = `{"template_files":{},"alertmanager_config":{"route":{"receiver":"grafana-default-email","group_by":["grafana_folder","alertname"]},"templates":null,"receivers":[{"name":"grafana-default-email","grafana_managed_receiver_configs":[{"uid":"","name":"some other name","type":"email","disableResolveMessage":false,"settings":{"addresses":"\u003cexample@email.com\u003e"},"secureSettings":null}]}]}}`
// Valid config for Cloud AM, no `grafana_managed_receievers` field.
upstreamConfig = `{"template_files": {}, "alertmanager_config": "{\"global\": {\"smtp_from\": \"test@test.com\"}, \"route\": {\"receiver\": \"discord\"}, \"receivers\": [{\"name\": \"discord\", \"discord_configs\": [{\"webhook_url\": \"http://localhost:1234\"}]}]}"}`
)
func TestNewExternalAlertmanager(t *testing.T) {
tests := []struct {
name string
url string
tenantID string
password string
orgID int64
defaultConfig string
expErr string
}{
{
name: "empty URL",
url: "",
tenantID: "1234",
password: "test",
defaultConfig: validConfig,
orgID: 1,
expErr: "empty URL for tenant 1234",
},
{
name: "empty default config",
url: "http://localhost:8080",
tenantID: "1234",
defaultConfig: "",
password: "test",
orgID: 1,
expErr: "unable to parse Alertmanager configuration: unexpected end of JSON input",
},
{
name: "invalid default config",
url: "http://localhost:8080",
tenantID: "1234",
defaultConfig: `{"invalid": true}`,
password: "test",
orgID: 1,
expErr: "unable to parse Alertmanager configuration: no route provided in config",
},
{
name: "valid parameters",
url: "http://localhost:8080",
tenantID: "1234",
defaultConfig: validConfig,
password: "test",
orgID: 1,
},
}
for _, test := range tests {
t.Run(test.name, func(tt *testing.T) {
cfg := externalAlertmanagerConfig{
URL: test.url,
TenantID: test.tenantID,
BasicAuthPassword: test.password,
DefaultConfig: test.defaultConfig,
}
am, err := newExternalAlertmanager(cfg, test.orgID)
if test.expErr != "" {
require.EqualError(tt, err, test.expErr)
return
}
require.NoError(tt, err)
require.Equal(tt, am.tenantID, test.tenantID)
require.Equal(tt, am.url, test.url)
require.Equal(tt, am.defaultConfig, test.defaultConfig)
require.Equal(tt, am.OrgID(), test.orgID)
require.NotNil(tt, am.amClient)
require.NotNil(tt, am.httpClient)
})
}
}
func TestIntegrationRemoteAlertmanagerSilences(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}
amURL, ok := os.LookupEnv("AM_URL")
if !ok {
t.Skip("No Alertmanager URL provided")
}
tenantID := os.Getenv("AM_TENANT_ID")
password := os.Getenv("AM_PASSWORD")
cfg := externalAlertmanagerConfig{
URL: amURL + "/alertmanager",
TenantID: tenantID,
BasicAuthPassword: password,
DefaultConfig: validConfig,
}
am, err := newExternalAlertmanager(cfg, 1)
require.NoError(t, err)
// We should have no silences at first.
silences, err := am.ListSilences(context.Background(), []string{})
require.NoError(t, err)
require.Equal(t, 0, len(silences))
// Creating a silence should succeed.
testSilence := genSilence("test")
id, err := am.CreateSilence(context.Background(), &testSilence)
require.NoError(t, err)
require.NotEmpty(t, id)
testSilence.ID = id
// We should be able to retrieve a specific silence.
silence, err := am.GetSilence(context.Background(), testSilence.ID)
require.NoError(t, err)
require.Equal(t, testSilence.ID, *silence.ID)
// Trying to retrieve a non-existing silence should fail.
_, err = am.GetSilence(context.Background(), util.GenerateShortUID())
require.Error(t, err)
// After creating another silence, the total amount should be 2.
testSilence2 := genSilence("test")
id, err = am.CreateSilence(context.Background(), &testSilence2)
require.NoError(t, err)
require.NotEmpty(t, id)
testSilence2.ID = id
silences, err = am.ListSilences(context.Background(), []string{})
require.NoError(t, err)
require.Equal(t, 2, len(silences))
require.True(t, *silences[0].ID == testSilence.ID || *silences[0].ID == testSilence2.ID)
require.True(t, *silences[1].ID == testSilence.ID || *silences[1].ID == testSilence2.ID)
// After deleting one of those silences, the total amount should be 2 but one of those should be expired.
err = am.DeleteSilence(context.Background(), testSilence.ID)
require.NoError(t, err)
silences, err = am.ListSilences(context.Background(), []string{})
require.NoError(t, err)
for _, s := range silences {
if *s.ID == testSilence.ID {
require.Equal(t, *s.Status.State, "expired")
} else {
require.Equal(t, *s.Status.State, "pending")
}
}
// When deleting the other silence, both should be expired.
err = am.DeleteSilence(context.Background(), testSilence2.ID)
require.NoError(t, err)
silences, err = am.ListSilences(context.Background(), []string{})
require.NoError(t, err)
require.Equal(t, *silences[0].Status.State, "expired")
require.Equal(t, *silences[1].Status.State, "expired")
}
func TestIntegrationRemoteAlertmanagerAlerts(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}
amURL, ok := os.LookupEnv("AM_URL")
if !ok {
t.Skip("No Alertmanager URL provided")
}
tenantID := os.Getenv("AM_TENANT_ID")
password := os.Getenv("AM_PASSWORD")
cfg := externalAlertmanagerConfig{
URL: amURL + "/alertmanager",
TenantID: tenantID,
BasicAuthPassword: password,
DefaultConfig: validConfig,
}
am, err := newExternalAlertmanager(cfg, 1)
require.NoError(t, err)
// We should have no alerts and no groups at first.
alerts, err := am.GetAlerts(context.Background(), true, true, true, []string{}, "")
require.NoError(t, err)
require.Equal(t, 0, len(alerts))
alertGroups, err := am.GetAlertGroups(context.Background(), true, true, true, []string{}, "")
require.NoError(t, err)
require.Equal(t, 0, len(alertGroups))
// Let's create two active alerts and one expired one.
alert1 := genAlert(true, map[string]string{"test_1": "test_1"})
alert2 := genAlert(true, map[string]string{"test_2": "test_2"})
alert3 := genAlert(false, map[string]string{"test_3": "test_3"})
postableAlerts := apimodels.PostableAlerts{
PostableAlerts: []amv2.PostableAlert{alert1, alert2, alert3},
}
err = am.PutAlerts(context.Background(), postableAlerts)
require.NoError(t, err)
// We should have two alerts and one group now.
alerts, err = am.GetAlerts(context.Background(), true, true, true, []string{}, "")
require.NoError(t, err)
require.Equal(t, 2, len(alerts))
alertGroups, err = am.GetAlertGroups(context.Background(), true, true, true, []string{}, "")
require.NoError(t, err)
require.Equal(t, 1, len(alertGroups))
// Filtering by `test_1=test_1` should return one alert.
alerts, err = am.GetAlerts(context.Background(), true, true, true, []string{"test_1=test_1"}, "")
require.NoError(t, err)
require.Equal(t, 1, len(alerts))
}
func TestIntegrationRemoteAlertmanagerReceivers(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}
amURL, ok := os.LookupEnv("AM_URL")
if !ok {
t.Skip("No Alertmanager URL provided")
}
tenantID := os.Getenv("AM_TENANT_ID")
password := os.Getenv("AM_PASSWORD")
cfg := externalAlertmanagerConfig{
URL: amURL + "/alertmanager",
TenantID: tenantID,
BasicAuthPassword: password,
DefaultConfig: validConfig,
}
am, err := newExternalAlertmanager(cfg, 1)
require.NoError(t, err)
// We should start with the default config.
rcvs, err := am.GetReceivers(context.Background())
require.NoError(t, err)
require.Equal(t, "empty-receiver", *rcvs[0].Name)
// After changing the configuration, we should have a new `discord` receiver.
require.NoError(t, am.postConfig(context.Background(), upstreamConfig))
require.Eventually(t, func() bool {
rcvs, err = am.GetReceivers(context.Background())
require.NoError(t, err)
return *rcvs[0].Name == "discord"
}, 16*time.Second, 1*time.Second)
}
func genSilence(createdBy string) apimodels.PostableSilence {
starts := strfmt.DateTime(time.Now().Add(time.Duration(rand.Int63n(9)+1) * time.Second))
ends := strfmt.DateTime(time.Now().Add(time.Duration(rand.Int63n(9)+10) * time.Second))
comment := "test comment"
isEqual := true
name := "test"
value := "test"
isRegex := false
matchers := amv2.Matchers{&amv2.Matcher{IsEqual: &isEqual, Name: &name, Value: &value, IsRegex: &isRegex}}
return apimodels.PostableSilence{
Silence: amv2.Silence{
Comment: &comment,
CreatedBy: &createdBy,
Matchers: matchers,
StartsAt: &starts,
EndsAt: &ends,
},
}
}
func genAlert(active bool, labels map[string]string) amv2.PostableAlert {
endsAt := time.Now()
if active {
endsAt = time.Now().Add(1 * time.Minute)
}
return amv2.PostableAlert{
Annotations: amv2.LabelSet(map[string]string{"test_annotation": "test_annotation_value"}),
StartsAt: strfmt.DateTime(time.Now()),
EndsAt: strfmt.DateTime(endsAt),
Alert: amv2.Alert{
GeneratorURL: strfmt.URI("http://localhost:8080"),
Labels: amv2.LabelSet(labels),
},
}
}

View File

@@ -78,6 +78,7 @@ type MultiOrgAlertmanager struct {
configStore AlertingStore
orgStore store.OrgStore
kvStore kvstore.KVStore
factory orgAlertmanagerFactory
decryptFn alertingNotify.GetDecryptedValueFn
@@ -85,9 +86,19 @@ type MultiOrgAlertmanager struct {
ns notifications.Service
}
type orgAlertmanagerFactory func(ctx context.Context, orgID int64) (Alertmanager, error)
type Option func(*MultiOrgAlertmanager)
func WithAlertmanagerOverride(f orgAlertmanagerFactory) Option {
return func(moa *MultiOrgAlertmanager) {
moa.factory = f
}
}
func NewMultiOrgAlertmanager(cfg *setting.Cfg, configStore AlertingStore, orgStore store.OrgStore,
kvStore kvstore.KVStore, provStore provisioningStore, decryptFn alertingNotify.GetDecryptedValueFn,
m *metrics.MultiOrgAlertmanager, ns notifications.Service, l log.Logger, s secrets.Service,
m *metrics.MultiOrgAlertmanager, ns notifications.Service, l log.Logger, s secrets.Service, opts ...Option,
) (*MultiOrgAlertmanager, error) {
moa := &MultiOrgAlertmanager{
Crypto: NewCrypto(s, configStore, l),
@@ -104,9 +115,21 @@ func NewMultiOrgAlertmanager(cfg *setting.Cfg, configStore AlertingStore, orgSto
ns: ns,
peer: &NilPeer{},
}
if err := moa.setupClustering(cfg); err != nil {
return nil, err
}
// Set up the default per tenant Alertmanager factory.
moa.factory = func(ctx context.Context, orgID int64) (Alertmanager, error) {
m := metrics.NewAlertmanagerMetrics(moa.metrics.GetOrCreateOrgRegistry(orgID))
return newAlertmanager(ctx, orgID, moa.settings, moa.configStore, moa.kvStore, moa.peer, moa.decryptFn, moa.ns, m)
}
for _, opt := range opts {
opt(moa)
}
return moa, nil
}
@@ -244,8 +267,7 @@ func (moa *MultiOrgAlertmanager) SyncAlertmanagersForOrgs(ctx context.Context, o
// These metrics are not exported by Grafana and are mostly a placeholder.
// To export them, we need to translate the metrics from each individual registry and,
// then aggregate them on the main registry.
m := metrics.NewAlertmanagerMetrics(moa.metrics.GetOrCreateOrgRegistry(orgID))
am, err := newAlertmanager(ctx, orgID, moa.settings, moa.configStore, moa.kvStore, moa.peer, moa.decryptFn, moa.ns, m)
am, err := moa.factory(ctx, orgID)
if err != nil {
moa.logger.Error("Unable to create Alertmanager for org", "org", orgID, "error", err)
}