From 617dd1b40ee5195d8a9b577707d8f4dca2ac751e Mon Sep 17 00:00:00 2001
From: Ryan McKinley
Date: Fri, 14 Jun 2024 21:30:41 +0300
Subject: [PATCH] add basic list

---
 pkg/storage/unified/resource/fs.go            | 209 +++++++++++++++++-
 pkg/storage/unified/resource/keys.go          | 109 +++++++++
 pkg/storage/unified/resource/keys_test.go     |  30 +++
 pkg/storage/unified/resource/resource.go      |  56 -----
 pkg/storage/unified/resource/resource_test.go |  28 ---
 pkg/storage/unified/resource/server.go        |   4 +-
 pkg/storage/unified/resource/server_test.go   |  10 +
 7 files changed, 355 insertions(+), 91 deletions(-)
 create mode 100644 pkg/storage/unified/resource/keys.go
 create mode 100644 pkg/storage/unified/resource/keys_test.go
 delete mode 100644 pkg/storage/unified/resource/resource.go
 delete mode 100644 pkg/storage/unified/resource/resource_test.go

diff --git a/pkg/storage/unified/resource/fs.go b/pkg/storage/unified/resource/fs.go
index 14deb57b2f1..e5a70e1b376 100644
--- a/pkg/storage/unified/resource/fs.go
+++ b/pkg/storage/unified/resource/fs.go
@@ -4,8 +4,10 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"io/fs"
 	"path/filepath"
 	"sort"
+	"strconv"
 	"strings"
 
 	"github.com/hack-pad/hackpadfs"
@@ -38,12 +40,17 @@ func NewFileSystemStore(opts FileSystemOptions) (AppendingStore, error) {
 		}
 	}
 
-	return &fsStore{tracer: opts.Tracer, root: root}, nil
+	return &fsStore{
+		tracer: opts.Tracer,
+		root:   root,
+		keys:   &simpleConverter{}, // not tenant isolated
+	}, nil
 }
 
 type fsStore struct {
 	tracer trace.Tracer
 	root   hackpadfs.FS
+	keys   KeyConversions
 }
 
 type fsEvent struct {
@@ -64,8 +71,11 @@ func (f *fsStore) WriteEvent(ctx context.Context, event *WriteEvent) (int64, err
 		// Blob...
 	}
 	// For this case, we will treat them the same
-	dir := event.Key.NamespacedPath()
-	err := hackpadfs.MkdirAll(f.root, dir, 0750)
+	dir, err := f.keys.KeyToPath(event.Key, 0)
+	if err != nil {
+		return 0, err
+	}
+	err = hackpadfs.MkdirAll(f.root, dir, 0750)
 	if err != nil {
 		return 0, err
 	}
@@ -89,7 +99,10 @@ func (f *fsStore) Read(ctx context.Context, req *ReadRequest) (*ReadResponse, er
 	rv := req.ResourceVersion
 
 	fname := "--x--"
-	dir := req.Key.NamespacedPath()
+	dir, err := f.keys.KeyToPath(req.Key, 0)
+	if err != nil {
+		return nil, err
+	}
 	if rv > 0 {
 		fname = fmt.Sprintf("%d.json", rv)
 	} else {
@@ -140,9 +153,195 @@ func (f *fsStore) open(p string) (*fsEvent, error) {
 	return evt, err
 }
 
+type eventTree struct {
+	path       string
+	group      string
+	resource   string
+	namespaces []namespaceEvents
+}
+
+func (t *eventTree) list(fs *fsStore, rv int64) (*ListResponse, error) {
+	rsp := &ListResponse{}
+	for idx, ns := range t.namespaces {
+		if idx == 0 {
+			rsp.ResourceVersion = ns.version()
+		}
+		err := ns.append(fs, rv, rsp)
+		if err != nil {
+			return rsp, err
+		}
+	}
+	return rsp, nil
+}
+
+func (t *eventTree) read(root fs.FS, key *ResourceKey) error {
+	t.group = key.Group
+	t.resource = key.Resource
+	t.path = fmt.Sprintf("%s/%s", t.group, t.resource)
+
+	// Cluster scoped, with an explicit name
+	if key.Namespace == "" {
+		if key.Name != "" {
+			ns := namespaceEvents{
+				path:      t.path + "/__cluster__",
+				namespace: "",
+			}
+			err := ns.read(root, key)
+			if err == nil {
+				t.namespaces = append(t.namespaces, ns)
+			}
+			return err
+		}
+	}
+
+	files, err := hackpadfs.ReadDir(root, t.path)
+	if err != nil {
+		return err
+	}
+	for _, file := range files {
+		ns := namespaceEvents{
+			path:      t.path + "/" + file.Name(),
+			namespace: file.Name(),
+		}
+		err = ns.read(root, key)
+		if err != nil {
+			return err
+		}
+		t.namespaces = append(t.namespaces, ns)
+	}
+
+	return nil
+}
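+
+// The list types below walk the directory layout used by fsStore:
+// <group>/<resource>/<namespace>/<name>, with "__cluster__" standing in for
+// an empty namespace and one <rv>.json event file per stored version.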
+
+type namespaceEvents struct {
+	path      string
+	namespace string
+	names     []nameEvents
+}
+
+func (t *namespaceEvents) version() int64 {
+	if len(t.names) > 0 {
+		return t.names[0].version()
+	}
+	return 0
+}
+
+func (t *namespaceEvents) append(fs *fsStore, rv int64, rsp *ListResponse) error {
+	for _, name := range t.names {
+		err := name.append(fs, rv, rsp)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func (t *namespaceEvents) read(root fs.FS, key *ResourceKey) error {
+	if key.Name != "" {
+		vv := nameEvents{
+			path: t.path + "/" + key.Name,
+			name: key.Name,
+		}
+		err := vv.read(root)
+		if err != nil {
+			return err
+		}
+		t.names = []nameEvents{vv}
+	}
+
+	files, err := hackpadfs.ReadDir(root, t.path)
+	if err != nil {
+		return err
+	}
+	for _, file := range files {
+		ns := nameEvents{
+			path: t.path + "/" + file.Name(),
+			name: file.Name(),
+		}
+		err = ns.read(root)
+		if err != nil {
+			return err
+		}
+		t.names = append(t.names, ns)
+	}
+	return nil
+}
+
+type nameEvents struct {
+	path     string
+	name     string
+	versions []resourceEvent
+}
+
+func (t *nameEvents) version() int64 {
+	if len(t.versions) > 0 {
+		return t.versions[0].rv
+	}
+	return 0
+}
+
+func (t *nameEvents) append(fs *fsStore, rv int64, rsp *ListResponse) error {
+	for _, rev := range t.versions {
+		val, err := fs.open(t.path + "/" + rev.file)
+		if err != nil {
+			return err
+		}
+		wrapper := &ResourceWrapper{
+			ResourceVersion: val.ResourceVersion,
+			Value:           val.Value,
+			// Operation: val.Operation,
+		}
+		rsp.Items = append(rsp.Items, wrapper)
+		if true {
+			return nil
+		}
+	}
+	return nil
+}
+
+func (t *nameEvents) read(root fs.FS) error {
+	var err error
+	files, err := hackpadfs.ReadDir(root, t.path)
+	if err != nil {
+		return err
+	}
+	for _, file := range files {
+		p := file.Name()
+		if file.IsDir() || !strings.HasSuffix(p, ".json") {
+			continue
+		}
+
+		base := strings.TrimSuffix(p, ".json")
+		base = strings.TrimPrefix(base, "rv")
+		rr := resourceEvent{file: p}
+		rr.rv, err = strconv.ParseInt(base, 10, 64)
+		if err != nil {
+			return err
+		}
+		t.versions = append(t.versions, rr)
+	}
+	sort.Slice(t.versions, func(i int, j int) bool {
+		return t.versions[i].rv > t.versions[j].rv
+	})
+	return err
+}
+
+type resourceEvent struct {
+	file string // path to the actual file
+	rv   int64
+}
+
 // List implements AppendingStore.
 func (f *fsStore) List(ctx context.Context, req *ListRequest) (*ListResponse, error) {
-	panic("unimplemented")
+	tree := eventTree{
+		group:    req.Options.Key.Group,
+		resource: req.Options.Key.Resource,
+	}
+	err := tree.read(f.root, req.Options.Key)
+	if err != nil {
+		return nil, err
+	}
+	return tree.list(f, req.ResourceVersion)
 }
 
 // Watch implements AppendingStore.
diff --git a/pkg/storage/unified/resource/keys.go b/pkg/storage/unified/resource/keys.go
new file mode 100644
index 00000000000..3e3eea3d9b3
--- /dev/null
+++ b/pkg/storage/unified/resource/keys.go
@@ -0,0 +1,109 @@
+package resource
+
+import (
+	"bytes"
+	"fmt"
+	"strconv"
+	"strings"
+)
+
+type KeyConversions interface {
+	KeyToPath(k *ResourceKey, rv int64) (string, error)
+	PathToKey(p string) (k *ResourceKey, rv int64, err error)
+	PathPrefix(k *ResourceKey) string
+}
+
+var _ KeyConversions = &simpleConverter{}
+
+// group/resource/namespace/name
+type simpleConverter struct{}
+
+// KeyToPath implements KeyConversions.
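+// It returns <group>/<resource>/<namespace or "__cluster__">[/<name>[/<zero-padded rv>]].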
+func (s *simpleConverter) KeyToPath(x *ResourceKey, rv int64) (string, error) {
+	var buffer bytes.Buffer
+
+	if x.Group == "" {
+		return "", fmt.Errorf("missing group")
+	}
+	buffer.WriteString(x.Group)
+	buffer.WriteString("/")
+
+	if x.Resource == "" {
+		return "", fmt.Errorf("missing resource")
+	}
+	buffer.WriteString(x.Resource)
+	buffer.WriteString("/")
+
+	if x.Namespace == "" {
+		buffer.WriteString("__cluster__")
+	} else {
+		buffer.WriteString(x.Namespace)
+	}
+
+	if x.Name == "" {
+		return buffer.String(), nil
+	}
+	buffer.WriteString("/")
+	buffer.WriteString(x.Name)
+
+	if rv > 0 {
+		buffer.WriteString("/")
+		buffer.WriteString(fmt.Sprintf("%.20d", rv))
+	}
+
+	return buffer.String(), nil
+}
+
+// PathPrefix implements KeyConversions.
+func (s *simpleConverter) PathPrefix(x *ResourceKey) string {
+	var buffer bytes.Buffer
+	if x.Group == "" {
+		return ""
+	}
+	buffer.WriteString(x.Group)
+
+	if x.Resource == "" {
+		return buffer.String()
+	}
+	buffer.WriteString("/")
+	buffer.WriteString(x.Resource)
+
+	if x.Namespace == "" {
+		if x.Name == "" {
+			return buffer.String()
+		}
+		buffer.WriteString("/__cluster__")
+	} else {
+		buffer.WriteString("/")
+		buffer.WriteString(x.Namespace)
+	}
+
+	if x.Name == "" {
+		return buffer.String()
+	}
+	buffer.WriteString("/")
+	buffer.WriteString(x.Name)
+	return buffer.String()
+}
+
+// PathToKey implements KeyConversions.
+func (s *simpleConverter) PathToKey(p string) (k *ResourceKey, rv int64, err error) {
+	key := &ResourceKey{}
+	parts := strings.Split(p, "/")
+	if len(parts) < 2 {
+		return nil, 0, fmt.Errorf("expecting at least group/resource")
+	}
+	key.Group = parts[0]
+	key.Resource = parts[1]
+	if len(parts) > 2 {
+		key.Namespace = parts[2]
+	}
+	if len(parts) > 3 {
+		key.Name = parts[3]
+	}
+	if len(parts) > 4 {
+		parts = strings.Split(parts[4], ".")
+		rv, err = strconv.ParseInt(parts[0], 10, 64)
+	}
+	return key, rv, err
+}
diff --git a/pkg/storage/unified/resource/keys_test.go b/pkg/storage/unified/resource/keys_test.go
new file mode 100644
index 00000000000..f65fc4f1f7b
--- /dev/null
+++ b/pkg/storage/unified/resource/keys_test.go
@@ -0,0 +1,30 @@
+package resource
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+func TestKeyConversions(t *testing.T) {
+	t.Run("key namespaced path", func(t *testing.T) {
+		conv := &simpleConverter{}
+		key := &ResourceKey{
+			Group:     "ggg",
+			Resource:  "rrr",
+			Namespace: "ns",
+		}
+		p, err := conv.KeyToPath(key, 0)
+		require.NoError(t, err)
+		require.Equal(t, "ggg/rrr/ns", p)
+
+		key.Name = "name"
+		p, err = conv.KeyToPath(key, 0)
+		require.NoError(t, err)
+		require.Equal(t, "ggg/rrr/ns/name", p)
+		require.Equal(t, "ggg/rrr", conv.PathPrefix(&ResourceKey{
+			Group:    "ggg",
+			Resource: "rrr",
+		}))
+	})
+}
diff --git a/pkg/storage/unified/resource/resource.go b/pkg/storage/unified/resource/resource.go
deleted file mode 100644
index 6d04d901eb1..00000000000
--- a/pkg/storage/unified/resource/resource.go
+++ /dev/null
@@ -1,56 +0,0 @@
-package resource
-
-import (
-	"bytes"
-
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/runtime/schema"
-)
-
-// NamespacedPath is a path that can be used to isolate tenant data
-// NOTE: this strategy does not allow quickly searching across namespace boundaries with a prefix
-func (x *ResourceKey) NamespacedPath() string {
-	var buffer bytes.Buffer
-	if x.Namespace == "" {
-		buffer.WriteString("__cluster__")
-	} else {
-		buffer.WriteString(x.Namespace)
-	}
-	if x.Group == "" {
-		return buffer.String()
-	}
-	buffer.WriteString("/")
-	buffer.WriteString(x.Group)
-
-	if x.Resource == "" {
-		return buffer.String()
-	}
-	buffer.WriteString("/")
-	buffer.WriteString(x.Resource)
-
-	if x.Name == "" {
-		return buffer.String()
-	}
-	buffer.WriteString("/")
-	buffer.WriteString(x.Name)
-	return buffer.String()
-}
-
-// Return a copy without the resource version
-func (x *ResourceKey) WithoutResourceVersion() *ResourceKey {
-	return &ResourceKey{
-		Namespace: x.Namespace,
-		Group:     x.Group,
-		Resource:  x.Resource,
-		Name:      x.Name,
-	}
-}
-
-func ResourceKeyFor(gr schema.GroupResource, obj metav1.Object) *ResourceKey {
-	return &ResourceKey{
-		Group:     gr.Group,
-		Resource:  gr.Resource,
-		Namespace: obj.GetNamespace(),
-		Name:      obj.GetName(),
-	}
-}
diff --git a/pkg/storage/unified/resource/resource_test.go b/pkg/storage/unified/resource/resource_test.go
deleted file mode 100644
index 20b2bef1cdb..00000000000
--- a/pkg/storage/unified/resource/resource_test.go
+++ /dev/null
@@ -1,28 +0,0 @@
-package resource_test
-
-import (
-	"testing"
-
-	"github.com/stretchr/testify/require"
-
-	"github.com/grafana/grafana/pkg/storage/unified/resource"
-)
-
-func TestResourceModels(t *testing.T) {
-	t.Run("key namespaced path", func(t *testing.T) {
-		key := &resource.ResourceKey{}
-		require.Equal(t, "__cluster__", key.NamespacedPath())
-
-		key.Namespace = "ns"
-		require.Equal(t, "ns", key.NamespacedPath())
-
-		key.Group = "ggg"
-		require.Equal(t, "ns/ggg", key.NamespacedPath())
-
-		key.Resource = "rrr"
-		require.Equal(t, "ns/ggg/rrr", key.NamespacedPath())
-
-		key.Name = "nnnn"
-		require.Equal(t, "ns/ggg/rrr/nnnn", key.NamespacedPath())
-	})
-}
diff --git a/pkg/storage/unified/resource/server.go b/pkg/storage/unified/resource/server.go
index b4808b342d8..501f044e932 100644
--- a/pkg/storage/unified/resource/server.go
+++ b/pkg/storage/unified/resource/server.go
@@ -343,7 +343,7 @@ func (s *server) Update(ctx context.Context, req *UpdateRequest) (*UpdateRespons
 	}
 
 	latest, err := s.store.Read(ctx, &ReadRequest{
-		Key: req.Key.WithoutResourceVersion(),
+		Key: req.Key,
 	})
 	if err != nil {
 		return nil, err
@@ -386,7 +386,7 @@ func (s *server) Delete(ctx context.Context, req *DeleteRequest) (*DeleteRespons
 	}
 
 	latest, err := s.store.Read(ctx, &ReadRequest{
-		Key: req.Key.WithoutResourceVersion(),
+		Key: req.Key,
 	})
 	if err != nil {
 		return nil, err
diff --git a/pkg/storage/unified/resource/server_test.go b/pkg/storage/unified/resource/server_test.go
index 5ea5d36d3da..f0cf1c8cb62 100644
--- a/pkg/storage/unified/resource/server_test.go
+++ b/pkg/storage/unified/resource/server_test.go
@@ -93,6 +93,16 @@ func TestWriter(t *testing.T) {
 		require.NoError(t, err)
 		require.Equal(t, updated.ResourceVersion, found.ResourceVersion)
 
+		all, err := server.List(ctx, &ListRequest{Options: &ListOptions{
+			Key: &ResourceKey{
+				Group:    key.Group,
+				Resource: key.Resource,
+			},
+		}})
+		require.NoError(t, err)
+		require.Len(t, all.Items, 1)
+		require.Equal(t, updated.ResourceVersion, all.Items[0].ResourceVersion)
+
 		deleted, err := server.Delete(ctx, &DeleteRequest{Key: key, ResourceVersion: updated.ResourceVersion})
 		require.NoError(t, err)
 		require.True(t, deleted.ResourceVersion > updated.ResourceVersion)
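
A possible follow-up test for the PathToKey direction (a sketch only, not part of this patch; it assumes the simpleConverter behavior and the keys_test.go conventions shown above):

package resource

import (
	"testing"

	"github.com/stretchr/testify/require"
)

// TestPathToKey checks that an on-disk style path parses back into the
// expected key and resource version (PathToKey tolerates the ".json"
// suffix used for event files).
func TestPathToKey(t *testing.T) {
	conv := &simpleConverter{}

	key, rv, err := conv.PathToKey("ggg/rrr/ns/name/00000000000000000123.json")
	require.NoError(t, err)
	require.Equal(t, int64(123), rv)
	require.Equal(t, "ggg", key.Group)
	require.Equal(t, "rrr", key.Resource)
	require.Equal(t, "ns", key.Namespace)
	require.Equal(t, "name", key.Name)
}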