Alerting: delete orphaned records from kvstore (#40337)

This commit is contained in:
Jean-Philippe Quéméner
2021-10-14 12:04:00 +02:00
committed by GitHub
parent 1b7ca0413a
commit 153c356993
7 changed files with 172 additions and 3 deletions

View File

@@ -7,6 +7,11 @@ import (
"github.com/grafana/grafana/pkg/services/sqlstore"
)
const (
// Wildcard to query all organizations
AllOrganizations = -1
)
func ProvideService(sqlStore *sqlstore.SQLStore) KVStore {
return &kvStoreSQL{
sqlStore: sqlStore,
@@ -19,6 +24,7 @@ type KVStore interface {
Get(ctx context.Context, orgId int64, namespace string, key string) (string, bool, error)
Set(ctx context.Context, orgId int64, namespace string, key string, value string) error
Del(ctx context.Context, orgId int64, namespace string, key string) error
Keys(ctx context.Context, orgId int64, namespace string, keyPrefix string) ([]Key, error)
}
// WithNamespace returns a kvstore wrapper with fixed orgId and namespace.
@@ -48,3 +54,7 @@ func (kv *NamespacedKVStore) Set(ctx context.Context, key string, value string)
func (kv *NamespacedKVStore) Del(ctx context.Context, key string) error {
return kv.kvStore.Del(ctx, kv.orgId, kv.namespace, key)
}
func (kv *NamespacedKVStore) Keys(ctx context.Context, keyPrefix string) ([]Key, error) {
return kv.kvStore.Keys(ctx, kv.orgId, kv.namespace, keyPrefix)
}

View File

@@ -165,4 +165,77 @@ func TestKVStore(t *testing.T) {
require.False(t, ok, "all keys should be deleted at this point")
}
})
t.Run("listing existing keys", func(t *testing.T) {
kv := createTestableKVStore(t)
ctx := context.Background()
namespace, key := "listtest", "listtest"
testCases := []*TestCase{
{
OrgId: 1,
Namespace: namespace,
Key: key + "_1",
},
{
OrgId: 2,
Namespace: namespace,
Key: key + "_2",
},
{
OrgId: 3,
Namespace: namespace,
Key: key + "_3",
},
{
OrgId: 4,
Namespace: namespace,
Key: key + "_4",
},
{
OrgId: 1,
Namespace: namespace,
Key: "other_key",
},
{
OrgId: 4,
Namespace: namespace,
Key: "another_one",
},
}
for _, tc := range testCases {
err := kv.Set(ctx, tc.OrgId, tc.Namespace, tc.Key, tc.Value())
require.NoError(t, err)
}
keys, err := kv.Keys(ctx, AllOrganizations, namespace, key[0:6])
require.NoError(t, err)
require.Len(t, keys, 4)
found := 0
for _, key := range keys {
for _, tc := range testCases {
if key.Key == tc.Key {
found++
break
}
}
}
require.Equal(t, 4, found, "querying with the wildcard should return 4 records")
keys, err = kv.Keys(ctx, 1, namespace, key[0:6])
require.NoError(t, err)
require.Len(t, keys, 1, "querying for a specific org should return 1 record")
keys, err = kv.Keys(ctx, AllOrganizations, "not_existing_namespace", "not_existing_key")
require.NoError(t, err, "querying a not existing namespace and key should not throw an error")
require.Len(t, keys, 0, "querying a not existing namespace and key should return an empty slice")
})
}

View File

@@ -19,3 +19,13 @@ type Item struct {
func (i *Item) TableName() string {
return "kv_store"
}
type Key struct {
OrgId int64
Namespace string
Key string
}
func (i *Key) TableName() string {
return "kv_store"
}

View File

@@ -93,3 +93,17 @@ func (kv *kvStoreSQL) Del(ctx context.Context, orgId int64, namespace string, ke
})
return err
}
// Keys get all keys for a given namespace and keyPrefix. To query for all
// organizations the constant 'kvstore.AllOrganizations' can be passed as orgId.
func (kv *kvStoreSQL) Keys(ctx context.Context, orgId int64, namespace string, keyPrefix string) ([]Key, error) {
var keys []Key
err := kv.sqlStore.WithDbSession(ctx, func(dbSession *sqlstore.DBSession) error {
query := dbSession.Where("namespace = ?", namespace).And("key LIKE ?", keyPrefix+"%")
if orgId != AllOrganizations {
query.And("org_id = ?", orgId)
}
return query.Find(&keys)
})
return keys, err
}

View File

