now with a base server implementation

This commit is contained in:
Ryan McKinley 2024-06-14 14:24:36 +03:00
parent 4a41f7d0dd
commit f66768c67d
4 changed files with 783 additions and 0 deletions

View File

@ -0,0 +1,153 @@
package resource
import (
"context"
"encoding/json"
"fmt"
"path/filepath"
"sort"
"strings"
"github.com/hack-pad/hackpadfs"
"github.com/hack-pad/hackpadfs/mem"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/noop"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime/schema"
)
type FileSystemOptions struct {
// OTel tracer
Tracer trace.Tracer
// Root file system -- null will be in memory
Root hackpadfs.FS
}
func NewFileSystemStore(opts FileSystemOptions) (AppendingStore, error) {
if opts.Tracer == nil {
opts.Tracer = noop.NewTracerProvider().Tracer("fs")
}
var err error
root := opts.Root
if root == nil {
root, err = mem.NewFS()
if err != nil {
return nil, err
}
}
return &fsStore{tracer: opts.Tracer, root: root}, nil
}
type fsStore struct {
tracer trace.Tracer
root hackpadfs.FS
}
type fsEvent struct {
ResourceVersion int64 `json:"resourceVersion"`
Message string `json:"message,omitempty"`
Operation string `json:"operation,omitempty"`
Value json.RawMessage `json:"value,omitempty"`
BlobPath string `json:"blob,omitempty"`
}
// The only write command
func (f *fsStore) WriteEvent(ctx context.Context, event *WriteEvent) (int64, error) {
body := fsEvent{
ResourceVersion: event.EventID,
Message: event.Message,
Operation: event.Operation.String(),
Value: event.Value,
// Blob...
}
// For this case, we will treat them the same
event.Key.ResourceVersion = 0
dir := event.Key.NamespacedPath()
err := hackpadfs.MkdirAll(f.root, dir, 0750)
if err != nil {
return 0, err
}
bytes, err := json.Marshal(&body)
if err != nil {
return 0, err
}
fpath := filepath.Join(dir, fmt.Sprintf("%d.json", event.EventID))
file, err := hackpadfs.OpenFile(f.root, fpath, hackpadfs.FlagWriteOnly|hackpadfs.FlagCreate, 0750)
if err != nil {
return 0, err
}
_, err = hackpadfs.WriteFile(file, bytes)
return event.EventID, err
}
// Read implements ResourceStoreServer.
func (f *fsStore) Read(ctx context.Context, req *ReadRequest) (*ReadResponse, error) {
rv := req.Key.ResourceVersion
req.Key.ResourceVersion = 0
fname := "--x--"
dir := req.Key.NamespacedPath()
if rv > 0 {
fname = fmt.Sprintf("%d.json", rv)
} else {
files, err := hackpadfs.ReadDir(f.root, dir)
if err != nil {
return nil, err
}
// Sort by name
sort.Slice(files, func(i, j int) bool {
a := files[i].Name()
b := files[j].Name()
return a > b // ?? should we parse the numbers ???
})
// The first matching file
for _, v := range files {
fname = v.Name()
if strings.HasSuffix(fname, ".json") {
break
}
}
}
evt, err := f.open(filepath.Join(dir, fname))
if err != nil || evt.Operation == ResourceOperation_DELETED.String() {
return nil, apierrors.NewNotFound(schema.GroupResource{
Group: req.Key.Group,
Resource: req.Key.Resource,
}, req.Key.Name)
}
return &ReadResponse{
ResourceVersion: evt.ResourceVersion,
Value: evt.Value,
Message: evt.Message,
}, nil
}
func (f *fsStore) open(p string) (*fsEvent, error) {
raw, err := hackpadfs.ReadFile(f.root, p)
if err != nil {
return nil, err
}
evt := &fsEvent{}
err = json.Unmarshal(raw, evt)
return evt, err
}
// List implements AppendingStore.
func (f *fsStore) List(ctx context.Context, req *ListRequest) (*ListResponse, error) {
panic("unimplemented")
}
// Watch implements AppendingStore.
func (f *fsStore) Watch(*WatchRequest, ResourceStore_WatchServer) error {
panic("unimplemented")
}

View File

