Chore: Persistent collection (#56074)

* persistent collection

* dont remove temp dir

* temp dir change

* lint

* add experimental comment

* move to X package

* lint

* orgID -> namespace
This commit is contained in:
Artur Wierzbicki 2022-09-30 21:34:44 +04:00 committed by GitHub
parent 2eb24bbc4e
commit ea1334c01d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 322 additions and 0 deletions

View File

@ -0,0 +1,212 @@
package persistentcollection
import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
"sync"
)
func NewLocalFSPersistentCollection[T any](name string, directory string, version int) PersistentCollection[T] {
c := &localFsCollection[T]{
name: name,
version: version,
collectionsDir: filepath.Join(directory, "file-collections"),
}
err := c.createCollectionsDirectory()
if err != nil {
panic(err)
}
return c
}
type CollectionFileContents[T any] struct {
Version int `json:"version"`
Items []T `json:"items"`
}
type localFsCollection[T any] struct {
version int
name string
collectionsDir string
mu sync.Mutex
}
func (s *localFsCollection[T]) Insert(ctx context.Context, namespace string, item T) error {
s.mu.Lock()
defer s.mu.Unlock()
items, err := s.load(ctx, namespace)
if err != nil {
return err
}
return s.save(ctx, namespace, append(items, item))
}
func (s *localFsCollection[T]) Delete(ctx context.Context, namespace string, predicate Predicate[T]) (int, error) {
s.mu.Lock()
defer s.mu.Unlock()
items, err := s.load(ctx, namespace)
if err != nil {
return 0, err
}
deletedCount := 0
newItems := make([]T, 0)
for idx := range items {
del, err := predicate(items[idx])
if err != nil {
return deletedCount, err
}
if del {
deletedCount += 1
} else {
newItems = append(newItems, items[idx])
}
}
if deletedCount != 0 {
return deletedCount, s.save(ctx, namespace, newItems)
}
return deletedCount, nil
}
func (s *localFsCollection[T]) FindFirst(ctx context.Context, namespace string, predicate Predicate[T]) (T, error) {
var nilResult T
s.mu.Lock()
defer s.mu.Unlock()
items, err := s.load(ctx, namespace)
if err != nil {
return nilResult, err
}
for idx := range items {
match, err := predicate(items[idx])
if err != nil {
return nilResult, err
}
if match {
return items[idx], nil
}
}
return nilResult, nil
}
func (s *localFsCollection[T]) Find(ctx context.Context, namespace string, predicate Predicate[T]) ([]T, error) {
s.mu.Lock()
defer s.mu.Unlock()
items, err := s.load(ctx, namespace)
if err != nil {
return nil, err
}
result := make([]T, 0)
for idx := range items {
match, err := predicate(items[idx])
if err != nil {
return nil, err
}
if match {
result = append(result, items[idx])
}
}
return result, nil
}
func (s *localFsCollection[T]) Update(ctx context.Context, namespace string, updateFn UpdateFn[T]) (int, error) {
s.mu.Lock()
defer s.mu.Unlock()
items, err := s.load(ctx, namespace)
if err != nil {
return 0, err
}
newItems := make([]T, 0)
updatedCount := 0
for idx := range items {
updated, updatedItem, err := updateFn(items[idx])
if err != nil {
return updatedCount, err
}
if updated {
updatedCount += 1
newItems = append(newItems, updatedItem)
} else {
newItems = append(newItems, items[idx])
}
}
if updatedCount != 0 {
return updatedCount, s.save(ctx, namespace, newItems)
}
return updatedCount, nil
}
func (s *localFsCollection[T]) load(ctx context.Context, namespace string) ([]T, error) {
filePath := s.collectionFilePath(namespace)
// Safe to ignore gosec warning G304, the path comes from grafana settings rather than the user input
// nolint:gosec
bytes, err := os.ReadFile(filePath)
if err != nil {
if os.IsNotExist(err) {
return []T{}, nil
}
return nil, fmt.Errorf("can't read %s file: %w", filePath, err)
}
var db CollectionFileContents[T]
if err = json.Unmarshal(bytes, &db); err != nil {
return nil, fmt.Errorf("can't unmarshal %s data: %w", filePath, err)
}
if db.Version != s.version {
if err := s.save(ctx, namespace, []T{}); err != nil {
return nil, err
}
return []T{}, nil
}
return db.Items, nil
}
func (s *localFsCollection[T]) save(_ context.Context, namespace string, items []T) error {
filePath := s.collectionFilePath(namespace)
bytes, err := json.MarshalIndent(&CollectionFileContents[T]{
Version: s.version,
Items: items,
}, "", " ")
if err != nil {
return fmt.Errorf("can't marshal items: %w", err)
}
return os.WriteFile(filePath, bytes, 0600)
}
func (s *localFsCollection[T]) createCollectionsDirectory() error {
_, err := os.Stat(s.collectionsDir)
if os.IsNotExist(err) {
return os.MkdirAll(s.collectionsDir, 0750)
}
return err
}
func (s *localFsCollection[T]) collectionFilePath(namespace string) string {
return filepath.Join(s.collectionsDir, fmt.Sprintf("%s-namespace-%s.json", s.name, namespace))
}

