add basic list

This commit is contained in:
Ryan McKinley 2024-06-14 21:30:41 +03:00
parent 8108e90fe2
commit 617dd1b40e
7 changed files with 355 additions and 91 deletions

View File

@ -4,8 +4,10 @@ import (
"context"
"encoding/json"
"fmt"
"io/fs"
"path/filepath"
"sort"
"strconv"
"strings"
"github.com/hack-pad/hackpadfs"
@ -38,12 +40,17 @@ func NewFileSystemStore(opts FileSystemOptions) (AppendingStore, error) {
}
}
return &fsStore{tracer: opts.Tracer, root: root}, nil
return &fsStore{
tracer: opts.Tracer,
root: root,
keys: &simpleConverter{}, // not tenant isolated
}, nil
}
type fsStore struct {
tracer trace.Tracer
root hackpadfs.FS
keys KeyConversions
}
type fsEvent struct {
@ -64,8 +71,11 @@ func (f *fsStore) WriteEvent(ctx context.Context, event *WriteEvent) (int64, err
// Blob...
}
// For this case, we will treat them the same
dir := event.Key.NamespacedPath()
err := hackpadfs.MkdirAll(f.root, dir, 0750)
dir, err := f.keys.KeyToPath(event.Key, 0)
if err != nil {
return 0, err
}
err = hackpadfs.MkdirAll(f.root, dir, 0750)
if err != nil {
return 0, err
}
@ -89,7 +99,10 @@ func (f *fsStore) Read(ctx context.Context, req *ReadRequest) (*ReadResponse, er
rv := req.ResourceVersion
fname := "--x--"
dir := req.Key.NamespacedPath()
dir, err := f.keys.KeyToPath(req.Key, 0)
if err != nil {
return nil, err
}
if rv > 0 {
fname = fmt.Sprintf("%d.json", rv)
} else {
@ -140,9 +153,195 @@ func (f *fsStore) open(p string) (*fsEvent, error) {
return evt, err
}
type eventTree struct {
path string
group string
resource string
namespaces []namespaceEvents
}
func (t *eventTree) list(fs *fsStore, rv int64) (*ListResponse, error) {
rsp := &ListResponse{}
for idx, ns := range t.namespaces {
if idx == 0 {
rsp.ResourceVersion = ns.version()
}
err := ns.append(fs, rv, rsp)
if err != nil {
return rsp, err
}
}
return rsp, nil
}
func (t *eventTree) read(root fs.FS, key *ResourceKey) error {
t.group = key.Group
t.resource = key.Resource
t.path = fmt.Sprintf("%s/%s", t.group, t.resource)
// Cluster scoped, with an explicit name
if key.Namespace == "" {
if key.Name != "" {
ns := namespaceEvents{
path: t.path + "/__cluster__",
namespace: "",
}
err := ns.read(root, key)
if err == nil {
t.namespaces = append(t.namespaces, ns)
}
return err
}
}
files, err := hackpadfs.ReadDir(root, t.path)
if err != nil {
return err
}
for _, file := range files {
ns := namespaceEvents{
path: t.path + "/" + file.Name(),
namespace: file.Name(),
}
err = ns.read(root, key)
if err != nil {
return err
}
t.namespaces = append(t.namespaces, ns)
}
return nil
}
type namespaceEvents struct {
path string
namespace string
names []nameEvents
}
func (t *namespaceEvents) version() int64 {
if len(t.names) > 0 {
return t.names[0].version()
}
return 0
}
func (t *namespaceEvents) append(fs *fsStore, rv int64, rsp *ListResponse) error {
for _, name := range t.names {
err := name.append(fs, rv, rsp)
if err != nil {
return err
}
}
return nil
}
func (t *namespaceEvents) read(root fs.FS, key *ResourceKey) error {
if key.Name != "" {
vv := nameEvents{
path: t.path + "/" + key.Name,
name: key.Name,
}
err := vv.read(root)
if err != nil {
return err
}
t.names = []nameEvents{vv}
}
files, err := hackpadfs.ReadDir(root, t.path)
if err != nil {
return err
}
for _, file := range files {
ns := nameEvents{
path: t.path + "/" + file.Name(),
name: file.Name(),
}
err = ns.read(root)
if err != nil {
return err
}
t.names = append(t.names, ns)
}
return nil
}
type nameEvents struct {
path string
name string
versions []resourceEvent
}
func (t *nameEvents) version() int64 {
if len(t.versions) > 0 {
return t.versions[0].rv
}
return 0
}
func (t *nameEvents) append(fs *fsStore, rv int64, rsp *ListResponse) error {
for _, rev := range t.versions {
val, err := fs.open(t.path + "/" + rev.file)
if err != nil {
return err
}
wrapper := &ResourceWrapper{
ResourceVersion: val.ResourceVersion,
Value: val.Value,
// Operation: val.Operation,
}
rsp.Items = append(rsp.Items, wrapper)
if true {
return nil
}
}
return nil
}
func (t *nameEvents) read(root fs.FS) error {
var err error
files, err := hackpadfs.ReadDir(root, t.path)
if err != nil {
return err
}
for _, file := range files {
p := file.Name()
if file.IsDir() || !strings.HasSuffix(p, ".json") {
continue
}
base := strings.TrimSuffix(p, ".json")
base = strings.TrimPrefix(base, "rv")
rr := resourceEvent{file: p}
rr.rv, err = strconv.ParseInt(base, 10, 64)
if err != nil {
return err
}
t.versions = append(t.versions, rr)
}
sort.Slice(t.versions, func(i int, j int) bool {
return t.versions[i].rv > t.versions[j].rv
})
return err
}
type resourceEvent struct {
file string // path to the actual file
rv int64
}
// List implements AppendingStore.
func (f *fsStore) List(ctx context.Context, req *ListRequest) (*ListResponse, error) {
panic("unimplemented")
tree := eventTree{
group: req.Options.Key.Group,
resource: req.Options.Key.Resource,
}
err := tree.read(f.root, req.Options.Key)
if err != nil {
return nil, err
}
return tree.list(f, req.ResourceVersion)
}
// Watch implements AppendingStore.

View File

@ -0,0 +1,109 @@
package resource
import (
"bytes"
"fmt"
"strconv"
"strings"
)
type KeyConversions interface {
KeyToPath(k *ResourceKey, rv int64) (string, error)
PathToKey(p string) (k *ResourceKey, rv int64, err error)
PathPrefix(k *ResourceKey) string
}
var _ KeyConversions = &simpleConverter{}
// group/resource/namespace/name
type simpleConverter struct{}
// KeyToPath implements KeyConversions.
func (s *simpleConverter) KeyToPath(x *ResourceKey, rv int64) (string, error) {
var buffer bytes.Buffer
if x.Group == "" {
return "", fmt.Errorf("missing group")
}
buffer.WriteString(x.Group)
buffer.WriteString("/")
if x.Resource == "" {
return "", fmt.Errorf("missing resource")
}
buffer.WriteString(x.Resource)
buffer.WriteString("/")
if x.Namespace == "" {
buffer.WriteString("__cluster__")
} else {
buffer.WriteString(x.Namespace)
}
if x.Name == "" {
return buffer.String(), nil
}
buffer.WriteString("/")
buffer.WriteString(x.Name)
if rv > 0 {
buffer.WriteString("/")
buffer.WriteString(fmt.Sprintf("%.20d", rv))
}
return buffer.String(), nil
}
// KeyToPath implements KeyConversions.
func (s *simpleConverter) PathPrefix(x *ResourceKey) string {
var buffer bytes.Buffer
if x.Group == "" {
return ""
}
buffer.WriteString(x.Group)
if x.Resource == "" {
return buffer.String()
}
buffer.WriteString("/")
buffer.WriteString(x.Resource)
if x.Namespace == "" {
if x.Name == "" {
return buffer.String()
}
buffer.WriteString("/__cluster__")
} else {
buffer.WriteString("/")
buffer.WriteString(x.Namespace)
}
if x.Name == "" {
return buffer.String()
}
buffer.WriteString("/")
buffer.WriteString(x.Name)
return buffer.String()
}
// PathToKey implements KeyConversions.
func (s *simpleConverter) PathToKey(p string) (k *ResourceKey, rv int64, err error) {
key := &ResourceKey{}
parts := strings.Split(p, "/")
if len(parts) < 2 {
return nil, 0, fmt.Errorf("expecting at least group/resource")
}
key.Group = parts[0]
key.Resource = parts[1]
if len(parts) > 2 {
key.Namespace = parts[2]
}
if len(parts) > 3 {
key.Name = parts[3]
}
if len(parts) > 4 {
parts = strings.Split(parts[4], ".")
rv, err = strconv.ParseInt(parts[0], 10, 64)
}
return key, rv, err
}

View File

@ -0,0 +1,30 @@
package resource
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestKeyConversions(t *testing.T) {
t.Run("key namespaced path", func(t *testing.T) {
conv := &simpleConverter{}
key := &ResourceKey{
Group: "ggg",
Resource: "rrr",
Namespace: "ns",
}
p, err := conv.KeyToPath(key, 0)
require.NoError(t, err)
require.Equal(t, "ggg/rrr/ns", p)
key.Name = "name"
p, err = conv.KeyToPath(key, 0)
require.NoError(t, err)
require.Equal(t, "ggg/rrr/ns/name", p)
require.Equal(t, "ggg/rrr", conv.PathPrefix(&ResourceKey{
Group: "ggg",
Resource: "rrr",
}))
})
}

View File

@ -1,56 +0,0 @@
package resource
import (
"bytes"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
)
// NamespacedPath is a path that can be used to isolate tenant data
// NOTE: this strategy does not allow quickly searching across namespace boundaries with a prefix
func (x *ResourceKey) NamespacedPath() string {
var buffer bytes.Buffer
if x.Namespace == "" {
buffer.WriteString("__cluster__")
} else {
buffer.WriteString(x.Namespace)
}
if x.Group == "" {
return buffer.String()
}
buffer.WriteString("/")
buffer.WriteString(x.Group)
if x.Resource == "" {
return buffer.String()
}
buffer.WriteString("/")
buffer.WriteString(x.Resource)
if x.Name == "" {
return buffer.String()
}
buffer.WriteString("/")
buffer.WriteString(x.Name)
return buffer.String()
}
// Return a copy without the resource version
func (x *ResourceKey) WithoutResourceVersion() *ResourceKey {
return &ResourceKey{
Namespace: x.Namespace,
Group: x.Group,
Resource: x.Resource,
Name: x.Name,
}
}
func ResourceKeyFor(gr schema.GroupResource, obj metav1.Object) *ResourceKey {
return &ResourceKey{
Group: gr.Group,
Resource: gr.Resource,
Namespace: obj.GetNamespace(),
Name: obj.GetName(),
}
}

View File

@ -1,28 +0,0 @@
package resource_test
import (
"testing"
"github.com/stretchr/testify/require"
"github.com/grafana/grafana/pkg/storage/unified/resource"
)
func TestResourceModels(t *testing.T) {
t.Run("key namespaced path", func(t *testing.T) {
key := &resource.ResourceKey{}
require.Equal(t, "__cluster__", key.NamespacedPath())
key.Namespace = "ns"
require.Equal(t, "ns", key.NamespacedPath())
key.Group = "ggg"
require.Equal(t, "ns/ggg", key.NamespacedPath())
key.Resource = "rrr"
require.Equal(t, "ns/ggg/rrr", key.NamespacedPath())
key.Name = "nnnn"
require.Equal(t, "ns/ggg/rrr/nnnn", key.NamespacedPath())
})
}

View File

@ -343,7 +343,7 @@ func (s *server) Update(ctx context.Context, req *UpdateRequest) (*UpdateRespons
}
latest, err := s.store.Read(ctx, &ReadRequest{
Key: req.Key.WithoutResourceVersion(),
Key: req.Key,
})
if err != nil {
return nil, err
@ -386,7 +386,7 @@ func (s *server) Delete(ctx context.Context, req *DeleteRequest) (*DeleteRespons
}
latest, err := s.store.Read(ctx, &ReadRequest{
Key: req.Key.WithoutResourceVersion(),
Key: req.Key,
})
if err != nil {
return nil, err

View File

@ -93,6 +93,16 @@ func TestWriter(t *testing.T) {
require.NoError(t, err)
require.Equal(t, updated.ResourceVersion, found.ResourceVersion)
all, err := server.List(ctx, &ListRequest{Options: &ListOptions{
Key: &ResourceKey{
Group: key.Group,
Resource: key.Resource,
},
}})
require.NoError(t, err)
require.Len(t, all.Items, 1)
require.Equal(t, updated.ResourceVersion, all.Items[0].ResourceVersion)
deleted, err := server.Delete(ctx, &DeleteRequest{Key: key, ResourceVersion: updated.ResourceVersion})
require.NoError(t, err)
require.True(t, deleted.ResourceVersion > updated.ResourceVersion)