@ -0,0 +1,45 @@
package resource
import (
context "context"
"fmt"
"github.com/grafana/grafana/pkg/apimachinery/identity"
)
type WriteAccessHooks struct {
// Check if a user has access to write folders
// When this is nil, no resources can have folders configured
Folder func(ctx context.Context, user identity.Requester, uid string) bool
// When configured, this will make sure a user is allowed to save to a given origin
Origin func(ctx context.Context, user identity.Requester, origin string) bool
}
type LifecycleHooks interface {
// Called once at initialization
Init() error
// Stop function -- after calling this, any additional storage functions may error
Stop()
}
func (a *WriteAccessHooks) CanWriteFolder(ctx context.Context, user identity.Requester, uid string) error {
if a.Folder == nil {
return fmt.Errorf("writing folders is not supported")
}
if !a.Folder(ctx, user, uid) {
return fmt.Errorf("not allowed to write resource to folder")
}
return nil
}
func (a *WriteAccessHooks) CanWriteOrigin(ctx context.Context, user identity.Requester, uid string) error {
if a.Origin == nil || uid == "UI" {
return nil // default to OK
}
if !a.Origin(ctx, user, uid) {
return fmt.Errorf("not allowed to write resource at origin")
}
return nil
}

View File

@ -0,0 +1,469 @@
package resource
import (
context "context"
"encoding/json"
"errors"
"fmt"
"sync"
"time"
"github.com/bwmarrin/snowflake"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/noop"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/grafana/grafana/pkg/apimachinery/identity"
"github.com/grafana/grafana/pkg/apimachinery/utils"
)
// Package-level errors.
var (
ErrNotFound = errors.New("entity not found")
ErrOptimisticLockingFailed = errors.New("optimistic locking failed")
ErrUserNotFoundInContext = errors.New("user not found in context")
ErrUnableToReadResourceJSON = errors.New("unable to read resource json")
ErrNextPageTokenNotSupported = errors.New("nextPageToken not yet supported")
ErrLimitNotSupported = errors.New("limit not yet supported")
ErrNotImplementedYet = errors.New("not implemented yet")
)
// ResourceServer implements all services
type ResourceServer interface {
ResourceStoreServer
// ResourceSearchServer
// DiagnosticsServer
// Called once for initialization
Init() error
// Stop
Stop()
}
type AppendingStore interface {
// Write a Create/Update/Delete,
// NOTE: the contents of WriteEvent have been validated
// Return the revisionVersion for this event or error
WriteEvent(context.Context, *WriteEvent) (int64, error)
// Read a value from storage
Read(context.Context, *ReadRequest) (*ReadResponse, error)
// Implement List -- this expects the read after write semantics
List(context.Context, *ListRequest) (*ListResponse, error)
// Watch for events
// TODO... this should be converted to a go style function
// that returns a channel (??) rather than the raw grpc server management
Watch(*WatchRequest, ResourceStore_WatchServer) error
}
type ResourceServerOptions struct {
// OTel tracer
Tracer trace.Tracer
// When running in a cluster, each node should have a different ID
// This is used for snowflake generation and log identification
NodeID int64
// Get the next EventID. When not set, this will default to snowflake IDs
NextEventID func() int64
// Real storage backend
Store AppendingStore
// Real storage backend
Search ResourceSearchServer
// Diagnostics
Diagnostics DiagnosticsServer
// Check if a user has access to write folders
// When this is nil, no resources can have folders configured
WriteAccess WriteAccessHooks
// Callbacks for startup and shutdown
Lifecycle LifecycleHooks
}
func NewResourceServer(opts ResourceServerOptions) (ResourceServer, error) {
if opts.Tracer == nil {
opts.Tracer = noop.NewTracerProvider().Tracer("resource-server")
}
if opts.NextEventID == nil {
eventNode, err := snowflake.NewNode(opts.NodeID)
if err != nil {
return nil, apierrors.NewInternalError(
fmt.Errorf("error initializing snowflake id generator :: %w", err))
}
opts.NextEventID = func() int64 {
return eventNode.Generate().Int64()
}
}
if opts.Store == nil {
return nil, fmt.Errorf("missing AppendingStore implementation")
}
if opts.Search == nil {
return nil, fmt.Errorf("missing ResourceSearchServer implementation")
}
if opts.Diagnostics == nil {
return nil, fmt.Errorf("missing Diagnostics implementation")
}
return &server{
tracer: opts.Tracer,
nextEventID: opts.NextEventID,
store: opts.Store,
search: opts.Search,
diagnostics: opts.Diagnostics,
access: opts.WriteAccess,
lifecycle: opts.Lifecycle,
}, nil
}
var _ ResourceServer = &server{}
type server struct {
tracer trace.Tracer
nextEventID func() int64
store AppendingStore
search ResourceSearchServer
diagnostics DiagnosticsServer
access WriteAccessHooks
lifecycle LifecycleHooks
// init checking
once sync.Once
initErr error
}
// Init implements ResourceServer.
func (s *server) Init() error {
s.once.Do(func() {
// TODO, setup a broadcaster for watch
// Call lifecycle hooks
if s.lifecycle != nil {
err := s.lifecycle.Init()
if err != nil {
s.initErr = fmt.Errorf("initialize Resource Server: %w", err)
}
}
})
return s.initErr
}
func (s *server) Stop() {
s.initErr = fmt.Errorf("service is stopping")
if s.lifecycle != nil {
s.lifecycle.Stop()
}
s.initErr = fmt.Errorf("service is stopped")
}
func (s *server) newEvent(ctx context.Context, key *ResourceKey, value, oldValue []byte) (*WriteEvent, error) {
var err error
event := &WriteEvent{
EventID: s.nextEventID(),
Key: key,
Value: value,
}
event.Requester, err = identity.GetRequester(ctx)
if err != nil {
return nil, ErrUserNotFoundInContext
}
dummy := &metav1.PartialObjectMetadata{}
err = json.Unmarshal(value, dummy)
if err != nil {
return nil, ErrUnableToReadResourceJSON
}
obj, err := utils.MetaAccessor(dummy)
if err != nil {
return nil, apierrors.NewBadRequest("invalid object in json")
}
if obj.GetUID() == "" {
return nil, apierrors.NewBadRequest("the UID must be set")
}
if obj.GetGenerateName() != "" {
return nil, apierrors.NewBadRequest("can not save value with generate name")
}
gvk := obj.GetGroupVersionKind()
if gvk.Kind == "" {
return nil, apierrors.NewBadRequest("expecting resources with a kind in the body")
}
if gvk.Version == "" {
return nil, apierrors.NewBadRequest("expecting resources with an apiVersion")
}
if gvk.Group != "" && gvk.Group != key.Group {
return nil, apierrors.NewBadRequest(
fmt.Sprintf("group in key does not match group in the body (%s != %s)", key.Group, gvk.Group),
)
}
if obj.GetName() != key.Name {
return nil, apierrors.NewBadRequest("key name does not match the name in the body")
}
if obj.GetNamespace() != key.Namespace {
return nil, apierrors.NewBadRequest("key namespace does not match the namespace in the body")
}
folder := obj.GetFolder()
if folder != "" {
err = s.access.CanWriteFolder(ctx, event.Requester, folder)
if err != nil {
return nil, err
}
}
origin, err := obj.GetOriginInfo()
if err != nil {
return nil, apierrors.NewBadRequest("invalid origin info")
}
if origin != nil {
err = s.access.CanWriteOrigin(ctx, event.Requester, origin.Name)
if err != nil {
return nil, err
}
}
event.Object = obj
// This is an update
if oldValue != nil {
dummy := &metav1.PartialObjectMetadata{}
err = json.Unmarshal(oldValue, dummy)
if err != nil {
return nil, apierrors.NewBadRequest("error reading old json value")
}
old, err := utils.MetaAccessor(dummy)
if err != nil {
return nil, apierrors.NewBadRequest("invalid object inside old json")
}
if key.Name != old.GetName() {
return nil, apierrors.NewBadRequest(
fmt.Sprintf("the old value has a different name (%s != %s)", key.Name, old.GetName()))
}
// Can not change creation timestamps+user
if obj.GetCreatedBy() != old.GetCreatedBy() {
return nil, apierrors.NewBadRequest(
fmt.Sprintf("can not change the created by metadata (%s != %s)", obj.GetCreatedBy(), old.GetCreatedBy()))
}
if obj.GetCreationTimestamp() != old.GetCreationTimestamp() {
return nil, apierrors.NewBadRequest(
fmt.Sprintf("can not change the CreationTimestamp metadata (%v != %v)", obj.GetCreationTimestamp(), old.GetCreationTimestamp()))
}
oldFolder := obj.GetFolder()
if oldFolder != folder {
event.FolderChanged = true
}
event.OldObject = old
} else if folder != "" {
event.FolderChanged = true
}
return event, nil
}
func (s *server) Create(ctx context.Context, req *CreateRequest) (*CreateResponse, error) {
ctx, span := s.tracer.Start(ctx, "storage_server.Create")
defer span.End()
if err := s.Init(); err != nil {
return nil, err
}
if req.Key.ResourceVersion > 0 {
return nil, apierrors.NewBadRequest("can not update a specific resource version")
}
event, err := s.newEvent(ctx, req.Key, req.Value, nil)
if err != nil {
return nil, err
}
event.Operation = ResourceOperation_CREATED
event.Blob = req.Blob
event.Message = req.Message
rsp := &CreateResponse{}
// Make sure the created by user is accurate
//----------------------------------------
val := event.Object.GetCreatedBy()
if val != "" && val != event.Requester.GetUID().String() {
return nil, apierrors.NewBadRequest("created by annotation does not match: metadata.annotations#" + utils.AnnoKeyCreatedBy)
}
// Create can not have updated properties
//----------------------------------------
if event.Object.GetUpdatedBy() != "" {
return nil, apierrors.NewBadRequest("unexpected metadata.annotations#" + utils.AnnoKeyCreatedBy)
}
ts, err := event.Object.GetUpdatedTimestamp()
if err != nil {
return nil, apierrors.NewBadRequest(fmt.Sprintf("invalid timestamp: %s", err))
}
if ts != nil {
return nil, apierrors.NewBadRequest("unexpected metadata.annotations#" + utils.AnnoKeyUpdatedTimestamp)
}
// Append and set the resource version
rsp.ResourceVersion, err = s.store.WriteEvent(ctx, event)
rsp.Status, err = errToStatus(err)
return rsp, err
}
// Convert golang errors to status result errors that can be returned to a client
func errToStatus(err error) (*StatusResult, error) {
if err != nil {
// TODO... better conversion!!!
return &StatusResult{
Status: "Failure",
Message: err.Error(),
}, nil
}
return nil, err
}
func (s *server) Update(ctx context.Context, req *UpdateRequest) (*UpdateResponse, error) {
ctx, span := s.tracer.Start(ctx, "storage_server.Update")
defer span.End()
if err := s.Init(); err != nil {
return nil, err
}
rsp := &UpdateResponse{}
if req.Key.ResourceVersion < 0 {
rsp.Status, _ = errToStatus(apierrors.NewBadRequest("update must include the previous version"))
return rsp, nil
}
latest, err := s.store.Read(ctx, &ReadRequest{
Key: req.Key.WithoutResourceVersion(),
})
if err != nil {
return nil, err
}
if latest.Value == nil {
return nil, apierrors.NewBadRequest("current value does not exist")
}
event, err := s.newEvent(ctx, req.Key, req.Value, latest.Value)
if err != nil {
return nil, err
}
event.Operation = ResourceOperation_UPDATED
event.PreviousRV = latest.ResourceVersion
event.Message = req.Message
// Make sure the update user is accurate
//----------------------------------------
val := event.Object.GetUpdatedBy()
if val != "" && val != event.Requester.GetUID().String() {
return nil, apierrors.NewBadRequest("updated by annotation does not match: metadata.annotations#" + utils.AnnoKeyUpdatedBy)
}
rsp.ResourceVersion, err = s.store.WriteEvent(ctx, event)
rsp.Status, err = errToStatus(err)
return rsp, err
}
func (s *server) Delete(ctx context.Context, req *DeleteRequest) (*DeleteResponse, error) {
ctx, span := s.tracer.Start(ctx, "storage_server.Delete")
defer span.End()
if err := s.Init(); err != nil {
return nil, err
}
rsp := &DeleteResponse{}
if req.Key.ResourceVersion < 0 {
return nil, apierrors.NewBadRequest("update must include the previous version")
}
latest, err := s.store.Read(ctx, &ReadRequest{
Key: req.Key.WithoutResourceVersion(),
})
if err != nil {
return nil, err
}
if latest.ResourceVersion != req.Key.ResourceVersion {
return nil, ErrOptimisticLockingFailed
}
now := metav1.NewTime(time.Now())
event := &WriteEvent{
EventID: s.nextEventID(),
Key: req.Key,
Operation: ResourceOperation_DELETED,
PreviousRV: latest.ResourceVersion,
}
event.Requester, err = identity.GetRequester(ctx)
if err != nil {
return nil, apierrors.NewBadRequest("unable to get user")
}
marker := &DeletedMarker{}
err = json.Unmarshal(latest.Value, marker)
if err != nil {
return nil, apierrors.NewBadRequest(
fmt.Sprintf("unable to read previous object, %v", err))
}
event.Object, err = utils.MetaAccessor(marker)
if err != nil {
return nil, err
}
event.Object.SetDeletionTimestamp(&now)
event.Object.SetUpdatedTimestamp(&now.Time)
event.Object.SetManagedFields(nil)
event.Object.SetFinalizers(nil)
event.Object.SetUpdatedBy(event.Requester.GetUID().String())
marker.TypeMeta = metav1.TypeMeta{
Kind: "DeletedMarker",
APIVersion: "storage.grafana.app/v0alpha1", // ?? or can we stick this in common?
}
marker.Annotations["RestoreResourceVersion"] = fmt.Sprintf("%d", event.PreviousRV)
event.Value, err = json.Marshal(marker)
if err != nil {
return nil, apierrors.NewBadRequest(
fmt.Sprintf("unable creating deletion marker, %v", err))
}
rsp.ResourceVersion, err = s.store.WriteEvent(ctx, event)
rsp.Status, err = errToStatus(err)
return rsp, err
}
func (s *server) Read(ctx context.Context, req *ReadRequest) (*ReadResponse, error) {
if err := s.Init(); err != nil {
return nil, err
}
rsp, err := s.store.Read(ctx, req)
if err != nil {
if rsp == nil {
rsp = &ReadResponse{}
}
rsp.Status, err = errToStatus(err)
}
return rsp, err
}
func (s *server) List(ctx context.Context, req *ListRequest) (*ListResponse, error) {
if err := s.Init(); err != nil {
return nil, err
}
rsp, err := s.store.List(ctx, req)
// Status???
return rsp, err
}
func (s *server) Watch(req *WatchRequest, srv ResourceStore_WatchServer) error {
if err := s.Init(); err != nil {
return err
}
return s.Watch(req, srv)
}

