Alerting: Stop persisting silences and nflog to disk (#84706)

With this change, we no longer need to persist silence/nflog states to disk in addition to the kvstore
This commit is contained in:
Matthew Jacobson 2024-03-22 18:37:33 -04:00 committed by GitHub
parent 48de8657c9
commit 0c3c5c5607
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 345 additions and 350 deletions

10
go.mod
View File

@ -47,7 +47,7 @@ require (
github.com/google/uuid v1.6.0 // @grafana/backend-platform
github.com/google/wire v0.5.0 // @grafana/backend-platform
github.com/gorilla/websocket v1.5.0 // @grafana/grafana-app-platform-squad
github.com/grafana/alerting v0.0.0-20240320131513-a4bb859cccf2 // @grafana/alerting-squad-backend
github.com/grafana/alerting v0.0.0-20240322221449-89ae4e299bf8 // @grafana/alerting-squad-backend
github.com/grafana/cuetsy v0.1.11 // @grafana/grafana-as-code
github.com/grafana/grafana-aws-sdk v0.25.0 // @grafana/aws-datasources
github.com/grafana/grafana-azure-sdk-go/v2 v2.0.1 // @grafana/partner-datasources
@ -67,7 +67,7 @@ require (
github.com/magefile/mage v1.15.0 // @grafana/grafana-release-guild
github.com/mattn/go-isatty v0.0.19 // @grafana/backend-platform
github.com/mattn/go-sqlite3 v1.14.19 // @grafana/backend-platform
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect; @grafana/alerting-squad-backend
github.com/matttproud/golang_protobuf_extensions v1.0.4 // @grafana/alerting-squad-backend
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // @grafana/grafana-operator-experience-squad
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/patrickmn/go-cache v2.1.0+incompatible // @grafana/alerting-squad-backend
@ -109,7 +109,7 @@ require (
gopkg.in/mail.v2 v2.3.1 // @grafana/backend-platform
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // @grafana/alerting-squad-backend
xorm.io/builder v0.3.6 // @grafana/backend-platform
xorm.io/builder v0.3.6 // indirect; @grafana/backend-platform
xorm.io/core v0.7.3 // @grafana/backend-platform
xorm.io/xorm v0.8.2 // @grafana/alerting-squad-backend
)
@ -161,7 +161,7 @@ require (
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.1-0.20191002090509-6af20e3a5340 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-msgpack v0.5.5 // indirect
github.com/hashicorp/go-multierror v1.1.1 // @grafana/alerting-squad
github.com/hashicorp/go-multierror v1.1.1 // indirect; @grafana/alerting-squad
github.com/hashicorp/go-sockaddr v1.0.6 // indirect
github.com/hashicorp/golang-lru v0.6.0 // indirect
github.com/hashicorp/yamux v0.1.1 // indirect
@ -317,7 +317,7 @@ require (
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
github.com/grafana/regexp v0.0.0-20221123153739-15dc172cd2db // indirect
github.com/hashicorp/go-immutable-radix v1.3.1 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.7 // @grafana/alerting-squad-backend
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect; @grafana/alerting-squad-backend
github.com/hashicorp/memberlist v0.5.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/invopop/yaml v0.2.0 // indirect

6
go.sum
View File

@ -2159,10 +2159,8 @@ github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/ad
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/grafana/alerting v0.0.0-20240306130925-bc622368256d h1:YxLsj/C75sW90gzYK27XEaJ1sL89lYxuntmHaytFP80=
github.com/grafana/alerting v0.0.0-20240306130925-bc622368256d/go.mod h1:0nHKO0w8OTemvZ3eh7+s1EqGGhgbs0kvkTeLU1FrbTw=
github.com/grafana/alerting v0.0.0-20240320131513-a4bb859cccf2 h1:Rwo/FbZQEjM6TmZ7wRr12hoNlCAyuemPtSe1Whap6Eo=
github.com/grafana/alerting v0.0.0-20240320131513-a4bb859cccf2/go.mod h1:0nHKO0w8OTemvZ3eh7+s1EqGGhgbs0kvkTeLU1FrbTw=
github.com/grafana/alerting v0.0.0-20240322221449-89ae4e299bf8 h1:ndBSFAHmJRWqln2uNys7lV0+9U8tlW6ZuNz8ETW60Us=
github.com/grafana/alerting v0.0.0-20240322221449-89ae4e299bf8/go.mod h1:0nHKO0w8OTemvZ3eh7+s1EqGGhgbs0kvkTeLU1FrbTw=
github.com/grafana/codejen v0.0.3 h1:tAWxoTUuhgmEqxJPOLtJoxlPBbMULFwKFOcRsPRPXDw=
github.com/grafana/codejen v0.0.3/go.mod h1:zmwwM/DRyQB7pfuBjTWII3CWtxcXh8LTwAYGfDfpR6s=
github.com/grafana/cue v0.0.0-20230926092038-971951014e3f h1:TmYAMnqg3d5KYEAaT6PtTguL2GjLfvr6wnAX8Azw6tQ=

View File

@ -544,7 +544,5 @@ func createRemoteAlertmanager(orgID int64, amCfg setting.RemoteAlertmanagerSetti
TenantID: amCfg.TenantID,
BasicAuthPassword: amCfg.Password,
}
// We won't be handling files on disk, we can pass an empty string as workingDirPath.
stateStore := notifier.NewFileStore(orgID, kvstore, "")
return remote.NewAlertmanager(externalAMCfg, stateStore, decryptFn, m)
return remote.NewAlertmanager(externalAMCfg, notifier.NewFileStore(orgID, kvstore), decryptFn, m)
}

View File

@ -6,7 +6,6 @@ import (
"encoding/binary"
"encoding/json"
"fmt"
"path/filepath"
"strconv"
"time"
@ -17,7 +16,6 @@ import (
amv2 "github.com/prometheus/alertmanager/api/v2/models"
"github.com/grafana/grafana/pkg/infra/kvstore"
"github.com/grafana/grafana/pkg/infra/log"
apimodels "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
@ -28,10 +26,6 @@ import (
)
const (
NotificationLogFilename = "notifications"
SilencesFilename = "silences"
workingDir = "alerting"
// notificationLogMaintenanceInterval is how often we should flush and garbage collect notifications.
notificationLogMaintenanceInterval = 15 * time.Minute
)
@ -46,6 +40,13 @@ type AlertingStore interface {
autogenRuleStore
}
// stateStore provides access to the Alertmanager's silence and
// notification-log state. The concrete implementation persists the
// state in the kvstore (see FileStore).
type stateStore interface {
	// SaveSilences persists the silence state and returns the size of
	// the unencoded state in bytes.
	SaveSilences(ctx context.Context, st alertingNotify.State) (int64, error)
	// SaveNotificationLog persists the notification-log state and returns
	// the size of the unencoded state in bytes.
	SaveNotificationLog(ctx context.Context, st alertingNotify.State) (int64, error)
	// GetSilences returns the stored silence state, or "" if none exists.
	GetSilences(ctx context.Context) (string, error)
	// GetNotificationLog returns the stored notification-log state, or "" if none exists.
	GetNotificationLog(ctx context.Context) (string, error)
}
type alertmanager struct {
Base *alertingNotify.GrafanaAlertmanager
logger log.Logger
@ -53,7 +54,7 @@ type alertmanager struct {
ConfigMetrics *metrics.AlertmanagerConfigMetrics
Settings *setting.Cfg
Store AlertingStore
fileStore *FileStore
stateStore stateStore
NotificationService notifications.Service
decryptFn alertingNotify.GetDecryptedValueFn
@ -65,14 +66,16 @@ type alertmanager struct {
// maintenanceOptions represent the options for components that need maintenance on a frequency within the Alertmanager.
// It implements the alerting.MaintenanceOptions interface.
type maintenanceOptions struct {
filepath string
initialState string
retention time.Duration
maintenanceFrequency time.Duration
maintenanceFunc func(alertingNotify.State) (int64, error)
}
func (m maintenanceOptions) Filepath() string {
return m.filepath
var _ alertingNotify.MaintenanceOptions = maintenanceOptions{}
func (m maintenanceOptions) InitialState() string {
return m.initialState
}
func (m maintenanceOptions) Retention() time.Duration {
@ -87,38 +90,35 @@ func (m maintenanceOptions) MaintenanceFunc(state alertingNotify.State) (int64,
return m.maintenanceFunc(state)
}
func NewAlertmanager(ctx context.Context, orgID int64, cfg *setting.Cfg, store AlertingStore, kvStore kvstore.KVStore,
func NewAlertmanager(ctx context.Context, orgID int64, cfg *setting.Cfg, store AlertingStore, stateStore stateStore,
peer alertingNotify.ClusterPeer, decryptFn alertingNotify.GetDecryptedValueFn, ns notifications.Service,
m *metrics.Alertmanager, withAutogen bool) (*alertmanager, error) {
workingPath := filepath.Join(cfg.DataPath, workingDir, strconv.Itoa(int(orgID)))
fileStore := NewFileStore(orgID, kvStore, workingPath)
nflogFilepath, err := fileStore.FilepathFor(ctx, NotificationLogFilename)
nflog, err := stateStore.GetNotificationLog(ctx)
if err != nil {
return nil, err
}
silencesFilepath, err := fileStore.FilepathFor(ctx, SilencesFilename)
silences, err := stateStore.GetSilences(ctx)
if err != nil {
return nil, err
}
silencesOptions := maintenanceOptions{
filepath: silencesFilepath,
initialState: silences,
retention: retentionNotificationsAndSilences,
maintenanceFrequency: silenceMaintenanceInterval,
maintenanceFunc: func(state alertingNotify.State) (int64, error) {
// Detached context here is to make sure that when the service is shut down the persist operation is executed.
return fileStore.Persist(context.Background(), SilencesFilename, state)
return stateStore.SaveSilences(context.Background(), state)
},
}
nflogOptions := maintenanceOptions{
filepath: nflogFilepath,
initialState: nflog,
retention: retentionNotificationsAndSilences,
maintenanceFrequency: notificationLogMaintenanceInterval,
maintenanceFunc: func(state alertingNotify.State) (int64, error) {
// Detached context here is to make sure that when the service is shut down the persist operation is executed.
return fileStore.Persist(context.Background(), NotificationLogFilename, state)
return stateStore.SaveNotificationLog(context.Background(), state)
},
}
@ -144,7 +144,7 @@ func NewAlertmanager(ctx context.Context, orgID int64, cfg *setting.Cfg, store A
NotificationService: ns,
orgID: orgID,
decryptFn: decryptFn,
fileStore: fileStore,
stateStore: stateStore,
logger: l,
// TODO: Preferably, logic around autogen would be outside of the specific alertmanager implementation so that remote alertmanager will get it for free.
@ -418,10 +418,8 @@ func (am *alertmanager) PutAlerts(_ context.Context, postableAlerts apimodels.Po
return am.Base.PutAlerts(alerts)
}
// CleanUp removes the directory containing the alertmanager files from disk.
func (am *alertmanager) CleanUp() {
am.fileStore.CleanUp()
}
// CleanUp no-ops as no files are stored on disk.
func (am *alertmanager) CleanUp() {}
// AlertValidationError is the error capturing the validation errors
// faced on the alerts.

View File

@ -46,7 +46,11 @@ func setupAMTest(t *testing.T) *alertmanager {
kvStore := fakes.NewFakeKVStore(t)
secretsService := secretsManager.SetupTestService(t, database.ProvideSecretsStore(sqlStore))
decryptFn := secretsService.GetDecryptedValue
am, err := NewAlertmanager(context.Background(), 1, cfg, s, kvStore, &NilPeer{}, decryptFn, nil, m, false)
orgID := 1
stateStore := NewFileStore(int64(orgID), kvStore)
am, err := NewAlertmanager(context.Background(), 1, cfg, s, stateStore, &NilPeer{}, decryptFn, nil, m, false)
require.NoError(t, err)
return am
}

View File

@ -4,50 +4,55 @@ import (
"context"
"encoding/base64"
"fmt"
"os"
"path/filepath"
alertingClusterPB "github.com/grafana/alerting/cluster/clusterpb"
alertingNotify "github.com/grafana/alerting/notify"
"github.com/grafana/grafana/pkg/infra/kvstore"
"github.com/grafana/grafana/pkg/infra/log"
)
const KVNamespace = "alertmanager"
const (
KVNamespace = "alertmanager"
NotificationLogFilename = "notifications"
SilencesFilename = "silences"
)
// FileStore is in charge of persisting the alertmanager files to the database.
// It uses the KVstore table and encodes the files as a base64 string.
type FileStore struct {
kv *kvstore.NamespacedKVStore
orgID int64
workingDirPath string
logger log.Logger
kv *kvstore.NamespacedKVStore
orgID int64
logger log.Logger
}
func NewFileStore(orgID int64, store kvstore.KVStore, workingDirPath string) *FileStore {
func NewFileStore(orgID int64, store kvstore.KVStore) *FileStore {
return &FileStore{
workingDirPath: workingDirPath,
orgID: orgID,
kv: kvstore.WithNamespace(store, orgID, KVNamespace),
logger: log.New("ngalert.notifier.alertmanager.file_store", orgID),
orgID: orgID,
kv: kvstore.WithNamespace(store, orgID, KVNamespace),
logger: log.New("ngalert.notifier.alertmanager.file_store", orgID),
}
}
// FilepathFor returns the filepath to an Alertmanager file.
// If the file is already present on disk it no-ops.
// If not, it tries to read the database and if there's no file it no-ops.
// If there is a file in the database, it decodes it and writes to disk for Alertmanager consumption.
func (fileStore *FileStore) FilepathFor(ctx context.Context, filename string) (string, error) {
// GetSilences returns the content of the silences file from kvstore.
func (fileStore *FileStore) GetSilences(ctx context.Context) (string, error) {
	return fileStore.contentFor(ctx, SilencesFilename)
}

// GetNotificationLog returns the content of the notification log file from kvstore.
func (fileStore *FileStore) GetNotificationLog(ctx context.Context) (string, error) {
	return fileStore.contentFor(ctx, NotificationLogFilename)
}
// contentFor returns the content for the given Alertmanager kvstore key.
func (fileStore *FileStore) contentFor(ctx context.Context, filename string) (string, error) {
// Attempt to read the file contents from the database.
content, exists, err := fileStore.kv.Get(ctx, filename)
if err != nil {
return "", fmt.Errorf("error reading file '%s' from database: %w", filename, err)
}
// if it doesn't exist, let's no-op and let the Alertmanager create one. We'll eventually save it to the database.
// File doesn't exist, Alertmanager will eventually save it to the database.
if !exists {
return fileStore.pathFor(filename), nil
return "", nil
}
// If we have a file stored in the database, decode it and return its contents.
@ -56,54 +61,21 @@ func (fileStore *FileStore) FilepathFor(ctx context.Context, filename string) (s
return "", fmt.Errorf("error decoding file '%s': %w", filename, err)
}
if err := fileStore.WriteFileToDisk(filename, bytes); err != nil {
return "", fmt.Errorf("error writing file %s: %w", filename, err)
}
return fileStore.pathFor(filename), err
return string(bytes), err
}
// GetFullState receives a list of keys, looks for the corresponding values in the kvstore,
// and returns a base64-encoded protobuf message containing those key-value pairs.
// That base64-encoded string represents the Alertmanager's internal state.
func (fileStore *FileStore) GetFullState(ctx context.Context, filenames ...string) (string, error) {
all, err := fileStore.kv.GetAll(ctx)
if err != nil {
return "", err
}
keys, ok := all[fileStore.orgID]
if !ok {
return "", fmt.Errorf("no values for org %d", fileStore.orgID)
}
var parts []alertingClusterPB.Part
for _, f := range filenames {
v, ok := keys[f]
if !ok {
return "", fmt.Errorf("no value found for key %q", f)
}
b, err := decode(v)
if err != nil {
return "", fmt.Errorf("error decoding value for key %q", f)
}
parts = append(parts, alertingClusterPB.Part{Key: f, Data: b})
}
fs := alertingClusterPB.FullState{
Parts: parts,
}
b, err := fs.Marshal()
if err != nil {
return "", fmt.Errorf("error marshaling full state: %w", err)
}
return encode(b), nil
// SaveSilences saves the silences to the database and returns the size of the unencoded state.
func (fileStore *FileStore) SaveSilences(ctx context.Context, st alertingNotify.State) (int64, error) {
return fileStore.persist(ctx, SilencesFilename, st)
}
// Persist takes care of persisting the binary representation of internal state to the database as a base64 encoded string.
func (fileStore *FileStore) Persist(ctx context.Context, filename string, st alertingNotify.State) (int64, error) {
// SaveNotificationLog saves the notification log to the database and returns the size of the unencoded state.
func (fileStore *FileStore) SaveNotificationLog(ctx context.Context, st alertingNotify.State) (int64, error) {
return fileStore.persist(ctx, NotificationLogFilename, st)
}
// persist takes care of persisting the binary representation of internal state to the database as a base64 encoded string.
func (fileStore *FileStore) persist(ctx context.Context, filename string, st alertingNotify.State) (int64, error) {
var size int64
bytes, err := st.MarshalBinary()
@ -118,31 +90,6 @@ func (fileStore *FileStore) Persist(ctx context.Context, filename string, st ale
return int64(len(bytes)), err
}
// WriteFileToDisk writes a file with the provided name and contents to the Alertmanager working directory with the default grafana permission.
func (fileStore *FileStore) WriteFileToDisk(fn string, content []byte) error {
// Ensure the working directory is created
err := os.MkdirAll(fileStore.workingDirPath, 0750)
if err != nil {
return fmt.Errorf("unable to create the working directory %q: %s", fileStore.workingDirPath, err)
}
return os.WriteFile(fileStore.pathFor(fn), content, 0644)
}
// CleanUp will remove the working directory from disk.
func (fileStore *FileStore) CleanUp() {
if err := os.RemoveAll(fileStore.workingDirPath); err != nil {
fileStore.logger.Warn("Unable to delete the local working directory", "dir", fileStore.workingDirPath,
"error", err)
return
}
fileStore.logger.Info("Successfully deleted working directory", "dir", fileStore.workingDirPath)
}
func (fileStore *FileStore) pathFor(fn string) string {
return filepath.Join(fileStore.workingDirPath, fn)
}
func decode(s string) ([]byte, error) {
return base64.StdEncoding.DecodeString(s)
}

View File

@ -3,142 +3,106 @@ package notifier
import (
"context"
"encoding/base64"
"os"
"path/filepath"
"strings"
"testing"
"time"
"github.com/prometheus/alertmanager/cluster/clusterpb"
"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/require"
"github.com/grafana/grafana/pkg/services/ngalert/tests/fakes"
)
func TestFileStore_FilepathFor_DirectoryNotExist(t *testing.T) {
func TestFileStore_Silences(t *testing.T) {
store := fakes.NewFakeKVStore(t)
workingDir := filepath.Join(t.TempDir(), "notexistdir")
fs := NewFileStore(1, store, workingDir)
filekey := "silences"
filePath := filepath.Join(workingDir, filekey)
// With a file already on the database and the path does not exist yet, it creates the path,
// writes the file to disk, then returns the filepath.
{
require.NoError(t, store.Set(context.Background(), 1, KVNamespace, filekey, encode([]byte("silence1,silence3"))))
r, err := fs.FilepathFor(context.Background(), filekey)
require.NoError(t, err)
require.Equal(t, filePath, r)
f, err := os.ReadFile(filepath.Clean(filePath))
require.NoError(t, err)
require.Equal(t, "silence1,silence3", string(f))
require.NoError(t, os.Remove(filePath))
require.NoError(t, store.Del(context.Background(), 1, KVNamespace, filekey))
}
}
func TestFileStore_FilepathFor(t *testing.T) {
store := fakes.NewFakeKVStore(t)
workingDir := t.TempDir()
fs := NewFileStore(1, store, workingDir)
filekey := "silences"
filePath := filepath.Join(workingDir, filekey)
// With a file already on disk, it returns the existing file's filepath and no modification to the original file.
{
require.NoError(t, os.WriteFile(filePath, []byte("silence1,silence2"), 0644))
r, err := fs.FilepathFor(context.Background(), filekey)
require.NoError(t, err)
require.Equal(t, filePath, r)
f, err := os.ReadFile(filepath.Clean(filePath))
require.NoError(t, err)
require.Equal(t, "silence1,silence2", string(f))
require.NoError(t, os.Remove(filePath))
}
// With a file already on the database, it writes the file to disk and returns the filepath.
{
require.NoError(t, store.Set(context.Background(), 1, KVNamespace, filekey, encode([]byte("silence1,silence3"))))
r, err := fs.FilepathFor(context.Background(), filekey)
require.NoError(t, err)
require.Equal(t, filePath, r)
f, err := os.ReadFile(filepath.Clean(filePath))
require.NoError(t, err)
require.Equal(t, "silence1,silence3", string(f))
require.NoError(t, os.Remove(filePath))
require.NoError(t, store.Del(context.Background(), 1, KVNamespace, filekey))
}
// With no file on disk or database, it returns the original filepath.
{
r, err := fs.FilepathFor(context.Background(), filekey)
require.NoError(t, err)
require.Equal(t, filePath, r)
_, err = os.ReadFile(filepath.Clean(filePath))
require.Error(t, err)
}
}
func TestFileStore_GetFullState(t *testing.T) {
ctx := context.Background()
var orgId int64 = 1
t.Run("empty store", func(tt *testing.T) {
store := fakes.NewFakeKVStore(t)
fs := NewFileStore(1, store, workingDir)
_, err := fs.GetFullState(ctx, "silences", "notifications")
require.NotNil(tt, err)
require.Equal(tt, "no values for org 1", err.Error())
})
// Initialize kvstore.
now := time.Now()
oneHour := now.Add(time.Hour)
initialState := silenceState{
"1": createSilence("1", now, oneHour),
"2": createSilence("2", now, oneHour),
}
decodedState, err := initialState.MarshalBinary()
require.NoError(t, err)
encodedState := base64.StdEncoding.EncodeToString(decodedState)
err = store.Set(ctx, orgId, KVNamespace, SilencesFilename, encodedState)
require.NoError(t, err)
t.Run("no values for key", func(tt *testing.T) {
store := fakes.NewFakeKVStore(t)
require.NoError(t, store.Set(ctx, 1, "alertmanager", "test-key", "test-value"))
fs := NewFileStore(1, store, workingDir)
_, err := fs.GetFullState(ctx, "silences")
require.NotNil(tt, err)
require.Equal(tt, "no value found for key \"silences\"", err.Error())
})
fs := NewFileStore(orgId, store)
t.Run("non-empty values", func(tt *testing.T) {
store := fakes.NewFakeKVStore(t)
silences := []byte("test-silences")
nflog := []byte("test-notifications")
require.NoError(t, store.Set(ctx, 1, "alertmanager", "silences", base64.StdEncoding.EncodeToString(silences)))
require.NoError(t, store.Set(ctx, 1, "alertmanager", "notifications", base64.StdEncoding.EncodeToString(nflog)))
// Load initial.
silences, err := fs.GetSilences(ctx)
require.NoError(t, err)
decoded, err := decodeSilenceState(strings.NewReader(silences))
require.NoError(t, err)
if !cmp.Equal(initialState, decoded) {
t.Errorf("Unexpected Diff: %v", cmp.Diff(initialState, decoded))
}
state := clusterpb.FullState{
Parts: []clusterpb.Part{
{Key: "silences", Data: silences},
{Key: "notifications", Data: nflog},
},
}
b, err := state.Marshal()
require.NoError(t, err)
// Save new.
newState := silenceState{
"a": createSilence("a", now, oneHour),
"b": createSilence("b", now, oneHour),
}
size, err := fs.SaveSilences(ctx, newState)
require.NoError(t, err)
require.EqualValues(t, len(decodedState), size)
encodedFullState := base64.StdEncoding.EncodeToString(b)
fs := NewFileStore(1, store, workingDir)
got, err := fs.GetFullState(ctx, "silences", "notifications")
require.NoError(t, err)
require.Equal(t, encodedFullState, got)
})
// Load new.
silences, err = fs.GetSilences(ctx)
require.NoError(t, err)
decoded, err = decodeSilenceState(strings.NewReader(silences))
require.NoError(t, err)
if !cmp.Equal(newState, decoded) {
t.Errorf("Unexpected Diff: %v", cmp.Diff(newState, decoded))
}
}
func TestFileStore_Persist(t *testing.T) {
func TestFileStore_NotificationLog(t *testing.T) {
store := fakes.NewFakeKVStore(t)
state := &fakeState{data: "something to marshal"}
workingDir := t.TempDir()
fs := NewFileStore(1, store, workingDir)
filekey := "silences"
ctx := context.Background()
var orgId int64 = 1
size, err := fs.Persist(context.Background(), filekey, state)
// Initialize kvstore.
now := time.Now()
oneHour := now.Add(time.Hour)
k1, v1 := createNotificationLog("group1", "receiver1", now, oneHour)
k2, v2 := createNotificationLog("group2", "receiver2", now, oneHour)
initialState := nflogState{k1: v1, k2: v2}
decodedState, err := initialState.MarshalBinary()
require.NoError(t, err)
require.Equal(t, int64(20), size)
store.Mtx.Lock()
require.Len(t, store.Store, 1)
store.Mtx.Unlock()
v, ok, err := store.Get(context.Background(), 1, KVNamespace, filekey)
encodedState := base64.StdEncoding.EncodeToString(decodedState)
err = store.Set(ctx, orgId, KVNamespace, NotificationLogFilename, encodedState)
require.NoError(t, err)
require.True(t, ok)
b, err := decode(v)
fs := NewFileStore(orgId, store)
// Load initial.
nflog, err := fs.GetNotificationLog(ctx)
require.NoError(t, err)
require.Equal(t, "something to marshal", string(b))
decoded, err := decodeNflogState(strings.NewReader(nflog))
require.NoError(t, err)
if !cmp.Equal(initialState, decoded) {
t.Errorf("Unexpected Diff: %v", cmp.Diff(initialState, decoded))
}
// Save new.
k1, v1 = createNotificationLog("groupA", "receiverA", now, oneHour)
k2, v2 = createNotificationLog("groupB", "receiverB", now, oneHour)
newState := nflogState{k1: v1, k2: v2}
size, err := fs.SaveNotificationLog(ctx, newState)
require.NoError(t, err)
require.EqualValues(t, len(decodedState), size)
// Load new.
nflog, err = fs.GetNotificationLog(ctx)
require.NoError(t, err)
decoded, err = decodeNflogState(strings.NewReader(nflog))
require.NoError(t, err)
if !cmp.Equal(newState, decoded) {
t.Errorf("Unexpected Diff: %v", cmp.Diff(newState, decoded))
}
}

View File

@ -3,9 +3,6 @@ package notifier
import (
"context"
"fmt"
"os"
"path/filepath"
"strconv"
"sync"
"time"
@ -135,7 +132,8 @@ func NewMultiOrgAlertmanager(
// Set up the default per tenant Alertmanager factory.
moa.factory = func(ctx context.Context, orgID int64) (Alertmanager, error) {
m := metrics.NewAlertmanagerMetrics(moa.metrics.GetOrCreateOrgRegistry(orgID))
return NewAlertmanager(ctx, orgID, moa.settings, moa.configStore, moa.kvStore, moa.peer, moa.decryptFn, moa.ns, m, featureManager.IsEnabled(ctx, featuremgmt.FlagAlertingSimplifiedRouting))
stateStore := NewFileStore(orgID, kvStore)
return NewAlertmanager(ctx, orgID, moa.settings, moa.configStore, stateStore, moa.peer, moa.decryptFn, moa.ns, m, featureManager.IsEnabled(ctx, featuremgmt.FlagAlertingSimplifiedRouting))
}
for _, opt := range opts {
@ -331,45 +329,14 @@ func (moa *MultiOrgAlertmanager) SyncAlertmanagersForOrgs(ctx context.Context, o
am.CleanUp()
}
// We look for orphan directories and remove them. Orphan directories can
// occur when an organization is deleted and the node running Grafana is
// shutdown before the next sync is executed.
moa.cleanupOrphanLocalOrgState(ctx, orgsFound)
}
// cleanupOrphanLocalOrgState will check if there is any organization on
// disk that is not part of the active organizations. If this is the case
// it will delete the local state from disk.
// cleanupOrphanLocalOrgState will remove all orphaned nflog and silence states in the kvstore by comparing the
// existing entries to the currently active organizations. The original intention for this was to clean up deleted
// orgs that have had their states saved to the kvstore after deletion on instance shutdown.
func (moa *MultiOrgAlertmanager) cleanupOrphanLocalOrgState(ctx context.Context,
activeOrganizations map[int64]struct{}) {
dataDir := filepath.Join(moa.settings.DataPath, workingDir)
files, err := os.ReadDir(dataDir)
if err != nil {
moa.logger.Error("Failed to list local working directory", "dir", dataDir, "error", err)
return
}
for _, file := range files {
if !file.IsDir() {
moa.logger.Warn("Ignoring unexpected file while scanning local working directory", "filename", filepath.Join(dataDir, file.Name()))
continue
}
orgID, err := strconv.ParseInt(file.Name(), 10, 64)
if err != nil {
moa.logger.Error("Unable to parse orgID from directory name", "name", file.Name(), "error", err)
continue
}
_, exists := activeOrganizations[orgID]
if !exists {
moa.logger.Info("Found orphan organization directory", "orgID", orgID)
workingDirPath := filepath.Join(dataDir, strconv.FormatInt(orgID, 10))
fileStore := NewFileStore(orgID, moa.kvStore, workingDirPath)
// Clean up all the remaining resources from this alertmanager.
fileStore.CleanUp()
}
}
// Remove all orphaned items from kvstore by listing all existing items
// in our used namespace and comparing them to the currently active
// organizations.
storedFiles := []string{NotificationLogFilename, SilencesFilename}
for _, fileName := range storedFiles {
keys, err := moa.kvStore.Keys(ctx, kvstore.AllOrganizations, KVNamespace, fileName)

View File

@ -8,6 +8,9 @@ import (
"testing"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
@ -19,8 +22,6 @@ import (
"github.com/grafana/grafana/pkg/services/secrets/fakes"
secretsManager "github.com/grafana/grafana/pkg/services/secrets/manager"
"github.com/grafana/grafana/pkg/setting"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
)
func TestMultiorgAlertmanager_RemoteSecondaryMode(t *testing.T) {
@ -63,10 +64,8 @@ func TestMultiorgAlertmanager_RemoteSecondaryMode(t *testing.T) {
TenantID: tenantID,
BasicAuthPassword: password,
}
// We won't be handling files on disk, we can pass an empty string as workingDirPath.
stateStore := notifier.NewFileStore(orgID, kvStore, "")
m := metrics.NewRemoteAlertmanagerMetrics(prometheus.NewRegistry())
remoteAM, err := remote.NewAlertmanager(externalAMCfg, stateStore, secretsService.Decrypt, m)
remoteAM, err := remote.NewAlertmanager(externalAMCfg, notifier.NewFileStore(orgID, kvStore), secretsService.Decrypt, m)
require.NoError(t, err)
// Use both Alertmanager implementations in the forked Alertmanager.

View File

@ -3,10 +3,6 @@ package notifier
import (
"bytes"
"context"
"errors"
"io/fs"
"os"
"path/filepath"
"testing"
"time"
@ -108,29 +104,8 @@ grafana_alerting_discovered_configurations 4
// Orphaned state should be removed.
{
orgID := int64(6)
// First we create a directory and two files for an ograniztation that
// is not existing in the current state.
orphanDir := filepath.Join(tmpDir, "alerting", "6")
err := os.Mkdir(orphanDir, 0750)
require.NoError(t, err)
silencesPath := filepath.Join(orphanDir, SilencesFilename)
err = os.WriteFile(silencesPath, []byte("file_1"), 0644)
require.NoError(t, err)
notificationPath := filepath.Join(orphanDir, NotificationLogFilename)
err = os.WriteFile(notificationPath, []byte("file_2"), 0644)
require.NoError(t, err)
// We make sure that both files are on disk.
info, err := os.Stat(silencesPath)
require.NoError(t, err)
require.Equal(t, info.Name(), SilencesFilename)
info, err = os.Stat(notificationPath)
require.NoError(t, err)
require.Equal(t, info.Name(), NotificationLogFilename)
// We also populate the kvstore with orphaned records.
// Populate the kvstore with orphaned records.
err = kvStore.Set(ctx, orgID, KVNamespace, SilencesFilename, "file_1")
require.NoError(t, err)
@ -140,10 +115,6 @@ grafana_alerting_discovered_configurations 4
// Now re run the sync job once.
require.NoError(t, mam.LoadAndSyncAlertmanagersForOrgs(ctx))
// The organization directory should be gone by now.
_, err = os.Stat(orphanDir)
require.True(t, errors.Is(err, fs.ErrNotExist))
// The organization kvstore records should be gone by now.
_, exists, _ := kvStore.Get(ctx, orgID, KVNamespace, SilencesFilename)
require.False(t, exists)

View File

@ -1,13 +1,20 @@
package notifier
import (
"bytes"
"context"
"crypto/md5"
"errors"
"fmt"
"io"
"testing"
"time"
"github.com/matttproud/golang_protobuf_extensions/pbutil"
"github.com/prometheus/alertmanager/nflog/nflogpb"
"github.com/prometheus/alertmanager/silence/silencepb"
"github.com/prometheus/common/model"
"github.com/grafana/grafana/pkg/services/ngalert/models"
"github.com/grafana/grafana/pkg/services/ngalert/store"
@ -221,17 +228,132 @@ func (f *FakeOrgStore) GetOrgs(_ context.Context) ([]int64, error) {
return f.orgs, nil
}
type fakeState struct {
data string
}
func (fs *fakeState) MarshalBinary() ([]byte, error) {
return []byte(fs.data), nil
}
// NoValidation is a notification-settings validator that accepts every
// input; it is used by tests that do not care about validation.
type NoValidation struct {
}

// Validate always reports the given settings as valid.
func (n NoValidation) Validate(_ models.NotificationSettings) error {
	return nil
}
// errInvalidState is returned by the state decoders below when a decoded
// message is structurally invalid (e.g. a MeshSilence without a Silence,
// or a MeshEntry without an Entry/Receiver).
// errors.New is preferred over fmt.Errorf when there are no format verbs.
var errInvalidState = errors.New("invalid state")
// silenceState mirrors the unexported state type copied from
// prometheus-alertmanager/silence/silence.go, keyed by silence ID.
type silenceState map[string]*silencepb.MeshSilence

// MarshalBinary encodes every silence as a length-delimited protobuf
// message; copied from prometheus-alertmanager/silence/silence.go.
func (s silenceState) MarshalBinary() ([]byte, error) {
	buf := bytes.Buffer{}
	for _, sil := range s {
		_, err := pbutil.WriteDelimited(&buf, sil)
		if err != nil {
			return nil, err
		}
	}
	return buf.Bytes(), nil
}
// decodeSilenceState copied from decodeState in prometheus-alertmanager/silence/silence.go.
// It reads length-delimited MeshSilence messages from r until EOF and
// returns them keyed by silence ID. A message without a Silence payload
// yields errInvalidState.
func decodeSilenceState(r io.Reader) (silenceState, error) {
	st := silenceState{}
	for {
		var s silencepb.MeshSilence
		_, err := pbutil.ReadDelimited(r, &s)
		if err == nil {
			if s.Silence == nil {
				return nil, errInvalidState
			}
			st[s.Silence.Id] = &s
			continue
		}
		// errors.Is for consistency with decodeNflogState; it also
		// matches wrapped io.EOF errors, removing the need for the
		// //nolint:errorlint suppression.
		if errors.Is(err, io.EOF) {
			break
		}
		return nil, err
	}
	return st, nil
}
// createSilence builds a MeshSilence fixture matching the test alert name
// and folder-title labels used throughout these tests.
func createSilence(id string, startsAt, expiresAt time.Time) *silencepb.MeshSilence {
	matchers := []*silencepb.Matcher{
		{
			Type:    silencepb.Matcher_EQUAL,
			Name:    model.AlertNameLabel,
			Pattern: "test_alert",
		},
		{
			Type:    silencepb.Matcher_EQUAL,
			Name:    models.FolderTitleLabel,
			Pattern: "test_alert_folder",
		},
	}
	return &silencepb.MeshSilence{
		Silence: &silencepb.Silence{
			Id:        id,
			Matchers:  matchers,
			StartsAt:  startsAt,
			EndsAt:    expiresAt,
			CreatedBy: "Grafana Test",
			Comment:   "Test Silence",
		},
		ExpiresAt: expiresAt,
	}
}
// receiverKey copied from prometheus-alertmanager/nflog/nflog.go.
// It renders a receiver as "<group>/<integration>/<index>".
func receiverKey(r *nflogpb.Receiver) string {
	return fmt.Sprintf("%s/%s/%d", r.GroupName, r.Integration, r.Idx)
}
// stateKey mirrors stateKey in prometheus-alertmanager/nflog/nflog.go:
// it joins a group key and a receiver key with a colon.
func stateKey(k string, r *nflogpb.Receiver) string {
	return k + ":" + receiverKey(r)
}
// nflogState copied from state in prometheus-alertmanager/nflog/nflog.go.
type nflogState map[string]*nflogpb.MeshEntry

// MarshalBinary serializes every notification-log entry as a
// length-delimited protobuf message, mirroring prometheus-alertmanager/nflog/nflog.go.
// NOTE: map iteration order is random, so entry order is not deterministic.
func (s nflogState) MarshalBinary() ([]byte, error) {
	var out bytes.Buffer
	for _, entry := range s {
		if _, err := pbutil.WriteDelimited(&out, entry); err != nil {
			return nil, err
		}
	}
	return out.Bytes(), nil
}
// decodeNflogState copied from decodeState in prometheus-alertmanager/nflog/nflog.go.
// It reads length-delimited MeshEntry messages from r until EOF and returns
// them keyed by stateKey(groupKey, receiver). An entry missing its Entry or
// Receiver yields errInvalidState.
func decodeNflogState(r io.Reader) (nflogState, error) {
	st := nflogState{}
	for {
		var e nflogpb.MeshEntry
		if _, err := pbutil.ReadDelimited(r, &e); err != nil {
			if errors.Is(err, io.EOF) {
				break
			}
			return nil, err
		}
		if e.Entry == nil || e.Entry.Receiver == nil {
			return nil, errInvalidState
		}
		st[stateKey(string(e.Entry.GroupKey), e.Entry.Receiver)] = &e
	}
	return st, nil
}
// createNotificationLog builds a notification-log fixture and returns both
// its state key and the MeshEntry it maps to.
func createNotificationLog(groupKey string, receiverName string, sentAt, expiresAt time.Time) (string, *nflogpb.MeshEntry) {
	recv := nflogpb.Receiver{GroupName: receiverName, Integration: "test3", Idx: 0}
	entry := &nflogpb.MeshEntry{
		Entry: &nflogpb.Entry{
			GroupKey:  []byte(groupKey),
			Receiver:  &recv,
			Resolved:  false,
			Timestamp: sentAt,
		},
		ExpiresAt: expiresAt,
	}
	return stateKey(groupKey, &recv), entry
}

View File

@ -3,12 +3,18 @@ package remote
import (
"context"
"crypto/md5"
"encoding/base64"
"encoding/json"
"fmt"
"net/http"
"net/url"
"github.com/go-openapi/strfmt"
amalert "github.com/prometheus/alertmanager/api/v2/client/alert"
amalertgroup "github.com/prometheus/alertmanager/api/v2/client/alertgroup"
amreceiver "github.com/prometheus/alertmanager/api/v2/client/receiver"
amsilence "github.com/prometheus/alertmanager/api/v2/client/silence"
"github.com/grafana/grafana/pkg/infra/log"
apimodels "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
@ -16,14 +22,13 @@ import (
"github.com/grafana/grafana/pkg/services/ngalert/notifier"
remoteClient "github.com/grafana/grafana/pkg/services/ngalert/remote/client"
"github.com/grafana/grafana/pkg/services/ngalert/sender"
amalert "github.com/prometheus/alertmanager/api/v2/client/alert"
amalertgroup "github.com/prometheus/alertmanager/api/v2/client/alertgroup"
amreceiver "github.com/prometheus/alertmanager/api/v2/client/receiver"
amsilence "github.com/prometheus/alertmanager/api/v2/client/silence"
alertingClusterPB "github.com/grafana/alerting/cluster/clusterpb"
)
type stateStore interface {
GetFullState(ctx context.Context, keys ...string) (string, error)
GetSilences(ctx context.Context) (string, error)
GetNotificationLog(ctx context.Context) (string, error)
}
// DecryptFn is a function that takes in an encrypted value and returns it decrypted.
@ -223,7 +228,7 @@ func (am *Alertmanager) CompareAndSendConfiguration(ctx context.Context, config
// CompareAndSendState gets the Alertmanager's internal state and compares it with the remote Alertmanager's one.
// If the states are different, it updates the remote Alertmanager's state with that of the internal Alertmanager.
func (am *Alertmanager) CompareAndSendState(ctx context.Context) error {
state, err := am.state.GetFullState(ctx, notifier.SilencesFilename, notifier.NotificationLogFilename)
state, err := am.getFullState(ctx)
if err != nil {
return err
}
@ -399,6 +404,33 @@ func (am *Alertmanager) Ready() bool {
// CleanUp does not have an equivalent in a "remote Alertmanager" context, we don't have files on disk, no-op.
func (am *Alertmanager) CleanUp() {}
// getFullState returns a base64-encoded protobuf message representing the
// Alertmanager's internal state: the silences part followed by the
// notification-log part.
func (am *Alertmanager) getFullState(ctx context.Context) (string, error) {
	silences, err := am.state.GetSilences(ctx)
	if err != nil {
		return "", fmt.Errorf("error getting silences: %w", err)
	}
	notificationLog, err := am.state.GetNotificationLog(ctx)
	if err != nil {
		return "", fmt.Errorf("error getting notification log: %w", err)
	}
	fs := alertingClusterPB.FullState{
		Parts: []alertingClusterPB.Part{
			{Key: notifier.SilencesFilename, Data: []byte(silences)},
			{Key: notifier.NotificationLogFilename, Data: []byte(notificationLog)},
		},
	}
	b, err := fs.Marshal()
	if err != nil {
		return "", fmt.Errorf("error marshaling full state: %w", err)
	}
	return base64.StdEncoding.EncodeToString(b), nil
}
// shouldSendConfig compares the remote Alertmanager configuration with our local one.
// It returns true if the configurations are different.
func (am *Alertmanager) shouldSendConfig(ctx context.Context, rawConfig []byte) bool {

View File

@ -17,6 +17,10 @@ import (
"time"
"github.com/go-openapi/strfmt"
amv2 "github.com/prometheus/alertmanager/api/v2/models"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"github.com/grafana/grafana/pkg/infra/db"
apimodels "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
@ -30,10 +34,6 @@ import (
secretsManager "github.com/grafana/grafana/pkg/services/secrets/manager"
"github.com/grafana/grafana/pkg/tests/testsuite"
"github.com/grafana/grafana/pkg/util"
amv2 "github.com/prometheus/alertmanager/api/v2/models"
"github.com/prometheus/alertmanager/cluster/clusterpb"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
)
// Valid Grafana Alertmanager configurations.
@ -144,7 +144,7 @@ func TestApplyConfig(t *testing.T) {
ctx := context.Background()
store := ngfakes.NewFakeKVStore(t)
fstore := notifier.NewFileStore(1, store, "")
fstore := notifier.NewFileStore(1, store)
require.NoError(t, store.Set(ctx, cfg.OrgID, "alertmanager", notifier.SilencesFilename, "test"))
require.NoError(t, store.Set(ctx, cfg.OrgID, "alertmanager", notifier.NotificationLogFilename, "test"))
@ -196,7 +196,7 @@ func TestCompareAndSendConfiguration(t *testing.T) {
require.NoError(t, err)
}))
fstore := notifier.NewFileStore(1, ngfakes.NewFakeKVStore(t), "")
fstore := notifier.NewFileStore(1, ngfakes.NewFakeKVStore(t))
m := metrics.NewRemoteAlertmanagerMetrics(prometheus.NewRegistry())
cfg := AlertmanagerConfig{
OrgID: 1,
@ -296,27 +296,20 @@ func TestIntegrationRemoteAlertmanagerApplyConfigOnlyUploadsOnce(t *testing.T) {
silences := []byte("test-silences")
nflog := []byte("test-notifications")
store := ngfakes.NewFakeKVStore(t)
fstore := notifier.NewFileStore(cfg.OrgID, store, "")
fstore := notifier.NewFileStore(cfg.OrgID, store)
ctx := context.Background()
require.NoError(t, store.Set(ctx, cfg.OrgID, "alertmanager", notifier.SilencesFilename, base64.StdEncoding.EncodeToString(silences)))
require.NoError(t, store.Set(ctx, cfg.OrgID, "alertmanager", notifier.NotificationLogFilename, base64.StdEncoding.EncodeToString(nflog)))
fs := clusterpb.FullState{
Parts: []clusterpb.Part{
{Key: "silences", Data: silences},
{Key: "notifications", Data: nflog},
},
}
fullState, err := fs.Marshal()
require.NoError(t, err)
encodedFullState := base64.StdEncoding.EncodeToString(fullState)
secretsService := secretsManager.SetupTestService(t, fakes.NewFakeSecretsStore())
m := metrics.NewRemoteAlertmanagerMetrics(prometheus.NewRegistry())
am, err := NewAlertmanager(cfg, fstore, secretsService.Decrypt, m)
require.NoError(t, err)
encodedFullState, err := am.getFullState(ctx)
require.NoError(t, err)
// We should have no configuration or state at first.
{
_, err := am.mimirClient.GetGrafanaAlertmanagerConfig(ctx)

View File

@ -15,6 +15,7 @@ import (
"github.com/grafana/grafana/pkg/bus"
"github.com/grafana/grafana/pkg/infra/appcontext"
"github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/infra/kvstore"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/tracing"
acmock "github.com/grafana/grafana/pkg/services/accesscontrol/mock"
@ -65,7 +66,7 @@ func SetupTestEnv(tb testing.TB, baseInterval time.Duration) (*ngalert.AlertNG,
ruleStore, err := store.ProvideDBStore(cfg, featuremgmt.WithFeatures(), sqlStore, folderService, &dashboards.FakeDashboardService{}, ac)
require.NoError(tb, err)
ng, err := ngalert.ProvideService(
cfg, features, nil, nil, routing.NewRouteRegister(), sqlStore, nil, nil, nil, quotatest.New(false, nil),
cfg, features, nil, nil, routing.NewRouteRegister(), sqlStore, kvstore.NewFakeKVStore(), nil, nil, quotatest.New(false, nil),
secretsService, nil, m, folderService, ac, &dashboards.FakeDashboardService{}, nil, bus, ac,
annotationstest.NewFakeAnnotationsRepo(), &pluginstore.FakePluginStore{}, tracer, ruleStore,
)

View File

@ -29,6 +29,7 @@ import (
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
ngalertmodels "github.com/grafana/grafana/pkg/services/ngalert/models"
ngstore "github.com/grafana/grafana/pkg/services/ngalert/store"
ngalertfakes "github.com/grafana/grafana/pkg/services/ngalert/tests/fakes"
"github.com/grafana/grafana/pkg/services/org"
"github.com/grafana/grafana/pkg/services/org/orgimpl"
"github.com/grafana/grafana/pkg/services/pluginsintegration/pluginstore"
@ -485,7 +486,7 @@ func setupEnv(t *testing.T, sqlStore *sqlstore.SQLStore, b bus.Bus, quotaService
ruleStore, err := ngstore.ProvideDBStore(sqlStore.Cfg, featuremgmt.WithFeatures(), sqlStore, &foldertest.FakeService{}, &dashboards.FakeDashboardService{}, ac)
require.NoError(t, err)
_, err = ngalert.ProvideService(
sqlStore.Cfg, featuremgmt.WithFeatures(), nil, nil, routing.NewRouteRegister(), sqlStore, nil, nil, nil, quotaService,
sqlStore.Cfg, featuremgmt.WithFeatures(), nil, nil, routing.NewRouteRegister(), sqlStore, ngalertfakes.NewFakeKVStore(t), nil, nil, quotaService,
secretsService, nil, m, &foldertest.FakeService{}, &acmock.Mock{}, &dashboards.FakeDashboardService{}, nil, b, &acmock.Mock{},
annotationstest.NewFakeAnnotationsRepo(), &pluginstore.FakePluginStore{}, tracer, ruleStore,
)