mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Alerting: cleanup alert resources on org removal (#39938)
This commit is contained in:
parent
6081790992
commit
e1dfec49f9
@ -8,6 +8,7 @@ import (
|
||||
"path/filepath"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/kvstore"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
)
|
||||
|
||||
const KVNamespace = "alertmanager"
|
||||
@ -24,6 +25,7 @@ type FileStore struct {
|
||||
kv *kvstore.NamespacedKVStore
|
||||
orgID int64
|
||||
workingDirPath string
|
||||
logger log.Logger
|
||||
}
|
||||
|
||||
func NewFileStore(orgID int64, store kvstore.KVStore, workingDirPath string) *FileStore {
|
||||
@ -31,6 +33,7 @@ func NewFileStore(orgID int64, store kvstore.KVStore, workingDirPath string) *Fi
|
||||
workingDirPath: workingDirPath,
|
||||
orgID: orgID,
|
||||
kv: kvstore.WithNamespace(store, orgID, KVNamespace),
|
||||
logger: log.New("filestore", "org", orgID),
|
||||
}
|
||||
}
|
||||
|
||||
@ -38,22 +41,16 @@ func NewFileStore(orgID int64, store kvstore.KVStore, workingDirPath string) *Fi
|
||||
// If the file is already present on disk it no-ops.
|
||||
// If not, it tries to read the database and if there's no file it no-ops.
|
||||
// If there is a file in the database, it decodes it and writes to disk for Alertmanager consumption.
|
||||
func (fs *FileStore) FilepathFor(ctx context.Context, filename string) (string, error) {
|
||||
// If a file is already present, we'll use that one and eventually save it to the database.
|
||||
// We don't need to do anything else.
|
||||
if fs.IsExists(filename) {
|
||||
return fs.pathFor(filename), nil
|
||||
}
|
||||
|
||||
func (fileStore *FileStore) FilepathFor(ctx context.Context, filename string) (string, error) {
|
||||
// Then, let's attempt to read it from the database.
|
||||
content, exists, err := fs.kv.Get(ctx, filename)
|
||||
content, exists, err := fileStore.kv.Get(ctx, filename)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("error reading file '%s' from database: %w", filename, err)
|
||||
}
|
||||
|
||||
// if it doesn't exist, let's no-op and let the Alertmanager create one. We'll eventually save it to the database.
|
||||
if !exists {
|
||||
return fs.pathFor(filename), nil
|
||||
return fileStore.pathFor(filename), nil
|
||||
}
|
||||
|
||||
// If we have a file stored in the database, let's decode it and write it to disk to perform that initial load to memory.
|
||||
@ -62,15 +59,15 @@ func (fs *FileStore) FilepathFor(ctx context.Context, filename string) (string,
|
||||
return "", fmt.Errorf("error decoding file '%s': %w", filename, err)
|
||||
}
|
||||
|
||||
if err := fs.WriteFileToDisk(filename, bytes); err != nil {
|
||||
if err := fileStore.WriteFileToDisk(filename, bytes); err != nil {
|
||||
return "", fmt.Errorf("error writing file %s: %w", filename, err)
|
||||
}
|
||||
|
||||
return fs.pathFor(filename), err
|
||||
return fileStore.pathFor(filename), err
|
||||
}
|
||||
|
||||
// Persist takes care of persisting the binary representation of internal state to the database as a base64 encoded string.
|
||||
func (fs *FileStore) Persist(ctx context.Context, filename string, st State) (int64, error) {
|
||||
func (fileStore *FileStore) Persist(ctx context.Context, filename string, st State) (int64, error) {
|
||||
var size int64
|
||||
|
||||
bytes, err := st.MarshalBinary()
|
||||
@ -78,32 +75,36 @@ func (fs *FileStore) Persist(ctx context.Context, filename string, st State) (in
|
||||
return size, err
|
||||
}
|
||||
|
||||
if err = fs.kv.Set(ctx, filename, encode(bytes)); err != nil {
|
||||
if err = fileStore.kv.Set(ctx, filename, encode(bytes)); err != nil {
|
||||
return size, err
|
||||
}
|
||||
|
||||
return int64(len(bytes)), err
|
||||
}
|
||||
|
||||
// IsExists verifies if the file exists or not.
|
||||
func (fs *FileStore) IsExists(fn string) bool {
|
||||
_, err := os.Stat(fs.pathFor(fn))
|
||||
return os.IsExist(err)
|
||||
}
|
||||
|
||||
// WriteFileToDisk writes a file with the provided name and contents to the Alertmanager working directory with the default grafana permission.
|
||||
func (fs *FileStore) WriteFileToDisk(fn string, content []byte) error {
|
||||
func (fileStore *FileStore) WriteFileToDisk(fn string, content []byte) error {
|
||||
// Ensure the working directory is created
|
||||
err := os.MkdirAll(fs.workingDirPath, 0750)
|
||||
err := os.MkdirAll(fileStore.workingDirPath, 0750)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to create the working directory %q: %s", fs.workingDirPath, err)
|
||||
return fmt.Errorf("unable to create the working directory %q: %s", fileStore.workingDirPath, err)
|
||||
}
|
||||
|
||||
return os.WriteFile(fs.pathFor(fn), content, 0644)
|
||||
return os.WriteFile(fileStore.pathFor(fn), content, 0644)
|
||||
}
|
||||
|
||||
func (fs *FileStore) pathFor(fn string) string {
|
||||
return filepath.Join(fs.workingDirPath, fn)
|
||||
// CleanUp will remove the working directory from disk.
|
||||
func (fileStore *FileStore) CleanUp() {
|
||||
if err := os.RemoveAll(fileStore.workingDirPath); err != nil {
|
||||
fileStore.logger.Warn("unable to delete the local working directory", "dir", fileStore.workingDirPath,
|
||||
"err", err)
|
||||
return
|
||||
}
|
||||
fileStore.logger.Info("successfully deleted working directory", "dir", fileStore.workingDirPath)
|
||||
}
|
||||
|
||||
func (fileStore *FileStore) pathFor(fn string) string {
|
||||
return filepath.Join(fileStore.workingDirPath, fn)
|
||||
}
|
||||
|
||||
func decode(s string) ([]byte, error) {
|
||||
|
@ -3,6 +3,9 @@ package notifier
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@ -213,6 +216,44 @@ func (moa *MultiOrgAlertmanager) SyncAlertmanagersForOrgs(ctx context.Context, o
|
||||
moa.logger.Info("stopping Alertmanager", "org", orgID)
|
||||
am.StopAndWait()
|
||||
moa.logger.Info("stopped Alertmanager", "org", orgID)
|
||||
// Cleanup all the remaining resources from this alertmanager.
|
||||
am.fileStore.CleanUp()
|
||||
}
|
||||
|
||||
// We look for orphan directories and remove them. Orphan directories can
|
||||
// occur when an organization is deleted and the node running Grafana is
|
||||
// shutdown before the next sync is executed.
|
||||
moa.cleanupOrphanLocalOrgState(orgsFound)
|
||||
}
|
||||
|
||||
// cleanupOrphanLocalOrgState will check if there is any organization on
|
||||
// disk that is not part of the active organizations. If this is the case
|
||||
// it will delete the local state from disk.
|
||||
func (moa *MultiOrgAlertmanager) cleanupOrphanLocalOrgState(activeOrganizations map[int64]struct{}) {
|
||||
dataDir := filepath.Join(moa.settings.DataPath, workingDir)
|
||||
files, err := ioutil.ReadDir(dataDir)
|
||||
if err != nil {
|
||||
moa.logger.Error("failed to list local working directory", "dir", dataDir, "err", err)
|
||||
return
|
||||
}
|
||||
for _, file := range files {
|
||||
if !file.IsDir() {
|
||||
moa.logger.Warn("ignoring unexpected file while scanning local working directory", "filename", filepath.Join(dataDir, file.Name()))
|
||||
continue
|
||||
}
|
||||
orgID, err := strconv.ParseInt(file.Name(), 10, 64)
|
||||
if err != nil {
|
||||
moa.logger.Error("unable to parse orgID from directory name", "name", file.Name(), "err", err)
|
||||
continue
|
||||
}
|
||||
_, exists := activeOrganizations[orgID]
|
||||
if !exists {
|
||||
moa.logger.Info("found orphan organization directory", "orgID", orgID)
|
||||
workingDirPath := filepath.Join(dataDir, strconv.FormatInt(orgID, 10))
|
||||
fileStore := NewFileStore(orgID, moa.kvStore, workingDirPath)
|
||||
// Cleanup all the remaining resources from this alertmanager.
|
||||
fileStore.CleanUp()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3,8 +3,11 @@ package notifier
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"io/fs"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -93,6 +96,38 @@ grafana_alerting_discovered_configurations 4
|
||||
require.NoError(t, mam.LoadAndSyncAlertmanagersForOrgs(ctx))
|
||||
require.Len(t, mam.alertmanagers, 4)
|
||||
}
|
||||
|
||||
// Orphaned local state should be removed.
|
||||
{
|
||||
// First we create a directory and two files for an organization that
|
||||
// is not existing in the current state.
|
||||
orphanDir := filepath.Join(tmpDir, "alerting", "6")
|
||||
err := os.Mkdir(orphanDir, 0750)
|
||||
require.NoError(t, err)
|
||||
|
||||
silencesPath := filepath.Join(orphanDir, silencesFilename)
|
||||
err = os.WriteFile(silencesPath, []byte("file_1"), 0644)
|
||||
require.NoError(t, err)
|
||||
|
||||
notificationPath := filepath.Join(orphanDir, notificationLogFilename)
|
||||
err = os.WriteFile(notificationPath, []byte("file_2"), 0644)
|
||||
require.NoError(t, err)
|
||||
|
||||
// We make sure that both files are on disk.
|
||||
info, err := os.Stat(silencesPath)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, info.Name(), silencesFilename)
|
||||
info, err = os.Stat(notificationPath)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, info.Name(), notificationLogFilename)
|
||||
|
||||
// Now re run the sync job once.
|
||||
require.NoError(t, mam.LoadAndSyncAlertmanagersForOrgs(ctx))
|
||||
|
||||
// The organization directory should be gone by now.
|
||||
_, err = os.Stat(orphanDir)
|
||||
require.True(t, errors.Is(err, fs.ErrNotExist))
|
||||
}
|
||||
}
|
||||
|
||||
func TestMultiOrgAlertmanager_AlertmanagerFor(t *testing.T) {
|
||||
|
@ -241,6 +241,17 @@ func DeleteOrg(cmd *models.DeleteOrgCommand) error {
|
||||
"DELETE FROM org_user WHERE org_id = ?",
|
||||
"DELETE FROM org WHERE id = ?",
|
||||
"DELETE FROM temp_user WHERE org_id = ?",
|
||||
"DELETE FROM ngalert_configuration WHERE org_id = ?",
|
||||
"DELETE FROM alert_configuration WHERE org_id = ?",
|
||||
"DELETE FROM alert_instance WHERE rule_org_id = ?",
|
||||
"DELETE FROM alert_notification WHERE org_id = ?",
|
||||
"DELETE FROM alert_notification_state WHERE org_id = ?",
|
||||
"DELETE FROM alert_rule WHERE org_id = ?",
|
||||
"DELETE FROM alert_rule_tag WHERE EXISTS (SELECT 1 FROM alert WHERE alert.org_id = ? AND alert.id = alert_rule_tag.alert_id)",
|
||||
"DELETE FROM alert_rule_version WHERE rule_org_id = ?",
|
||||
"DELETE FROM alert WHERE org_id = ?",
|
||||
"DELETE FROM annotation WHERE org_id = ?",
|
||||
"DELETE FROM kv_store WHERE org_id = ?",
|
||||
}
|
||||
|
||||
for _, sql := range deletes {
|
||||
|
Loading…
Reference in New Issue
Block a user