View File

@ -0,0 +1,116 @@
package resource
import (
"context"
"embed"
"encoding/json"
"fmt"
"os"
"testing"
"time"
"github.com/hack-pad/hackpadfs"
hackos "github.com/hack-pad/hackpadfs/os"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"github.com/grafana/grafana/pkg/apimachinery/identity"
"github.com/grafana/grafana/pkg/apimachinery/utils"
)
func TestWriter(t *testing.T) {
testUserA := &identity.StaticRequester{
Namespace: identity.NamespaceUser,
UserID: 123,
UserUID: "u123",
OrgRole: identity.RoleAdmin,
IsGrafanaAdmin: true, // can do anything
}
ctx := identity.WithRequester(context.Background(), testUserA)
var root hackpadfs.FS
if false {
tmp, err := os.MkdirTemp("", "xxx-*")
require.NoError(t, err)
root, err = hackos.NewFS().Sub(tmp[1:])
require.NoError(t, err)
fmt.Printf("ROOT: %s\n\n", tmp)
}
tmp, err := NewFileSystemStore(FileSystemOptions{
Root: root,
})
require.NoError(t, err)
server, err := NewResourceServer(ResourceServerOptions{
Store: tmp,
})
require.NoError(t, err)
t.Run("playlist happy CRUD paths", func(t *testing.T) {
raw := testdata(t, "01_create_playlist.json")
key := &ResourceKey{
Group: "playlist.grafana.app",
Resource: "rrrr", // can be anything :(
Namespace: "default",
Name: "fdgsv37qslr0ga",
}
created, err := server.Create(ctx, &CreateRequest{
Value: raw,
Key: key,
})
require.NoError(t, err)
require.True(t, created.ResourceVersion > 0)
// The key does not include resource version
found, err := server.Read(ctx, &ReadRequest{Key: key})
require.NoError(t, err)
require.Equal(t, created.ResourceVersion, found.ResourceVersion)
// Now update the value
tmp := &unstructured.Unstructured{}
err = json.Unmarshal(raw, tmp)
require.NoError(t, err)
now := time.Now().UnixMilli()
obj, err := utils.MetaAccessor(tmp)
require.NoError(t, err)
obj.SetAnnotation("test", "hello")
obj.SetUpdatedTimestampMillis(now)
obj.SetUpdatedBy(testUserA.GetUID().String())
raw, err = json.Marshal(tmp)
require.NoError(t, err)
key.ResourceVersion = created.ResourceVersion
updated, err := server.Update(ctx, &UpdateRequest{Key: key, Value: raw})
require.NoError(t, err)
require.True(t, updated.ResourceVersion > created.ResourceVersion)
// We should still get the latest
key.ResourceVersion = 0
found, err = server.Read(ctx, &ReadRequest{Key: key})
require.NoError(t, err)
require.Equal(t, updated.ResourceVersion, found.ResourceVersion)
key.ResourceVersion = updated.ResourceVersion
deleted, err := server.Delete(ctx, &DeleteRequest{Key: key})
require.NoError(t, err)
require.True(t, deleted.ResourceVersion > updated.ResourceVersion)
// We should get not found when trying to read the latest value
key.ResourceVersion = 0
found, err = server.Read(ctx, &ReadRequest{Key: key})
require.Error(t, err)
require.Nil(t, found)
})
}
//go:embed testdata/*
var testdataFS embed.FS
func testdata(t *testing.T, filename string) []byte {
t.Helper()
b, err := testdataFS.ReadFile(`testdata/` + filename)
require.NoError(t, err)
return b
}