View File

@ -0,0 +1,89 @@
package persistentcollection
import (
"context"
"fmt"
"os"
"path"
"testing"
"github.com/stretchr/testify/require"
)
type item struct {
Name string `json:"name"`
Val int64 `json:"val"`
}
func TestLocalFSPersistentCollection(t *testing.T) {
namespace := "1"
ctx := context.Background()
dir := path.Join(os.TempDir(), "persistent-collection-test")
defer func() {
if err := os.RemoveAll(dir); err != nil {
fmt.Printf("Failed to remove temporary directory %q: %s\n", dir, err.Error())
}
}()
coll := NewLocalFSPersistentCollection[*item]("test", dir, 1)
firstInserted := &item{
Name: "test",
Val: 10,
}
err := coll.Insert(ctx, namespace, firstInserted)
require.NoError(t, err)
err = coll.Insert(ctx, namespace, &item{
Name: "test",
Val: 20,
})
require.NoError(t, err)
err = coll.Insert(ctx, namespace, &item{
Name: "test",
Val: 30,
})
require.NoError(t, err)
updatedCount, err := coll.Update(ctx, namespace, func(i *item) (bool, *item, error) {
if i.Val == 20 {
return true, &item{Val: 25, Name: "test"}, nil
}
return false, nil, nil
})
require.Equal(t, 1, updatedCount)
require.NoError(t, err)
deletedCount, err := coll.Delete(ctx, namespace, func(i *item) (bool, error) {
if i.Val == 30 {
return true, nil
}
return false, nil
})
require.Equal(t, 1, deletedCount)
require.NoError(t, err)
firstFound, err := coll.FindFirst(ctx, namespace, func(i *item) (bool, error) {
if i.Name == "test" {
return true, nil
}
return false, nil
})
require.NoError(t, err)
require.Equal(t, firstInserted, firstFound)
all, err := coll.Find(ctx, namespace, func(i *item) (bool, error) { return true, nil })
require.NoError(t, err)
require.Equal(t, []*item{
{
Name: "test",
Val: 10,
},
{
Name: "test",
Val: 25,
},
}, all)
}

View File

@ -0,0 +1,21 @@
package persistentcollection
import (
"context"
)
type Predicate[T any] func(item T) (bool, error)
type UpdateFn[T any] func(item T) (updated bool, updatedItem T, err error)
// PersistentCollection is a collection of items that's going to retain its state between Grafana restarts.
// The main purpose of this API is to reduce the time-to-Proof-of-Concept - this is NOT intended for production use.
//
// The item type needs to be serializable to JSON.
// @alpha -- EXPERIMENTAL
type PersistentCollection[T any] interface {
Delete(ctx context.Context, namespace string, predicate Predicate[T]) (deletedCount int, err error)
FindFirst(ctx context.Context, namespace string, predicate Predicate[T]) (T, error)
Find(ctx context.Context, namespace string, predicate Predicate[T]) ([]T, error)
Update(ctx context.Context, namespace string, updateFn UpdateFn[T]) (updatedCount int, err error)
Insert(ctx context.Context, namespace string, item T) error
}