diff --git a/pkg/services/ngalert/notifier/alertmanager.go b/pkg/services/ngalert/notifier/alertmanager.go index 489a10853e5..69e1f7d5f98 100644 --- a/pkg/services/ngalert/notifier/alertmanager.go +++ b/pkg/services/ngalert/notifier/alertmanager.go @@ -27,8 +27,8 @@ import ( ) const ( - notificationLogFilename = "notifications" - silencesFilename = "silences" + NotificationLogFilename = "notifications" + SilencesFilename = "silences" workingDir = "alerting" // maintenanceNotificationAndSilences how often should we flush and garbage collect notifications @@ -89,22 +89,22 @@ func newAlertmanager(ctx context.Context, orgID int64, cfg *setting.Cfg, store A workingPath := filepath.Join(cfg.DataPath, workingDir, strconv.Itoa(int(orgID))) fileStore := NewFileStore(orgID, kvStore, workingPath) - nflogFilepath, err := fileStore.FilepathFor(ctx, notificationLogFilename) + nflogFilepath, err := fileStore.FilepathFor(ctx, NotificationLogFilename) if err != nil { return nil, err } - silencesFilePath, err := fileStore.FilepathFor(ctx, silencesFilename) + silencesFilepath, err := fileStore.FilepathFor(ctx, SilencesFilename) if err != nil { return nil, err } silencesOptions := maintenanceOptions{ - filepath: silencesFilePath, + filepath: silencesFilepath, retention: retentionNotificationsAndSilences, maintenanceFrequency: silenceMaintenanceInterval, maintenanceFunc: func(state alertingNotify.State) (int64, error) { // Detached context here is to make sure that when the service is shut down the persist operation is executed. 
- return fileStore.Persist(context.Background(), silencesFilename, state) + return fileStore.Persist(context.Background(), SilencesFilename, state) }, } @@ -114,7 +114,7 @@ func newAlertmanager(ctx context.Context, orgID int64, cfg *setting.Cfg, store A maintenanceFrequency: notificationLogMaintenanceInterval, maintenanceFunc: func(state alertingNotify.State) (int64, error) { // Detached context here is to make sure that when the service is shut down the persist operation is executed. - return fileStore.Persist(context.Background(), notificationLogFilename, state) + return fileStore.Persist(context.Background(), NotificationLogFilename, state) }, } diff --git a/pkg/services/ngalert/notifier/multiorg_alertmanager.go b/pkg/services/ngalert/notifier/multiorg_alertmanager.go index 886b038896d..d19810b1e64 100644 --- a/pkg/services/ngalert/notifier/multiorg_alertmanager.go +++ b/pkg/services/ngalert/notifier/multiorg_alertmanager.go @@ -356,7 +356,7 @@ func (moa *MultiOrgAlertmanager) cleanupOrphanLocalOrgState(ctx context.Context, // Remove all orphaned items from kvstore by listing all existing items // in our used namespace and comparing them to the currently active // organizations. 
- storedFiles := []string{notificationLogFilename, silencesFilename} + storedFiles := []string{NotificationLogFilename, SilencesFilename} for _, fileName := range storedFiles { keys, err := moa.kvStore.Keys(ctx, kvstore.AllOrganizations, KVNamespace, fileName) if err != nil { diff --git a/pkg/services/ngalert/notifier/multiorg_alertmanager_test.go b/pkg/services/ngalert/notifier/multiorg_alertmanager_test.go index 440180ffc5d..68d9310f4ae 100644 --- a/pkg/services/ngalert/notifier/multiorg_alertmanager_test.go +++ b/pkg/services/ngalert/notifier/multiorg_alertmanager_test.go @@ -114,27 +114,27 @@ grafana_alerting_discovered_configurations 4 err := os.Mkdir(orphanDir, 0750) require.NoError(t, err) - silencesPath := filepath.Join(orphanDir, silencesFilename) + silencesPath := filepath.Join(orphanDir, SilencesFilename) err = os.WriteFile(silencesPath, []byte("file_1"), 0644) require.NoError(t, err) - notificationPath := filepath.Join(orphanDir, notificationLogFilename) + notificationPath := filepath.Join(orphanDir, NotificationLogFilename) err = os.WriteFile(notificationPath, []byte("file_2"), 0644) require.NoError(t, err) // We make sure that both files are on disk. info, err := os.Stat(silencesPath) require.NoError(t, err) - require.Equal(t, info.Name(), silencesFilename) + require.Equal(t, info.Name(), SilencesFilename) info, err = os.Stat(notificationPath) require.NoError(t, err) - require.Equal(t, info.Name(), notificationLogFilename) + require.Equal(t, info.Name(), NotificationLogFilename) // We also populate the kvstore with orphaned records. - err = kvStore.Set(ctx, orgID, KVNamespace, silencesFilename, "file_1") + err = kvStore.Set(ctx, orgID, KVNamespace, SilencesFilename, "file_1") require.NoError(t, err) - err = kvStore.Set(ctx, orgID, KVNamespace, notificationLogFilename, "file_1") + err = kvStore.Set(ctx, orgID, KVNamespace, NotificationLogFilename, "file_1") require.NoError(t, err) // Now re run the sync job once. 
@@ -145,10 +145,10 @@ grafana_alerting_discovered_configurations 4 require.True(t, errors.Is(err, fs.ErrNotExist)) // The organization kvstore records should be gone by now. - _, exists, _ := kvStore.Get(ctx, orgID, KVNamespace, silencesFilename) + _, exists, _ := kvStore.Get(ctx, orgID, KVNamespace, SilencesFilename) require.False(t, exists) - _, exists, _ = kvStore.Get(ctx, orgID, KVNamespace, notificationLogFilename) + _, exists, _ = kvStore.Get(ctx, orgID, KVNamespace, NotificationLogFilename) require.False(t, exists) } } diff --git a/pkg/services/ngalert/remote/alertmanager.go b/pkg/services/ngalert/remote/alertmanager.go index e7f4b70559f..c3caead7b9f 100644 --- a/pkg/services/ngalert/remote/alertmanager.go +++ b/pkg/services/ngalert/remote/alertmanager.go @@ -33,15 +33,15 @@ type stateStore interface { type Alertmanager struct { log log.Logger orgID int64 + ready bool + sender *sender.ExternalAlertmanager + state stateStore tenantID string url string amClient *amclient.AlertmanagerAPI - mimirClient mimirClient.MimirClient httpClient *http.Client - ready bool - sender *sender.ExternalAlertmanager - stateStore stateStore + mimirClient mimirClient.MimirClient } type AlertmanagerConfig struct { @@ -50,7 +50,7 @@ type AlertmanagerConfig struct { BasicAuthPassword string } -func NewAlertmanager(cfg AlertmanagerConfig, orgID int64, store stateStore) (*Alertmanager, error) { +func NewAlertmanager(cfg AlertmanagerConfig, orgID int64, stateStore stateStore) (*Alertmanager, error) { client := http.Client{ Transport: &mimirClient.MimirAuthRoundTripper{ TenantID: cfg.TenantID, @@ -104,8 +104,8 @@ func NewAlertmanager(cfg AlertmanagerConfig, orgID int64, store stateStore) (*Al mimirClient: mc, amClient: amclient.New(transport, nil), httpClient: &client, + state: stateStore, sender: s, - stateStore: store, orgID: orgID, tenantID: cfg.TenantID, url: cfg.URL, @@ -126,30 +126,33 @@ func (am *Alertmanager) ApplyConfig(ctx context.Context, config *models.AlertCon // First, 
execute a readiness check to make sure the remote Alertmanager is ready. am.log.Debug("Start readiness check for remote Alertmanager", "url", am.url) if err := am.checkReadiness(ctx); err != nil { - am.log.Error("unable to pass the readiness check", "err", err) + am.log.Error("Unable to pass the readiness check", "err", err) return err } am.log.Debug("Completed readiness check for remote Alertmanager", "url", am.url) + // Send configuration if necessary. am.log.Debug("Start configuration upload to remote Alertmanager", "url", am.url) - if ok := am.compareRemoteConfig(ctx, config); !ok { + if am.shouldSendConfig(ctx, config) { err := am.mimirClient.CreateGrafanaAlertmanagerConfig(ctx, config.AlertmanagerConfiguration, config.ConfigurationHash, config.ID, config.CreatedAt, config.Default) if err != nil { am.log.Error("Unable to upload the configuration to the remote Alertmanager", "err", err) - } else { - am.log.Debug("Completed configuration upload to remote Alertmanager", "url", am.url) } } + am.log.Debug("Completed configuration upload to remote Alertmanager", "url", am.url) + // Send base64-encoded state if necessary. 
am.log.Debug("Start state upload to remote Alertmanager", "url", am.url) - if ok := am.compareRemoteState(ctx, ""); !ok { - if err := am.mimirClient.CreateGrafanaAlertmanagerState(ctx, ""); err != nil { + state, err := am.state.GetFullState(ctx, notifier.SilencesFilename, notifier.NotificationLogFilename) + if err != nil { + am.log.Error("error getting the Alertmanager's full state", "err", err) + } else if am.shouldSendState(ctx, state) { + if err := am.mimirClient.CreateGrafanaAlertmanagerState(ctx, state); err != nil { am.log.Error("Unable to upload the state to the remote Alertmanager", "err", err) } } - am.log.Debug("Completed state upload to remote Alertmanager", "url", am.url) - // upload the state + am.log.Debug("Completed state upload to remote Alertmanager", "url", am.url) return nil } @@ -357,26 +360,28 @@ func (am *Alertmanager) Ready() bool { // CleanUp does not have an equivalent in a "remote Alertmanager" context, we don't have files on disk, no-op. func (am *Alertmanager) CleanUp() {} -// compareRemoteConfig gets the remote Alertmanager config and compares it to the existing configuration. -func (am *Alertmanager) compareRemoteConfig(ctx context.Context, config *models.AlertConfiguration) bool { +// shouldSendConfig compares the remote Alertmanager configuration with our local one. +// It returns true if the configurations are different. +func (am *Alertmanager) shouldSendConfig(ctx context.Context, config *models.AlertConfiguration) bool { rc, err := am.mimirClient.GetGrafanaAlertmanagerConfig(ctx) if err != nil { - // If we get an error trying to compare log it and return false so that we try to upload it anyway. + // Log the error and return true so we try to upload our config anyway. 
am.log.Error("Unable to get the remote Alertmanager Configuration for comparison", "err", err) - return false + return true } - return md5.Sum([]byte(rc.GrafanaAlertmanagerConfig)) == md5.Sum([]byte(config.AlertmanagerConfiguration)) + return md5.Sum([]byte(rc.GrafanaAlertmanagerConfig)) != md5.Sum([]byte(config.AlertmanagerConfiguration)) } -// compareRemoteState gets the remote Alertmanager state and compares it to the existing state. -func (am *Alertmanager) compareRemoteState(ctx context.Context, state string) bool { +// shouldSendState compares the remote Alertmanager state with our local one. +// It returns true if the states are different. +func (am *Alertmanager) shouldSendState(ctx context.Context, state string) bool { rs, err := am.mimirClient.GetGrafanaAlertmanagerState(ctx) if err != nil { - // If we get an error trying to compare log it and return false so that we try to upload it anyway. + // Log the error and return true so we try to upload our state anyway. am.log.Error("Unable to get the remote Alertmanager state for comparison", "err", err) - return false + return true } - return rs.State == state + return rs.State != state } diff --git a/pkg/services/ngalert/remote/alertmanager_test.go b/pkg/services/ngalert/remote/alertmanager_test.go index f80346583a7..17d6b1ede39 100644 --- a/pkg/services/ngalert/remote/alertmanager_test.go +++ b/pkg/services/ngalert/remote/alertmanager_test.go @@ -3,6 +3,7 @@ package remote import ( "context" "crypto/md5" + "encoding/base64" "fmt" "math/rand" "net/http" @@ -14,8 +15,11 @@ import ( "github.com/go-openapi/strfmt" apimodels "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions" ngmodels "github.com/grafana/grafana/pkg/services/ngalert/models" + "github.com/grafana/grafana/pkg/services/ngalert/notifier" + "github.com/grafana/grafana/pkg/services/ngalert/tests/fakes" "github.com/grafana/grafana/pkg/util" amv2 "github.com/prometheus/alertmanager/api/v2/models" + 
"github.com/prometheus/alertmanager/cluster/clusterpb" "github.com/stretchr/testify/require" ) @@ -93,11 +97,17 @@ func TestApplyConfig(t *testing.T) { cfg := AlertmanagerConfig{ URL: server.URL, } - am, err := NewAlertmanager(cfg, 1, nil) + + ctx := context.Background() + store := fakes.NewFakeKVStore(t) + fstore := notifier.NewFileStore(1, store, "") + require.NoError(t, store.Set(ctx, 1, "alertmanager", notifier.SilencesFilename, "test")) + require.NoError(t, store.Set(ctx, 1, "alertmanager", notifier.NotificationLogFilename, "test")) + + am, err := NewAlertmanager(cfg, 1, fstore) require.NoError(t, err) config := &ngmodels.AlertConfiguration{} - ctx := context.Background() require.Error(t, am.ApplyConfig(ctx, config)) require.False(t, am.Ready()) @@ -143,13 +153,35 @@ func TestIntegrationRemoteAlertmanagerApplyConfigOnlyUploadsOnce(t *testing.T) { OrgID: 1, } + silences := []byte("test-silences") + nflog := []byte("test-notifications") + store := fakes.NewFakeKVStore(t) + fstore := notifier.NewFileStore(1, store, "") + ctx := context.Background() - am, err := NewAlertmanager(cfg, 1, nil) + require.NoError(t, store.Set(ctx, 1, "alertmanager", notifier.SilencesFilename, base64.StdEncoding.EncodeToString(silences))) + require.NoError(t, store.Set(ctx, 1, "alertmanager", notifier.NotificationLogFilename, base64.StdEncoding.EncodeToString(nflog))) + + fs := clusterpb.FullState{ + Parts: []clusterpb.Part{ + {Key: "silences", Data: silences}, + {Key: "notifications", Data: nflog}, + }, + } + fullState, err := fs.Marshal() + require.NoError(t, err) + encodedFullState := base64.StdEncoding.EncodeToString(fullState) + + am, err := NewAlertmanager(cfg, 1, fstore) require.NoError(t, err) - // We should have no configuration at first. + // We should have no configuration or state at first. 
{ - _, err = am.mimirClient.GetGrafanaAlertmanagerConfig(ctx) + _, err := am.mimirClient.GetGrafanaAlertmanagerConfig(ctx) + require.Error(t, err) + require.Equal(t, "Error response from the Mimir API: alertmanager storage object not found", err.Error()) + + _, err = am.mimirClient.GetGrafanaAlertmanagerState(ctx) require.Error(t, err) require.Equal(t, "Error response from the Mimir API: alertmanager storage object not found", err.Error()) } @@ -162,7 +194,7 @@ func TestIntegrationRemoteAlertmanagerApplyConfigOnlyUploadsOnce(t *testing.T) { // First, we need to verify that the readiness check passes. require.True(t, am.Ready()) - // Next, we need to verify that Mimir received the configuration. + // Next, we need to verify that Mimir received both the configuration and state. config, err := am.mimirClient.GetGrafanaAlertmanagerConfig(ctx) require.NoError(t, err) require.Equal(t, int64(100), config.ID) @@ -171,11 +203,15 @@ func TestIntegrationRemoteAlertmanagerApplyConfigOnlyUploadsOnce(t *testing.T) { require.Equal(t, fakeConfigCreatedAt, config.CreatedAt) require.Equal(t, true, config.Default) - // TODO: Check that the state was uploaded. + state, err := am.mimirClient.GetGrafanaAlertmanagerState(ctx) + require.NoError(t, err) + require.Equal(t, encodedFullState, state.State) } - // Calling `ApplyConfig` again with a changed configuration yields no effect. + // Calling `ApplyConfig` again with a changed configuration and state yields no effect. 
{ + require.NoError(t, store.Set(ctx, 1, "alertmanager", notifier.SilencesFilename, base64.StdEncoding.EncodeToString([]byte("abc123")))) + require.NoError(t, store.Set(ctx, 1, "alertmanager", notifier.NotificationLogFilename, base64.StdEncoding.EncodeToString([]byte("abc123")))) fakeConfig.ID = 30000000000000000 require.NoError(t, am.ApplyConfig(ctx, fakeConfig)) @@ -190,6 +226,11 @@ func TestIntegrationRemoteAlertmanagerApplyConfigOnlyUploadsOnce(t *testing.T) { require.Equal(t, fakeConfigHash, config.Hash) require.Equal(t, fakeConfigCreatedAt, config.CreatedAt) require.Equal(t, true, config.Default) + + // Check that the state is the same as before. + state, err := am.mimirClient.GetGrafanaAlertmanagerState(ctx) + require.NoError(t, err) + require.Equal(t, encodedFullState, state.State) } // TODO: Now, shutdown the Alertmanager and we expect the latest configuration to be uploaded. diff --git a/pkg/services/ngalert/remote/client/alertmanager_configuration.go b/pkg/services/ngalert/remote/client/alertmanager_configuration.go index 52ce13df282..dd8ab90318a 100644 --- a/pkg/services/ngalert/remote/client/alertmanager_configuration.go +++ b/pkg/services/ngalert/remote/client/alertmanager_configuration.go @@ -39,13 +39,13 @@ func (mc *Mimir) GetGrafanaAlertmanagerConfig(ctx context.Context) (*UserGrafana return gc, nil } -func (mc *Mimir) CreateGrafanaAlertmanagerConfig(ctx context.Context, c, hash string, id, created int64, d bool) error { +func (mc *Mimir) CreateGrafanaAlertmanagerConfig(ctx context.Context, cfg, hash string, id, createdAt int64, isDefault bool) error { payload, err := json.Marshal(&UserGrafanaConfig{ ID: id, - GrafanaAlertmanagerConfig: c, + GrafanaAlertmanagerConfig: cfg, Hash: hash, - CreatedAt: created, - Default: d, + CreatedAt: createdAt, + Default: isDefault, }) if err != nil { return err diff --git a/pkg/services/ngalert/remote/client/alertmanager_state.go b/pkg/services/ngalert/remote/client/alertmanager_state.go index f9cbe79c850..cac59c01192 100644 --- 
a/pkg/services/ngalert/remote/client/alertmanager_state.go +++ b/pkg/services/ngalert/remote/client/alertmanager_state.go @@ -9,7 +9,7 @@ import ( ) const ( - grafanaAlertmanagerStatePath = "/grafana/state" + grafanaAlertmanagerStatePath = "/api/v1/grafana/state" ) type UserGrafanaState struct { diff --git a/pkg/services/ngalert/remote/client/mimir.go b/pkg/services/ngalert/remote/client/mimir.go index a1f03a160a8..25bfa15ca7f 100644 --- a/pkg/services/ngalert/remote/client/mimir.go +++ b/pkg/services/ngalert/remote/client/mimir.go @@ -17,11 +17,11 @@ import ( // MimirClient contains all the methods to query the migration critical endpoints of Mimir instance, it's an interface to allow multiple implementations. type MimirClient interface { GetGrafanaAlertmanagerState(ctx context.Context) (*UserGrafanaState, error) - CreateGrafanaAlertmanagerState(ctx context.Context, s string) error + CreateGrafanaAlertmanagerState(ctx context.Context, state string) error DeleteGrafanaAlertmanagerState(ctx context.Context) error GetGrafanaAlertmanagerConfig(ctx context.Context) (*UserGrafanaConfig, error) - CreateGrafanaAlertmanagerConfig(ctx context.Context, configuration string, hash string, id int64, at int64, d bool) error + CreateGrafanaAlertmanagerConfig(ctx context.Context, configuration, hash string, id, createdAt int64, isDefault bool) error DeleteGrafanaAlertmanagerConfig(ctx context.Context) error }