mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Chore: Remove pkg/infra/x/persistentcollection (#89139)
This commit is contained in:
2
.github/CODEOWNERS
vendored
2
.github/CODEOWNERS
vendored
@@ -156,8 +156,6 @@
|
|||||||
|
|
||||||
/pkg/infra/kvstore/ @grafana/grafana-backend-group
|
/pkg/infra/kvstore/ @grafana/grafana-backend-group
|
||||||
/pkg/infra/fs/ @grafana/grafana-backend-group
|
/pkg/infra/fs/ @grafana/grafana-backend-group
|
||||||
/pkg/infra/x/ @grafana/grafana-backend-group
|
|
||||||
|
|
||||||
|
|
||||||
# devenv
|
# devenv
|
||||||
# Backend code, developers environment
|
# Backend code, developers environment
|
||||||
|
|||||||
@@ -1,212 +0,0 @@
|
|||||||
package persistentcollection
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
|
||||||
"os"
|
|
||||||
"path/filepath"
|
|
||||||
"sync"
|
|
||||||
)
|
|
||||||
|
|
||||||
func NewLocalFSPersistentCollection[T any](name string, directory string, version int) PersistentCollection[T] {
|
|
||||||
c := &localFsCollection[T]{
|
|
||||||
name: name,
|
|
||||||
version: version,
|
|
||||||
collectionsDir: filepath.Join(directory, "file-collections"),
|
|
||||||
}
|
|
||||||
err := c.createCollectionsDirectory()
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
return c
|
|
||||||
}
|
|
||||||
|
|
||||||
type CollectionFileContents[T any] struct {
|
|
||||||
Version int `json:"version"`
|
|
||||||
Items []T `json:"items"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type localFsCollection[T any] struct {
|
|
||||||
version int
|
|
||||||
name string
|
|
||||||
collectionsDir string
|
|
||||||
mu sync.Mutex
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *localFsCollection[T]) Insert(ctx context.Context, namespace string, item T) error {
|
|
||||||
s.mu.Lock()
|
|
||||||
defer s.mu.Unlock()
|
|
||||||
|
|
||||||
items, err := s.load(ctx, namespace)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return s.save(ctx, namespace, append(items, item))
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *localFsCollection[T]) Delete(ctx context.Context, namespace string, predicate Predicate[T]) (int, error) {
|
|
||||||
s.mu.Lock()
|
|
||||||
defer s.mu.Unlock()
|
|
||||||
|
|
||||||
items, err := s.load(ctx, namespace)
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
deletedCount := 0
|
|
||||||
newItems := make([]T, 0)
|
|
||||||
for idx := range items {
|
|
||||||
del, err := predicate(items[idx])
|
|
||||||
if err != nil {
|
|
||||||
return deletedCount, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if del {
|
|
||||||
deletedCount += 1
|
|
||||||
} else {
|
|
||||||
newItems = append(newItems, items[idx])
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if deletedCount != 0 {
|
|
||||||
return deletedCount, s.save(ctx, namespace, newItems)
|
|
||||||
}
|
|
||||||
|
|
||||||
return deletedCount, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *localFsCollection[T]) FindFirst(ctx context.Context, namespace string, predicate Predicate[T]) (T, error) {
|
|
||||||
var nilResult T
|
|
||||||
|
|
||||||
s.mu.Lock()
|
|
||||||
defer s.mu.Unlock()
|
|
||||||
|
|
||||||
items, err := s.load(ctx, namespace)
|
|
||||||
if err != nil {
|
|
||||||
return nilResult, err
|
|
||||||
}
|
|
||||||
|
|
||||||
for idx := range items {
|
|
||||||
match, err := predicate(items[idx])
|
|
||||||
if err != nil {
|
|
||||||
return nilResult, err
|
|
||||||
}
|
|
||||||
if match {
|
|
||||||
return items[idx], nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nilResult, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *localFsCollection[T]) Find(ctx context.Context, namespace string, predicate Predicate[T]) ([]T, error) {
|
|
||||||
s.mu.Lock()
|
|
||||||
defer s.mu.Unlock()
|
|
||||||
|
|
||||||
items, err := s.load(ctx, namespace)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
result := make([]T, 0)
|
|
||||||
for idx := range items {
|
|
||||||
match, err := predicate(items[idx])
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if match {
|
|
||||||
result = append(result, items[idx])
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return result, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *localFsCollection[T]) Update(ctx context.Context, namespace string, updateFn UpdateFn[T]) (int, error) {
|
|
||||||
s.mu.Lock()
|
|
||||||
defer s.mu.Unlock()
|
|
||||||
|
|
||||||
items, err := s.load(ctx, namespace)
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
newItems := make([]T, 0)
|
|
||||||
updatedCount := 0
|
|
||||||
for idx := range items {
|
|
||||||
updated, updatedItem, err := updateFn(items[idx])
|
|
||||||
if err != nil {
|
|
||||||
return updatedCount, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if updated {
|
|
||||||
updatedCount += 1
|
|
||||||
newItems = append(newItems, updatedItem)
|
|
||||||
} else {
|
|
||||||
newItems = append(newItems, items[idx])
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if updatedCount != 0 {
|
|
||||||
return updatedCount, s.save(ctx, namespace, newItems)
|
|
||||||
}
|
|
||||||
|
|
||||||
return updatedCount, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *localFsCollection[T]) load(ctx context.Context, namespace string) ([]T, error) {
|
|
||||||
filePath := s.collectionFilePath(namespace)
|
|
||||||
// Safe to ignore gosec warning G304, the path comes from grafana settings rather than the user input
|
|
||||||
// nolint:gosec
|
|
||||||
bytes, err := os.ReadFile(filePath)
|
|
||||||
if err != nil {
|
|
||||||
if os.IsNotExist(err) {
|
|
||||||
return []T{}, nil
|
|
||||||
}
|
|
||||||
return nil, fmt.Errorf("can't read %s file: %w", filePath, err)
|
|
||||||
}
|
|
||||||
var db CollectionFileContents[T]
|
|
||||||
if err = json.Unmarshal(bytes, &db); err != nil {
|
|
||||||
return nil, fmt.Errorf("can't unmarshal %s data: %w", filePath, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if db.Version != s.version {
|
|
||||||
if err := s.save(ctx, namespace, []T{}); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return []T{}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return db.Items, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *localFsCollection[T]) save(_ context.Context, namespace string, items []T) error {
|
|
||||||
filePath := s.collectionFilePath(namespace)
|
|
||||||
|
|
||||||
bytes, err := json.MarshalIndent(&CollectionFileContents[T]{
|
|
||||||
Version: s.version,
|
|
||||||
Items: items,
|
|
||||||
}, "", " ")
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("can't marshal items: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return os.WriteFile(filePath, bytes, 0600)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *localFsCollection[T]) createCollectionsDirectory() error {
|
|
||||||
_, err := os.Stat(s.collectionsDir)
|
|
||||||
if os.IsNotExist(err) {
|
|
||||||
return os.MkdirAll(s.collectionsDir, 0750)
|
|
||||||
}
|
|
||||||
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *localFsCollection[T]) collectionFilePath(namespace string) string {
|
|
||||||
return filepath.Join(s.collectionsDir, fmt.Sprintf("%s-namespace-%s.json", s.name, namespace))
|
|
||||||
}
|
|
||||||
@@ -1,89 +0,0 @@
|
|||||||
package persistentcollection
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
"os"
|
|
||||||
"path"
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
)
|
|
||||||
|
|
||||||
type item struct {
|
|
||||||
Name string `json:"name"`
|
|
||||||
Val int64 `json:"val"`
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestLocalFSPersistentCollection(t *testing.T) {
|
|
||||||
namespace := "1"
|
|
||||||
ctx := context.Background()
|
|
||||||
dir := path.Join(os.TempDir(), "persistent-collection-test")
|
|
||||||
defer func() {
|
|
||||||
if err := os.RemoveAll(dir); err != nil {
|
|
||||||
fmt.Printf("Failed to remove temporary directory %q: %s\n", dir, err.Error())
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
coll := NewLocalFSPersistentCollection[*item]("test", dir, 1)
|
|
||||||
|
|
||||||
firstInserted := &item{
|
|
||||||
Name: "test",
|
|
||||||
Val: 10,
|
|
||||||
}
|
|
||||||
err := coll.Insert(ctx, namespace, firstInserted)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
err = coll.Insert(ctx, namespace, &item{
|
|
||||||
Name: "test",
|
|
||||||
Val: 20,
|
|
||||||
})
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
err = coll.Insert(ctx, namespace, &item{
|
|
||||||
Name: "test",
|
|
||||||
Val: 30,
|
|
||||||
})
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
updatedCount, err := coll.Update(ctx, namespace, func(i *item) (bool, *item, error) {
|
|
||||||
if i.Val == 20 {
|
|
||||||
return true, &item{Val: 25, Name: "test"}, nil
|
|
||||||
}
|
|
||||||
return false, nil, nil
|
|
||||||
})
|
|
||||||
require.Equal(t, 1, updatedCount)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
deletedCount, err := coll.Delete(ctx, namespace, func(i *item) (bool, error) {
|
|
||||||
if i.Val == 30 {
|
|
||||||
return true, nil
|
|
||||||
}
|
|
||||||
return false, nil
|
|
||||||
})
|
|
||||||
require.Equal(t, 1, deletedCount)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
firstFound, err := coll.FindFirst(ctx, namespace, func(i *item) (bool, error) {
|
|
||||||
if i.Name == "test" {
|
|
||||||
return true, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return false, nil
|
|
||||||
})
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.Equal(t, firstInserted, firstFound)
|
|
||||||
|
|
||||||
all, err := coll.Find(ctx, namespace, func(i *item) (bool, error) { return true, nil })
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.Equal(t, []*item{
|
|
||||||
{
|
|
||||||
Name: "test",
|
|
||||||
Val: 10,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
Name: "test",
|
|
||||||
Val: 25,
|
|
||||||
},
|
|
||||||
}, all)
|
|
||||||
}
|
|
||||||
@@ -1,21 +0,0 @@
|
|||||||
package persistentcollection
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Predicate[T any] func(item T) (bool, error)
|
|
||||||
type UpdateFn[T any] func(item T) (updated bool, updatedItem T, err error)
|
|
||||||
|
|
||||||
// PersistentCollection is a collection of items that's going to retain its state between Grafana restarts.
|
|
||||||
// The main purpose of this API is to reduce the time-to-Proof-of-Concept - this is NOT intended for production use.
|
|
||||||
//
|
|
||||||
// The item type needs to be serializable to JSON.
|
|
||||||
// @alpha -- EXPERIMENTAL
|
|
||||||
type PersistentCollection[T any] interface {
|
|
||||||
Delete(ctx context.Context, namespace string, predicate Predicate[T]) (deletedCount int, err error)
|
|
||||||
FindFirst(ctx context.Context, namespace string, predicate Predicate[T]) (T, error)
|
|
||||||
Find(ctx context.Context, namespace string, predicate Predicate[T]) ([]T, error)
|
|
||||||
Update(ctx context.Context, namespace string, updateFn UpdateFn[T]) (updatedCount int, err error)
|
|
||||||
Insert(ctx context.Context, namespace string, item T) error
|
|
||||||
}
|
|
||||||
Reference in New Issue
Block a user