Alerting: Send state to the remote Alertmanager (#78538)

* Alerting: Introduce a Mimir client as part of the Remote Alertmanager

Adds a Mimir client that understands the new APIs developed for Mimir. Still very much a WIP.

* more wip

* appease the linter

* more linting

* add more code

* get state from kvstore, encode, send

* send state to the remote Alertmanager, extract fullstate logic into its own function

* pass kvstore to remote.NewAlertmanager()

* refactor

* add fake kvstore to tests

* tests

* use FileStore to get state

* always log 'completed state upload'

* refactor compareRemoteConfig

* base64-encode the state in the file store

* export silences and nflog filenames, refactor

* log 'completed state/config upload...' regardless of outcome

* add values to the state store in tests

* address code review comments

* log error from filestore

---------

Co-authored-by: gotjosh <josue.abreu@gmail.com>
This commit is contained in:
Santiago 2023-11-29 12:49:39 +01:00 committed by GitHub
parent 72d32eed27
commit 73776f37eb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 101 additions and 55 deletions

View File

@ -27,8 +27,8 @@ import (
) )
const ( const (
notificationLogFilename = "notifications" NotificationLogFilename = "notifications"
silencesFilename = "silences" SilencesFilename = "silences"
workingDir = "alerting" workingDir = "alerting"
// maintenanceNotificationAndSilences how often should we flush and garbage collect notifications // maintenanceNotificationAndSilences how often should we flush and garbage collect notifications
@ -89,22 +89,22 @@ func newAlertmanager(ctx context.Context, orgID int64, cfg *setting.Cfg, store A
workingPath := filepath.Join(cfg.DataPath, workingDir, strconv.Itoa(int(orgID))) workingPath := filepath.Join(cfg.DataPath, workingDir, strconv.Itoa(int(orgID)))
fileStore := NewFileStore(orgID, kvStore, workingPath) fileStore := NewFileStore(orgID, kvStore, workingPath)
nflogFilepath, err := fileStore.FilepathFor(ctx, notificationLogFilename) nflogFilepath, err := fileStore.FilepathFor(ctx, NotificationLogFilename)
if err != nil { if err != nil {
return nil, err return nil, err
} }
silencesFilePath, err := fileStore.FilepathFor(ctx, silencesFilename) silencesFilepath, err := fileStore.FilepathFor(ctx, SilencesFilename)
if err != nil { if err != nil {
return nil, err return nil, err
} }
silencesOptions := maintenanceOptions{ silencesOptions := maintenanceOptions{
filepath: silencesFilePath, filepath: silencesFilepath,
retention: retentionNotificationsAndSilences, retention: retentionNotificationsAndSilences,
maintenanceFrequency: silenceMaintenanceInterval, maintenanceFrequency: silenceMaintenanceInterval,
maintenanceFunc: func(state alertingNotify.State) (int64, error) { maintenanceFunc: func(state alertingNotify.State) (int64, error) {
// Detached context here is to make sure that when the service is shut down the persist operation is executed. // Detached context here is to make sure that when the service is shut down the persist operation is executed.
return fileStore.Persist(context.Background(), silencesFilename, state) return fileStore.Persist(context.Background(), SilencesFilename, state)
}, },
} }
@ -114,7 +114,7 @@ func newAlertmanager(ctx context.Context, orgID int64, cfg *setting.Cfg, store A
maintenanceFrequency: notificationLogMaintenanceInterval, maintenanceFrequency: notificationLogMaintenanceInterval,
maintenanceFunc: func(state alertingNotify.State) (int64, error) { maintenanceFunc: func(state alertingNotify.State) (int64, error) {
// Detached context here is to make sure that when the service is shut down the persist operation is executed. // Detached context here is to make sure that when the service is shut down the persist operation is executed.
return fileStore.Persist(context.Background(), notificationLogFilename, state) return fileStore.Persist(context.Background(), NotificationLogFilename, state)
}, },
} }

View File

@ -356,7 +356,7 @@ func (moa *MultiOrgAlertmanager) cleanupOrphanLocalOrgState(ctx context.Context,
// Remove all orphaned items from kvstore by listing all existing items // Remove all orphaned items from kvstore by listing all existing items
// in our used namespace and comparing them to the currently active // in our used namespace and comparing them to the currently active
// organizations. // organizations.
storedFiles := []string{notificationLogFilename, silencesFilename} storedFiles := []string{NotificationLogFilename, SilencesFilename}
for _, fileName := range storedFiles { for _, fileName := range storedFiles {
keys, err := moa.kvStore.Keys(ctx, kvstore.AllOrganizations, KVNamespace, fileName) keys, err := moa.kvStore.Keys(ctx, kvstore.AllOrganizations, KVNamespace, fileName)
if err != nil { if err != nil {

View File

@ -114,27 +114,27 @@ grafana_alerting_discovered_configurations 4
err := os.Mkdir(orphanDir, 0750) err := os.Mkdir(orphanDir, 0750)
require.NoError(t, err) require.NoError(t, err)
silencesPath := filepath.Join(orphanDir, silencesFilename) silencesPath := filepath.Join(orphanDir, SilencesFilename)
err = os.WriteFile(silencesPath, []byte("file_1"), 0644) err = os.WriteFile(silencesPath, []byte("file_1"), 0644)
require.NoError(t, err) require.NoError(t, err)
notificationPath := filepath.Join(orphanDir, notificationLogFilename) notificationPath := filepath.Join(orphanDir, NotificationLogFilename)
err = os.WriteFile(notificationPath, []byte("file_2"), 0644) err = os.WriteFile(notificationPath, []byte("file_2"), 0644)
require.NoError(t, err) require.NoError(t, err)
// We make sure that both files are on disk. // We make sure that both files are on disk.
info, err := os.Stat(silencesPath) info, err := os.Stat(silencesPath)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, info.Name(), silencesFilename) require.Equal(t, info.Name(), SilencesFilename)
info, err = os.Stat(notificationPath) info, err = os.Stat(notificationPath)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, info.Name(), notificationLogFilename) require.Equal(t, info.Name(), NotificationLogFilename)
// We also populate the kvstore with orphaned records. // We also populate the kvstore with orphaned records.
err = kvStore.Set(ctx, orgID, KVNamespace, silencesFilename, "file_1") err = kvStore.Set(ctx, orgID, KVNamespace, SilencesFilename, "file_1")
require.NoError(t, err) require.NoError(t, err)
err = kvStore.Set(ctx, orgID, KVNamespace, notificationLogFilename, "file_1") err = kvStore.Set(ctx, orgID, KVNamespace, NotificationLogFilename, "file_1")
require.NoError(t, err) require.NoError(t, err)
// Now re run the sync job once. // Now re run the sync job once.
@ -145,10 +145,10 @@ grafana_alerting_discovered_configurations 4
require.True(t, errors.Is(err, fs.ErrNotExist)) require.True(t, errors.Is(err, fs.ErrNotExist))
// The organization kvstore records should be gone by now. // The organization kvstore records should be gone by now.
_, exists, _ := kvStore.Get(ctx, orgID, KVNamespace, silencesFilename) _, exists, _ := kvStore.Get(ctx, orgID, KVNamespace, SilencesFilename)
require.False(t, exists) require.False(t, exists)
_, exists, _ = kvStore.Get(ctx, orgID, KVNamespace, notificationLogFilename) _, exists, _ = kvStore.Get(ctx, orgID, KVNamespace, NotificationLogFilename)
require.False(t, exists) require.False(t, exists)
} }
} }

View File

@ -33,15 +33,15 @@ type stateStore interface {
type Alertmanager struct { type Alertmanager struct {
log log.Logger log log.Logger
orgID int64 orgID int64
ready bool
sender *sender.ExternalAlertmanager
state stateStore
tenantID string tenantID string
url string url string
amClient *amclient.AlertmanagerAPI amClient *amclient.AlertmanagerAPI
mimirClient mimirClient.MimirClient
httpClient *http.Client httpClient *http.Client
ready bool mimirClient mimirClient.MimirClient
sender *sender.ExternalAlertmanager
stateStore stateStore
} }
type AlertmanagerConfig struct { type AlertmanagerConfig struct {
@ -50,7 +50,7 @@ type AlertmanagerConfig struct {
BasicAuthPassword string BasicAuthPassword string
} }
func NewAlertmanager(cfg AlertmanagerConfig, orgID int64, store stateStore) (*Alertmanager, error) { func NewAlertmanager(cfg AlertmanagerConfig, orgID int64, stateStore stateStore) (*Alertmanager, error) {
client := http.Client{ client := http.Client{
Transport: &mimirClient.MimirAuthRoundTripper{ Transport: &mimirClient.MimirAuthRoundTripper{
TenantID: cfg.TenantID, TenantID: cfg.TenantID,
@ -104,8 +104,8 @@ func NewAlertmanager(cfg AlertmanagerConfig, orgID int64, store stateStore) (*Al
mimirClient: mc, mimirClient: mc,
amClient: amclient.New(transport, nil), amClient: amclient.New(transport, nil),
httpClient: &client, httpClient: &client,
state: stateStore,
sender: s, sender: s,
stateStore: store,
orgID: orgID, orgID: orgID,
tenantID: cfg.TenantID, tenantID: cfg.TenantID,
url: cfg.URL, url: cfg.URL,
@ -126,30 +126,33 @@ func (am *Alertmanager) ApplyConfig(ctx context.Context, config *models.AlertCon
// First, execute a readiness check to make sure the remote Alertmanager is ready. // First, execute a readiness check to make sure the remote Alertmanager is ready.
am.log.Debug("Start readiness check for remote Alertmanager", "url", am.url) am.log.Debug("Start readiness check for remote Alertmanager", "url", am.url)
if err := am.checkReadiness(ctx); err != nil { if err := am.checkReadiness(ctx); err != nil {
am.log.Error("unable to pass the readiness check", "err", err) am.log.Error("Unable to pass the readiness check", "err", err)
return err return err
} }
am.log.Debug("Completed readiness check for remote Alertmanager", "url", am.url) am.log.Debug("Completed readiness check for remote Alertmanager", "url", am.url)
// Send configuration if necessary.
am.log.Debug("Start configuration upload to remote Alertmanager", "url", am.url) am.log.Debug("Start configuration upload to remote Alertmanager", "url", am.url)
if ok := am.compareRemoteConfig(ctx, config); !ok { if am.shouldSendConfig(ctx, config) {
err := am.mimirClient.CreateGrafanaAlertmanagerConfig(ctx, config.AlertmanagerConfiguration, config.ConfigurationHash, config.ID, config.CreatedAt, config.Default) err := am.mimirClient.CreateGrafanaAlertmanagerConfig(ctx, config.AlertmanagerConfiguration, config.ConfigurationHash, config.ID, config.CreatedAt, config.Default)
if err != nil { if err != nil {
am.log.Error("Unable to upload the configuration to the remote Alertmanager", "err", err) am.log.Error("Unable to upload the configuration to the remote Alertmanager", "err", err)
} else {
am.log.Debug("Completed configuration upload to remote Alertmanager", "url", am.url)
} }
} }
am.log.Debug("Completed configuration upload to remote Alertmanager", "url", am.url)
// Send base64-encoded state if necessary.
am.log.Debug("Start state upload to remote Alertmanager", "url", am.url) am.log.Debug("Start state upload to remote Alertmanager", "url", am.url)
if ok := am.compareRemoteState(ctx, ""); !ok { state, err := am.state.GetFullState(ctx, notifier.SilencesFilename, notifier.NotificationLogFilename)
if err := am.mimirClient.CreateGrafanaAlertmanagerState(ctx, ""); err != nil { if err != nil {
am.log.Error("error getting the Alertmanager's full state", "err", err)
} else if am.shouldSendState(ctx, state) {
if err := am.mimirClient.CreateGrafanaAlertmanagerState(ctx, state); err != nil {
am.log.Error("Unable to upload the state to the remote Alertmanager", "err", err) am.log.Error("Unable to upload the state to the remote Alertmanager", "err", err)
} }
} }
am.log.Debug("Completed state upload to remote Alertmanager", "url", am.url)
// upload the state
am.log.Debug("Completed state upload to remote Alertmanager", "url", am.url)
return nil return nil
} }
@ -357,26 +360,28 @@ func (am *Alertmanager) Ready() bool {
// CleanUp does not have an equivalent in a "remote Alertmanager" context, we don't have files on disk, no-op. // CleanUp does not have an equivalent in a "remote Alertmanager" context, we don't have files on disk, no-op.
func (am *Alertmanager) CleanUp() {} func (am *Alertmanager) CleanUp() {}
// compareRemoteConfig gets the remote Alertmanager config and compares it to the existing configuration. // shouldSendConfig compares the remote Alertmanager configuration with our local one.
func (am *Alertmanager) compareRemoteConfig(ctx context.Context, config *models.AlertConfiguration) bool { // It returns true if the configurations are different.
func (am *Alertmanager) shouldSendConfig(ctx context.Context, config *models.AlertConfiguration) bool {
rc, err := am.mimirClient.GetGrafanaAlertmanagerConfig(ctx) rc, err := am.mimirClient.GetGrafanaAlertmanagerConfig(ctx)
if err != nil { if err != nil {
// If we get an error trying to compare log it and return false so that we try to upload it anyway. // Log the error and return true so we try to upload our config anyway.
am.log.Error("Unable to get the remote Alertmanager Configuration for comparison", "err", err) am.log.Error("Unable to get the remote Alertmanager Configuration for comparison", "err", err)
return false return true
} }
return md5.Sum([]byte(rc.GrafanaAlertmanagerConfig)) == md5.Sum([]byte(config.AlertmanagerConfiguration)) return md5.Sum([]byte(rc.GrafanaAlertmanagerConfig)) != md5.Sum([]byte(config.AlertmanagerConfiguration))
} }
// compareRemoteState gets the remote Alertmanager state and compares it to the existing state. // shouldSendState compares the remote Alertmanager state with our local one.
func (am *Alertmanager) compareRemoteState(ctx context.Context, state string) bool { // It returns true if the states are different.
func (am *Alertmanager) shouldSendState(ctx context.Context, state string) bool {
rs, err := am.mimirClient.GetGrafanaAlertmanagerState(ctx) rs, err := am.mimirClient.GetGrafanaAlertmanagerState(ctx)
if err != nil { if err != nil {
// If we get an error trying to compare log it and return false so that we try to upload it anyway. // Log the error and return true so we try to upload our state anyway.
am.log.Error("Unable to get the remote Alertmanager state for comparison", "err", err) am.log.Error("Unable to get the remote Alertmanager state for comparison", "err", err)
return false return true
} }
return rs.State == state return rs.State != state
} }

View File

@ -3,6 +3,7 @@ package remote
import ( import (
"context" "context"
"crypto/md5" "crypto/md5"
"encoding/base64"
"fmt" "fmt"
"math/rand" "math/rand"
"net/http" "net/http"
@ -14,8 +15,11 @@ import (
"github.com/go-openapi/strfmt" "github.com/go-openapi/strfmt"
apimodels "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions" apimodels "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
ngmodels "github.com/grafana/grafana/pkg/services/ngalert/models" ngmodels "github.com/grafana/grafana/pkg/services/ngalert/models"
"github.com/grafana/grafana/pkg/services/ngalert/notifier"
"github.com/grafana/grafana/pkg/services/ngalert/tests/fakes"
"github.com/grafana/grafana/pkg/util" "github.com/grafana/grafana/pkg/util"
amv2 "github.com/prometheus/alertmanager/api/v2/models" amv2 "github.com/prometheus/alertmanager/api/v2/models"
"github.com/prometheus/alertmanager/cluster/clusterpb"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -93,11 +97,17 @@ func TestApplyConfig(t *testing.T) {
cfg := AlertmanagerConfig{ cfg := AlertmanagerConfig{
URL: server.URL, URL: server.URL,
} }
am, err := NewAlertmanager(cfg, 1, nil)
ctx := context.Background()
store := fakes.NewFakeKVStore(t)
fstore := notifier.NewFileStore(1, store, "")
require.NoError(t, store.Set(ctx, 1, "alertmanager", notifier.SilencesFilename, "test"))
require.NoError(t, store.Set(ctx, 1, "alertmanager", notifier.NotificationLogFilename, "test"))
am, err := NewAlertmanager(cfg, 1, fstore)
require.NoError(t, err) require.NoError(t, err)
config := &ngmodels.AlertConfiguration{} config := &ngmodels.AlertConfiguration{}
ctx := context.Background()
require.Error(t, am.ApplyConfig(ctx, config)) require.Error(t, am.ApplyConfig(ctx, config))
require.False(t, am.Ready()) require.False(t, am.Ready())
@ -143,13 +153,35 @@ func TestIntegrationRemoteAlertmanagerApplyConfigOnlyUploadsOnce(t *testing.T) {
OrgID: 1, OrgID: 1,
} }
silences := []byte("test-silences")
nflog := []byte("test-notifications")
store := fakes.NewFakeKVStore(t)
fstore := notifier.NewFileStore(1, store, "")
ctx := context.Background() ctx := context.Background()
am, err := NewAlertmanager(cfg, 1, nil) require.NoError(t, store.Set(ctx, 1, "alertmanager", notifier.SilencesFilename, base64.StdEncoding.EncodeToString(silences)))
require.NoError(t, store.Set(ctx, 1, "alertmanager", notifier.NotificationLogFilename, base64.StdEncoding.EncodeToString(nflog)))
fs := clusterpb.FullState{
Parts: []clusterpb.Part{
{Key: "silences", Data: silences},
{Key: "notifications", Data: nflog},
},
}
fullState, err := fs.Marshal()
require.NoError(t, err)
encodedFullState := base64.StdEncoding.EncodeToString(fullState)
am, err := NewAlertmanager(cfg, 1, fstore)
require.NoError(t, err) require.NoError(t, err)
// We should have no configuration at first. // We should have no configuration or state at first.
{ {
_, err = am.mimirClient.GetGrafanaAlertmanagerConfig(ctx) _, err := am.mimirClient.GetGrafanaAlertmanagerConfig(ctx)
require.Error(t, err)
require.Equal(t, "Error response from the Mimir API: alertmanager storage object not found", err.Error())
_, err = am.mimirClient.GetGrafanaAlertmanagerState(ctx)
require.Error(t, err) require.Error(t, err)
require.Equal(t, "Error response from the Mimir API: alertmanager storage object not found", err.Error()) require.Equal(t, "Error response from the Mimir API: alertmanager storage object not found", err.Error())
} }
@ -162,7 +194,7 @@ func TestIntegrationRemoteAlertmanagerApplyConfigOnlyUploadsOnce(t *testing.T) {
// First, we need to verify that the readiness check passes. // First, we need to verify that the readiness check passes.
require.True(t, am.Ready()) require.True(t, am.Ready())
// Next, we need to verify that Mimir received the configuration. // Next, we need to verify that Mimir received both the configuration and state.
config, err := am.mimirClient.GetGrafanaAlertmanagerConfig(ctx) config, err := am.mimirClient.GetGrafanaAlertmanagerConfig(ctx)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, int64(100), config.ID) require.Equal(t, int64(100), config.ID)
@ -171,11 +203,15 @@ func TestIntegrationRemoteAlertmanagerApplyConfigOnlyUploadsOnce(t *testing.T) {
require.Equal(t, fakeConfigCreatedAt, config.CreatedAt) require.Equal(t, fakeConfigCreatedAt, config.CreatedAt)
require.Equal(t, true, config.Default) require.Equal(t, true, config.Default)
// TODO: Check that the state was uploaded. state, err := am.mimirClient.GetGrafanaAlertmanagerState(ctx)
require.NoError(t, err)
require.Equal(t, encodedFullState, state.State)
} }
// Calling `ApplyConfig` again with a changed configuration yields no effect. // Calling `ApplyConfig` again with a changed configuration and state yields no effect.
{ {
require.NoError(t, store.Set(ctx, 1, "alertmanager", "silences", base64.StdEncoding.EncodeToString([]byte("abc123"))))
require.NoError(t, store.Set(ctx, 1, "alertmanager", "notifications", base64.StdEncoding.EncodeToString([]byte("abc123"))))
fakeConfig.ID = 30000000000000000 fakeConfig.ID = 30000000000000000
require.NoError(t, am.ApplyConfig(ctx, fakeConfig)) require.NoError(t, am.ApplyConfig(ctx, fakeConfig))
@ -190,6 +226,11 @@ func TestIntegrationRemoteAlertmanagerApplyConfigOnlyUploadsOnce(t *testing.T) {
require.Equal(t, fakeConfigHash, config.Hash) require.Equal(t, fakeConfigHash, config.Hash)
require.Equal(t, fakeConfigCreatedAt, config.CreatedAt) require.Equal(t, fakeConfigCreatedAt, config.CreatedAt)
require.Equal(t, true, config.Default) require.Equal(t, true, config.Default)
// Check that the state is the same as before.
state, err := am.mimirClient.GetGrafanaAlertmanagerState(ctx)
require.NoError(t, err)
require.Equal(t, encodedFullState, state.State)
} }
// TODO: Now, shutdown the Alertmanager and we expect the latest configuration to be uploaded. // TODO: Now, shutdown the Alertmanager and we expect the latest configuration to be uploaded.

View File

@ -39,13 +39,13 @@ func (mc *Mimir) GetGrafanaAlertmanagerConfig(ctx context.Context) (*UserGrafana
return gc, nil return gc, nil
} }
func (mc *Mimir) CreateGrafanaAlertmanagerConfig(ctx context.Context, c, hash string, id, created int64, d bool) error { func (mc *Mimir) CreateGrafanaAlertmanagerConfig(ctx context.Context, cfg, hash string, id, createdAt int64, isDefault bool) error {
payload, err := json.Marshal(&UserGrafanaConfig{ payload, err := json.Marshal(&UserGrafanaConfig{
ID: id, ID: id,
GrafanaAlertmanagerConfig: c, GrafanaAlertmanagerConfig: cfg,
Hash: hash, Hash: hash,
CreatedAt: created, CreatedAt: createdAt,
Default: d, Default: isDefault,
}) })
if err != nil { if err != nil {
return err return err

View File

@ -9,7 +9,7 @@ import (
) )
const ( const (
grafanaAlertmanagerStatePath = "/grafana/state" grafanaAlertmanagerStatePath = "/api/v1/grafana/state"
) )
type UserGrafanaState struct { type UserGrafanaState struct {

View File

@ -17,11 +17,11 @@ import (
// MimirClient contains all the methods to query the migration critical endpoints of Mimir instance, it's an interface to allow multiple implementations. // MimirClient contains all the methods to query the migration critical endpoints of Mimir instance, it's an interface to allow multiple implementations.
type MimirClient interface { type MimirClient interface {
GetGrafanaAlertmanagerState(ctx context.Context) (*UserGrafanaState, error) GetGrafanaAlertmanagerState(ctx context.Context) (*UserGrafanaState, error)
CreateGrafanaAlertmanagerState(ctx context.Context, s string) error CreateGrafanaAlertmanagerState(ctx context.Context, state string) error
DeleteGrafanaAlertmanagerState(ctx context.Context) error DeleteGrafanaAlertmanagerState(ctx context.Context) error
GetGrafanaAlertmanagerConfig(ctx context.Context) (*UserGrafanaConfig, error) GetGrafanaAlertmanagerConfig(ctx context.Context) (*UserGrafanaConfig, error)
CreateGrafanaAlertmanagerConfig(ctx context.Context, configuration string, hash string, id int64, at int64, d bool) error CreateGrafanaAlertmanagerConfig(ctx context.Context, configuration, hash string, id, updatedAt int64, isDefault bool) error
DeleteGrafanaAlertmanagerConfig(ctx context.Context) error DeleteGrafanaAlertmanagerConfig(ctx context.Context) error
} }