From ea1334c01d872ce37c1af6223916655baedb4c2b Mon Sep 17 00:00:00 2001 From: Artur Wierzbicki Date: Fri, 30 Sep 2022 21:34:44 +0400 Subject: [PATCH] Chore: Persistent collection (#56074) * persistent collection * dont remove temp dir * temp dir change * lint * add experimental comment * move to X package * lint * orgID -> namespace --- pkg/infra/x/persistentcollection/local_fs.go | 212 ++++++++++++++++++ .../x/persistentcollection/local_fs_test.go | 89 ++++++++ pkg/infra/x/persistentcollection/model.go | 21 ++ 3 files changed, 322 insertions(+) create mode 100644 pkg/infra/x/persistentcollection/local_fs.go create mode 100644 pkg/infra/x/persistentcollection/local_fs_test.go create mode 100644 pkg/infra/x/persistentcollection/model.go diff --git a/pkg/infra/x/persistentcollection/local_fs.go b/pkg/infra/x/persistentcollection/local_fs.go new file mode 100644 index 00000000000..912fcf9ce64 --- /dev/null +++ b/pkg/infra/x/persistentcollection/local_fs.go @@ -0,0 +1,212 @@ +package persistentcollection + +import ( + "context" + "encoding/json" + "fmt" + "os" + "path/filepath" + "sync" +) + +func NewLocalFSPersistentCollection[T any](name string, directory string, version int) PersistentCollection[T] { + c := &localFsCollection[T]{ + name: name, + version: version, + collectionsDir: filepath.Join(directory, "file-collections"), + } + err := c.createCollectionsDirectory() + if err != nil { + panic(err) + } + return c +} + +type CollectionFileContents[T any] struct { + Version int `json:"version"` + Items []T `json:"items"` +} + +type localFsCollection[T any] struct { + version int + name string + collectionsDir string + mu sync.Mutex +} + +func (s *localFsCollection[T]) Insert(ctx context.Context, namespace string, item T) error { + s.mu.Lock() + defer s.mu.Unlock() + + items, err := s.load(ctx, namespace) + if err != nil { + return err + } + + return s.save(ctx, namespace, append(items, item)) +} + +func (s *localFsCollection[T]) Delete(ctx context.Context, namespace string, predicate Predicate[T]) (int, error) { + s.mu.Lock() + defer s.mu.Unlock() + + items, err := s.load(ctx, namespace) + if err != nil { + return 0, err + } + + deletedCount := 0 + newItems := make([]T, 0) + for idx := range items { + del, err := predicate(items[idx]) + if err != nil { + return deletedCount, err + } + + if del { + deletedCount += 1 + } else { + newItems = append(newItems, items[idx]) + } + } + + if deletedCount != 0 { + return deletedCount, s.save(ctx, namespace, newItems) + } + + return deletedCount, nil +} + +func (s *localFsCollection[T]) FindFirst(ctx context.Context, namespace string, predicate Predicate[T]) (T, error) { + var nilResult T + + s.mu.Lock() + defer s.mu.Unlock() + + items, err := s.load(ctx, namespace) + if err != nil { + return nilResult, err + } + + for idx := range items { + match, err := predicate(items[idx]) + if err != nil { + return nilResult, err + } + if match { + return items[idx], nil + } + } + + return nilResult, nil +} + +func (s *localFsCollection[T]) Find(ctx context.Context, namespace string, predicate Predicate[T]) ([]T, error) { + s.mu.Lock() + defer s.mu.Unlock() + + items, err := s.load(ctx, namespace) + if err != nil { + return nil, err + } + + result := make([]T, 0) + for idx := range items { + match, err := predicate(items[idx]) + if err != nil { + return nil, err + } + + if match { + result = append(result, items[idx]) + } + } + + return result, nil +} + +func (s *localFsCollection[T]) Update(ctx context.Context, namespace string, updateFn UpdateFn[T]) (int, error) { + s.mu.Lock() + defer s.mu.Unlock() + + items, err := s.load(ctx, namespace) + if err != nil { + return 0, err + } + + newItems := make([]T, 0) + updatedCount := 0 + for idx := range items { + updated, updatedItem, err := updateFn(items[idx]) + if err != nil { + return updatedCount, err + } + + if updated { + updatedCount += 1 + newItems = append(newItems, updatedItem) + } else { + newItems = append(newItems, items[idx]) + } + } + + if updatedCount != 0 { + return updatedCount, s.save(ctx, namespace, newItems) + } + + return updatedCount, nil +} + +func (s *localFsCollection[T]) load(ctx context.Context, namespace string) ([]T, error) { + filePath := s.collectionFilePath(namespace) + // Safe to ignore gosec warning G304, the path comes from grafana settings rather than the user input + // nolint:gosec + bytes, err := os.ReadFile(filePath) + if err != nil { + if os.IsNotExist(err) { + return []T{}, nil + } + return nil, fmt.Errorf("can't read %s file: %w", filePath, err) + } + var db CollectionFileContents[T] + if err = json.Unmarshal(bytes, &db); err != nil { + return nil, fmt.Errorf("can't unmarshal %s data: %w", filePath, err) + } + + if db.Version != s.version { + if err := s.save(ctx, namespace, []T{}); err != nil { + return nil, err + } + + return []T{}, nil + } + + return db.Items, nil +} + +func (s *localFsCollection[T]) save(_ context.Context, namespace string, items []T) error { + filePath := s.collectionFilePath(namespace) + + bytes, err := json.MarshalIndent(&CollectionFileContents[T]{ + Version: s.version, + Items: items, + }, "", " ") + if err != nil { + return fmt.Errorf("can't marshal items: %w", err) + } + + return os.WriteFile(filePath, bytes, 0600) +} + +func (s *localFsCollection[T]) createCollectionsDirectory() error { + _, err := os.Stat(s.collectionsDir) + if os.IsNotExist(err) { + return os.MkdirAll(s.collectionsDir, 0750) + } + + return err +} + +func (s *localFsCollection[T]) collectionFilePath(namespace string) string { + return filepath.Join(s.collectionsDir, fmt.Sprintf("%s-namespace-%s.json", s.name, namespace)) +} diff --git a/pkg/infra/x/persistentcollection/local_fs_test.go b/pkg/infra/x/persistentcollection/local_fs_test.go new file mode 100644 index 00000000000..ac1908d4658 --- /dev/null +++ b/pkg/infra/x/persistentcollection/local_fs_test.go @@ -0,0 +1,89 @@ +package persistentcollection + +import ( + "context" + "fmt" + "os" + "path" + "testing" + + "github.com/stretchr/testify/require" +) + +type item struct { + Name string `json:"name"` + Val int64 `json:"val"` +} + +func TestLocalFSPersistentCollection(t *testing.T) { + namespace := "1" + ctx := context.Background() + dir := path.Join(os.TempDir(), "persistent-collection-test") + defer func() { + if err := os.RemoveAll(dir); err != nil { + fmt.Printf("Failed to remove temporary directory %q: %s\n", dir, err.Error()) + } + }() + + coll := NewLocalFSPersistentCollection[*item]("test", dir, 1) + + firstInserted := &item{ + Name: "test", + Val: 10, + } + err := coll.Insert(ctx, namespace, firstInserted) + require.NoError(t, err) + + err = coll.Insert(ctx, namespace, &item{ + Name: "test", + Val: 20, + }) + require.NoError(t, err) + + err = coll.Insert(ctx, namespace, &item{ + Name: "test", + Val: 30, + }) + require.NoError(t, err) + + updatedCount, err := coll.Update(ctx, namespace, func(i *item) (bool, *item, error) { + if i.Val == 20 { + return true, &item{Val: 25, Name: "test"}, nil + } + return false, nil, nil + }) + require.Equal(t, 1, updatedCount) + require.NoError(t, err) + + deletedCount, err := coll.Delete(ctx, namespace, func(i *item) (bool, error) { + if i.Val == 30 { + return true, nil + } + return false, nil + }) + require.Equal(t, 1, deletedCount) + require.NoError(t, err) + + firstFound, err := coll.FindFirst(ctx, namespace, func(i *item) (bool, error) { + if i.Name == "test" { + return true, nil + } + + return false, nil + }) + require.NoError(t, err) + require.Equal(t, firstInserted, firstFound) + + all, err := coll.Find(ctx, namespace, func(i *item) (bool, error) { return true, nil }) + require.NoError(t, err) + require.Equal(t, []*item{ + { + Name: "test", + Val: 10, + }, + { + Name: "test", + Val: 25, + }, + }, all) +} diff --git a/pkg/infra/x/persistentcollection/model.go b/pkg/infra/x/persistentcollection/model.go new file mode 100644 index 00000000000..36e99818a62 --- /dev/null +++ b/pkg/infra/x/persistentcollection/model.go @@ -0,0 +1,21 @@ +package persistentcollection + +import ( + "context" +) + +type Predicate[T any] func(item T) (bool, error) +type UpdateFn[T any] func(item T) (updated bool, updatedItem T, err error) + +// PersistentCollection is a collection of items that's going to retain its state between Grafana restarts. +// The main purpose of this API is to reduce the time-to-Proof-of-Concept - this is NOT intended for production use. +// +// The item type needs to be serializable to JSON. +// @alpha -- EXPERIMENTAL +type PersistentCollection[T any] interface { + Delete(ctx context.Context, namespace string, predicate Predicate[T]) (deletedCount int, err error) + FindFirst(ctx context.Context, namespace string, predicate Predicate[T]) (T, error) + Find(ctx context.Context, namespace string, predicate Predicate[T]) ([]T, error) + Update(ctx context.Context, namespace string, updateFn UpdateFn[T]) (updatedCount int, err error) + Insert(ctx context.Context, namespace string, item T) error +}