@@ -224,13 +224,14 @@ func (moa *MultiOrgAlertmanager) SyncAlertmanagersForOrgs(ctx context.Context, o
// We look for orphan directories and remove them. Orphan directories can
// occur when an organization is deleted and the node running Grafana is
// shutdown before the next sync is executed.
moa.cleanupOrphanLocalOrgState(orgsFound)
moa.cleanupOrphanLocalOrgState(ctx, orgsFound)
}
// cleanupOrphanLocalOrgState will check if there is any organization on
// disk that is not part of the active organizations. If this is the case
// it will delete the local state from disk.
func (moa *MultiOrgAlertmanager) cleanupOrphanLocalOrgState(activeOrganizations map[int64]struct{}) {
func (moa *MultiOrgAlertmanager) cleanupOrphanLocalOrgState(ctx context.Context,
activeOrganizations map[int64]struct{}) {
dataDir := filepath.Join(moa.settings.DataPath, workingDir)
files, err := ioutil.ReadDir(dataDir)
if err != nil {
@@ -256,6 +257,27 @@ func (moa *MultiOrgAlertmanager) cleanupOrphanLocalOrgState(activeOrganizations
fileStore.CleanUp()
}
}
// Remove all orphaned items from kvstore by listing all existing items
// in our used namespace and comparing them to the currently active
// organizations.
storedFiles := []string{notificationLogFilename, silencesFilename}
for _, fileName := range storedFiles {
keys, err := moa.kvStore.Keys(ctx, kvstore.AllOrganizations, KVNamespace, fileName)
if err != nil {
moa.logger.Error("failed to fetch items from kvstore", "err", err,
"namespace", KVNamespace, "key", fileName)
}
for _, key := range keys {
if _, exists := activeOrganizations[key.OrgId]; exists {
continue
}
err = moa.kvStore.Del(ctx, key.OrgId, key.Namespace, key.Key)
if err != nil {
moa.logger.Error("failed to delete item from kvstore", "err", err,
"orgID", key.OrgId, "namespace", KVNamespace, "key", key.Key)
}
}
}
}
func (moa *MultiOrgAlertmanager) StopAndWait() {

View File

@@ -97,8 +97,9 @@ grafana_alerting_discovered_configurations 4
require.Len(t, mam.alertmanagers, 4)
}
// Orphaned local state should be removed.
// Orphaned state should be removed.
{
orgID := int64(6)
// First we create a directory and two files for an ograniztation that
// is not existing in the current state.
orphanDir := filepath.Join(tmpDir, "alerting", "6")
@@ -121,12 +122,26 @@ grafana_alerting_discovered_configurations 4
require.NoError(t, err)
require.Equal(t, info.Name(), notificationLogFilename)
// We also populate the kvstore with orphaned records.
err = kvStore.Set(ctx, orgID, KVNamespace, silencesFilename, "file_1")
require.NoError(t, err)
err = kvStore.Set(ctx, orgID, KVNamespace, notificationLogFilename, "file_1")
require.NoError(t, err)
// Now re run the sync job once.
require.NoError(t, mam.LoadAndSyncAlertmanagersForOrgs(ctx))
// The organization directory should be gone by now.
_, err = os.Stat(orphanDir)
require.True(t, errors.Is(err, fs.ErrNotExist))
// The organization kvstore records should be gone by now.
_, exists, _ := kvStore.Get(ctx, orgID, KVNamespace, silencesFilename)
require.False(t, exists)
_, exists, _ = kvStore.Get(ctx, orgID, KVNamespace, notificationLogFilename)
require.False(t, exists)
}
}

View File

@@ -2,9 +2,11 @@ package notifier
import (
"context"
"strings"
"sync"
"testing"
"github.com/grafana/grafana/pkg/infra/kvstore"
"github.com/grafana/grafana/pkg/services/ngalert/models"
"github.com/grafana/grafana/pkg/services/ngalert/store"
)
@@ -130,6 +132,29 @@ func (fkv *FakeKVStore) Del(_ context.Context, orgId int64, namespace string, ke
return nil
}
func (fkv *FakeKVStore) Keys(ctx context.Context, orgID int64, namespace string, keyPrefix string) ([]kvstore.Key, error) {
fkv.mtx.Lock()
defer fkv.mtx.Unlock()
var keys []kvstore.Key
for orgIDFromStore, namespaceMap := range fkv.store {
if orgID != kvstore.AllOrganizations && orgID != orgIDFromStore {
continue
}
if keyMap, exists := namespaceMap[namespace]; exists {
for k := range keyMap {
if strings.HasPrefix(k, keyPrefix) {
keys = append(keys, kvstore.Key{
OrgId: orgIDFromStore,
Namespace: namespace,
Key: keyPrefix,
})
}
}
}
}
return keys, nil
}
type fakeState struct {
data string
}