mirror of
https://github.com/grafana/grafana.git
synced 2025-02-10 23:55:47 -06:00
now with a base server implementation
This commit is contained in:
parent
4a41f7d0dd
commit
f66768c67d
153
pkg/storage/unified/resource/fs.go
Normal file
153
pkg/storage/unified/resource/fs.go
Normal file
@ -0,0 +1,153 @@
|
||||
package resource
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strings"
|
||||
|
||||
"github.com/hack-pad/hackpadfs"
|
||||
"github.com/hack-pad/hackpadfs/mem"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"go.opentelemetry.io/otel/trace/noop"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
)
|
||||
|
||||
type FileSystemOptions struct {
|
||||
// OTel tracer
|
||||
Tracer trace.Tracer
|
||||
|
||||
// Root file system -- null will be in memory
|
||||
Root hackpadfs.FS
|
||||
}
|
||||
|
||||
func NewFileSystemStore(opts FileSystemOptions) (AppendingStore, error) {
|
||||
if opts.Tracer == nil {
|
||||
opts.Tracer = noop.NewTracerProvider().Tracer("fs")
|
||||
}
|
||||
|
||||
var err error
|
||||
root := opts.Root
|
||||
if root == nil {
|
||||
root, err = mem.NewFS()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return &fsStore{tracer: opts.Tracer, root: root}, nil
|
||||
}
|
||||
|
||||
type fsStore struct {
|
||||
tracer trace.Tracer
|
||||
root hackpadfs.FS
|
||||
}
|
||||
|
||||
type fsEvent struct {
|
||||
ResourceVersion int64 `json:"resourceVersion"`
|
||||
Message string `json:"message,omitempty"`
|
||||
Operation string `json:"operation,omitempty"`
|
||||
Value json.RawMessage `json:"value,omitempty"`
|
||||
BlobPath string `json:"blob,omitempty"`
|
||||
}
|
||||
|
||||
// The only write command
|
||||
func (f *fsStore) WriteEvent(ctx context.Context, event *WriteEvent) (int64, error) {
|
||||
body := fsEvent{
|
||||
ResourceVersion: event.EventID,
|
||||
Message: event.Message,
|
||||
Operation: event.Operation.String(),
|
||||
Value: event.Value,
|
||||
// Blob...
|
||||
}
|
||||
// For this case, we will treat them the same
|
||||
event.Key.ResourceVersion = 0
|
||||
dir := event.Key.NamespacedPath()
|
||||
err := hackpadfs.MkdirAll(f.root, dir, 0750)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
bytes, err := json.Marshal(&body)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
fpath := filepath.Join(dir, fmt.Sprintf("%d.json", event.EventID))
|
||||
file, err := hackpadfs.OpenFile(f.root, fpath, hackpadfs.FlagWriteOnly|hackpadfs.FlagCreate, 0750)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
_, err = hackpadfs.WriteFile(file, bytes)
|
||||
return event.EventID, err
|
||||
}
|
||||
|
||||
// Read implements ResourceStoreServer.
|
||||
func (f *fsStore) Read(ctx context.Context, req *ReadRequest) (*ReadResponse, error) {
|
||||
rv := req.Key.ResourceVersion
|
||||
req.Key.ResourceVersion = 0
|
||||
|
||||
fname := "--x--"
|
||||
dir := req.Key.NamespacedPath()
|
||||
if rv > 0 {
|
||||
fname = fmt.Sprintf("%d.json", rv)
|
||||
} else {
|
||||
files, err := hackpadfs.ReadDir(f.root, dir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Sort by name
|
||||
sort.Slice(files, func(i, j int) bool {
|
||||
a := files[i].Name()
|
||||
b := files[j].Name()
|
||||
return a > b // ?? should we parse the numbers ???
|
||||
})
|
||||
|
||||
// The first matching file
|
||||
for _, v := range files {
|
||||
fname = v.Name()
|
||||
if strings.HasSuffix(fname, ".json") {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
evt, err := f.open(filepath.Join(dir, fname))
|
||||
if err != nil || evt.Operation == ResourceOperation_DELETED.String() {
|
||||
return nil, apierrors.NewNotFound(schema.GroupResource{
|
||||
Group: req.Key.Group,
|
||||
Resource: req.Key.Resource,
|
||||
}, req.Key.Name)
|
||||
}
|
||||
|
||||
return &ReadResponse{
|
||||
ResourceVersion: evt.ResourceVersion,
|
||||
Value: evt.Value,
|
||||
Message: evt.Message,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (f *fsStore) open(p string) (*fsEvent, error) {
|
||||
raw, err := hackpadfs.ReadFile(f.root, p)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
evt := &fsEvent{}
|
||||
err = json.Unmarshal(raw, evt)
|
||||
return evt, err
|
||||
}
|
||||
|
||||
// List implements AppendingStore.
|
||||
func (f *fsStore) List(ctx context.Context, req *ListRequest) (*ListResponse, error) {
|
||||
panic("unimplemented")
|
||||
}
|
||||
|
||||
// Watch implements AppendingStore.
|
||||
func (f *fsStore) Watch(*WatchRequest, ResourceStore_WatchServer) error {
|
||||
panic("unimplemented")
|
||||
}
|
45
pkg/storage/unified/resource/hooks.go
Normal file
45
pkg/storage/unified/resource/hooks.go
Normal file
@ -0,0 +1,45 @@
|
||||
package resource
|
||||
|
||||
import (
|
||||
context "context"
|
||||
"fmt"
|
||||
|
||||
"github.com/grafana/grafana/pkg/apimachinery/identity"
|
||||
)
|
||||
|
||||
type WriteAccessHooks struct {
|
||||
// Check if a user has access to write folders
|
||||
// When this is nil, no resources can have folders configured
|
||||
Folder func(ctx context.Context, user identity.Requester, uid string) bool
|
||||
|
||||
// When configured, this will make sure a user is allowed to save to a given origin
|
||||
Origin func(ctx context.Context, user identity.Requester, origin string) bool
|
||||
}
|
||||
|
||||
type LifecycleHooks interface {
|
||||
// Called once at initialization
|
||||
Init() error
|
||||
|
||||
// Stop function -- after calling this, any additional storage functions may error
|
||||
Stop()
|
||||
}
|
||||
|
||||
func (a *WriteAccessHooks) CanWriteFolder(ctx context.Context, user identity.Requester, uid string) error {
|
||||
if a.Folder == nil {
|
||||
return fmt.Errorf("writing folders is not supported")
|
||||
}
|
||||
if !a.Folder(ctx, user, uid) {
|
||||
return fmt.Errorf("not allowed to write resource to folder")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *WriteAccessHooks) CanWriteOrigin(ctx context.Context, user identity.Requester, uid string) error {
|
||||
if a.Origin == nil || uid == "UI" {
|
||||
return nil // default to OK
|
||||
}
|
||||
if !a.Origin(ctx, user, uid) {
|
||||
return fmt.Errorf("not allowed to write resource at origin")
|
||||
}
|
||||
return nil
|
||||
}
|
469
pkg/storage/unified/resource/server.go
Normal file
469
pkg/storage/unified/resource/server.go
Normal file
@ -0,0 +1,469 @@
|
||||
package resource
|
||||
|
||||
import (
|
||||
context "context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/bwmarrin/snowflake"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"go.opentelemetry.io/otel/trace/noop"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
|
||||
"github.com/grafana/grafana/pkg/apimachinery/identity"
|
||||
"github.com/grafana/grafana/pkg/apimachinery/utils"
|
||||
)
|
||||
|
||||
// Package-level errors.
|
||||
var (
|
||||
ErrNotFound = errors.New("entity not found")
|
||||
ErrOptimisticLockingFailed = errors.New("optimistic locking failed")
|
||||
ErrUserNotFoundInContext = errors.New("user not found in context")
|
||||
ErrUnableToReadResourceJSON = errors.New("unable to read resource json")
|
||||
ErrNextPageTokenNotSupported = errors.New("nextPageToken not yet supported")
|
||||
ErrLimitNotSupported = errors.New("limit not yet supported")
|
||||
ErrNotImplementedYet = errors.New("not implemented yet")
|
||||
)
|
||||
|
||||
// ResourceServer implements all services
|
||||
type ResourceServer interface {
|
||||
ResourceStoreServer
|
||||
// ResourceSearchServer
|
||||
// DiagnosticsServer
|
||||
|
||||
// Called once for initialization
|
||||
Init() error
|
||||
|
||||
// Stop
|
||||
Stop()
|
||||
}
|
||||
|
||||
type AppendingStore interface {
|
||||
// Write a Create/Update/Delete,
|
||||
// NOTE: the contents of WriteEvent have been validated
|
||||
// Return the revisionVersion for this event or error
|
||||
WriteEvent(context.Context, *WriteEvent) (int64, error)
|
||||
|
||||
// Read a value from storage
|
||||
Read(context.Context, *ReadRequest) (*ReadResponse, error)
|
||||
|
||||
// Implement List -- this expects the read after write semantics
|
||||
List(context.Context, *ListRequest) (*ListResponse, error)
|
||||
|
||||
// Watch for events
|
||||
// TODO... this should be converted to a go style function
|
||||
// that returns a channel (??) rather than the raw grpc server management
|
||||
Watch(*WatchRequest, ResourceStore_WatchServer) error
|
||||
}
|
||||
|
||||
type ResourceServerOptions struct {
|
||||
// OTel tracer
|
||||
Tracer trace.Tracer
|
||||
|
||||
// When running in a cluster, each node should have a different ID
|
||||
// This is used for snowflake generation and log identification
|
||||
NodeID int64
|
||||
|
||||
// Get the next EventID. When not set, this will default to snowflake IDs
|
||||
NextEventID func() int64
|
||||
|
||||
// Real storage backend
|
||||
Store AppendingStore
|
||||
|
||||
// Real storage backend
|
||||
Search ResourceSearchServer
|
||||
|
||||
// Diagnostics
|
||||
Diagnostics DiagnosticsServer
|
||||
|
||||
// Check if a user has access to write folders
|
||||
// When this is nil, no resources can have folders configured
|
||||
WriteAccess WriteAccessHooks
|
||||
|
||||
// Callbacks for startup and shutdown
|
||||
Lifecycle LifecycleHooks
|
||||
}
|
||||
|
||||
func NewResourceServer(opts ResourceServerOptions) (ResourceServer, error) {
|
||||
if opts.Tracer == nil {
|
||||
opts.Tracer = noop.NewTracerProvider().Tracer("resource-server")
|
||||
}
|
||||
|
||||
if opts.NextEventID == nil {
|
||||
eventNode, err := snowflake.NewNode(opts.NodeID)
|
||||
if err != nil {
|
||||
return nil, apierrors.NewInternalError(
|
||||
fmt.Errorf("error initializing snowflake id generator :: %w", err))
|
||||
}
|
||||
opts.NextEventID = func() int64 {
|
||||
return eventNode.Generate().Int64()
|
||||
}
|
||||
}
|
||||
|
||||
if opts.Store == nil {
|
||||
return nil, fmt.Errorf("missing AppendingStore implementation")
|
||||
}
|
||||
if opts.Search == nil {
|
||||
return nil, fmt.Errorf("missing ResourceSearchServer implementation")
|
||||
}
|
||||
if opts.Diagnostics == nil {
|
||||
return nil, fmt.Errorf("missing Diagnostics implementation")
|
||||
}
|
||||
|
||||
return &server{
|
||||
tracer: opts.Tracer,
|
||||
nextEventID: opts.NextEventID,
|
||||
store: opts.Store,
|
||||
search: opts.Search,
|
||||
diagnostics: opts.Diagnostics,
|
||||
access: opts.WriteAccess,
|
||||
lifecycle: opts.Lifecycle,
|
||||
}, nil
|
||||
}
|
||||
|
||||
var _ ResourceServer = &server{}
|
||||
|
||||
type server struct {
|
||||
tracer trace.Tracer
|
||||
nextEventID func() int64
|
||||
store AppendingStore
|
||||
search ResourceSearchServer
|
||||
diagnostics DiagnosticsServer
|
||||
access WriteAccessHooks
|
||||
lifecycle LifecycleHooks
|
||||
|
||||
// init checking
|
||||
once sync.Once
|
||||
initErr error
|
||||
}
|
||||
|
||||
// Init implements ResourceServer.
|
||||
func (s *server) Init() error {
|
||||
s.once.Do(func() {
|
||||
// TODO, setup a broadcaster for watch
|
||||
|
||||
// Call lifecycle hooks
|
||||
if s.lifecycle != nil {
|
||||
err := s.lifecycle.Init()
|
||||
if err != nil {
|
||||
s.initErr = fmt.Errorf("initialize Resource Server: %w", err)
|
||||
}
|
||||
}
|
||||
})
|
||||
return s.initErr
|
||||
}
|
||||
|
||||
func (s *server) Stop() {
|
||||
s.initErr = fmt.Errorf("service is stopping")
|
||||
if s.lifecycle != nil {
|
||||
s.lifecycle.Stop()
|
||||
}
|
||||
s.initErr = fmt.Errorf("service is stopped")
|
||||
}
|
||||
|
||||
func (s *server) newEvent(ctx context.Context, key *ResourceKey, value, oldValue []byte) (*WriteEvent, error) {
|
||||
var err error
|
||||
event := &WriteEvent{
|
||||
EventID: s.nextEventID(),
|
||||
Key: key,
|
||||
Value: value,
|
||||
}
|
||||
event.Requester, err = identity.GetRequester(ctx)
|
||||
if err != nil {
|
||||
return nil, ErrUserNotFoundInContext
|
||||
}
|
||||
|
||||
dummy := &metav1.PartialObjectMetadata{}
|
||||
err = json.Unmarshal(value, dummy)
|
||||
if err != nil {
|
||||
return nil, ErrUnableToReadResourceJSON
|
||||
}
|
||||
|
||||
obj, err := utils.MetaAccessor(dummy)
|
||||
if err != nil {
|
||||
return nil, apierrors.NewBadRequest("invalid object in json")
|
||||
}
|
||||
if obj.GetUID() == "" {
|
||||
return nil, apierrors.NewBadRequest("the UID must be set")
|
||||
}
|
||||
if obj.GetGenerateName() != "" {
|
||||
return nil, apierrors.NewBadRequest("can not save value with generate name")
|
||||
}
|
||||
gvk := obj.GetGroupVersionKind()
|
||||
if gvk.Kind == "" {
|
||||
return nil, apierrors.NewBadRequest("expecting resources with a kind in the body")
|
||||
}
|
||||
if gvk.Version == "" {
|
||||
return nil, apierrors.NewBadRequest("expecting resources with an apiVersion")
|
||||
}
|
||||
if gvk.Group != "" && gvk.Group != key.Group {
|
||||
return nil, apierrors.NewBadRequest(
|
||||
fmt.Sprintf("group in key does not match group in the body (%s != %s)", key.Group, gvk.Group),
|
||||
)
|
||||
}
|
||||
if obj.GetName() != key.Name {
|
||||
return nil, apierrors.NewBadRequest("key name does not match the name in the body")
|
||||
}
|
||||
if obj.GetNamespace() != key.Namespace {
|
||||
return nil, apierrors.NewBadRequest("key namespace does not match the namespace in the body")
|
||||
}
|
||||
folder := obj.GetFolder()
|
||||
if folder != "" {
|
||||
err = s.access.CanWriteFolder(ctx, event.Requester, folder)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
origin, err := obj.GetOriginInfo()
|
||||
if err != nil {
|
||||
return nil, apierrors.NewBadRequest("invalid origin info")
|
||||
}
|
||||
if origin != nil {
|
||||
err = s.access.CanWriteOrigin(ctx, event.Requester, origin.Name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
event.Object = obj
|
||||
|
||||
// This is an update
|
||||
if oldValue != nil {
|
||||
dummy := &metav1.PartialObjectMetadata{}
|
||||
err = json.Unmarshal(oldValue, dummy)
|
||||
if err != nil {
|
||||
return nil, apierrors.NewBadRequest("error reading old json value")
|
||||
}
|
||||
old, err := utils.MetaAccessor(dummy)
|
||||
if err != nil {
|
||||
return nil, apierrors.NewBadRequest("invalid object inside old json")
|
||||
}
|
||||
if key.Name != old.GetName() {
|
||||
return nil, apierrors.NewBadRequest(
|
||||
fmt.Sprintf("the old value has a different name (%s != %s)", key.Name, old.GetName()))
|
||||
}
|
||||
|
||||
// Can not change creation timestamps+user
|
||||
if obj.GetCreatedBy() != old.GetCreatedBy() {
|
||||
return nil, apierrors.NewBadRequest(
|
||||
fmt.Sprintf("can not change the created by metadata (%s != %s)", obj.GetCreatedBy(), old.GetCreatedBy()))
|
||||
}
|
||||
if obj.GetCreationTimestamp() != old.GetCreationTimestamp() {
|
||||
return nil, apierrors.NewBadRequest(
|
||||
fmt.Sprintf("can not change the CreationTimestamp metadata (%v != %v)", obj.GetCreationTimestamp(), old.GetCreationTimestamp()))
|
||||
}
|
||||
|
||||
oldFolder := obj.GetFolder()
|
||||
if oldFolder != folder {
|
||||
event.FolderChanged = true
|
||||
}
|
||||
event.OldObject = old
|
||||
} else if folder != "" {
|
||||
event.FolderChanged = true
|
||||
}
|
||||
return event, nil
|
||||
}
|
||||
|
||||
func (s *server) Create(ctx context.Context, req *CreateRequest) (*CreateResponse, error) {
|
||||
ctx, span := s.tracer.Start(ctx, "storage_server.Create")
|
||||
defer span.End()
|
||||
|
||||
if err := s.Init(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if req.Key.ResourceVersion > 0 {
|
||||
return nil, apierrors.NewBadRequest("can not update a specific resource version")
|
||||
}
|
||||
|
||||
event, err := s.newEvent(ctx, req.Key, req.Value, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
event.Operation = ResourceOperation_CREATED
|
||||
event.Blob = req.Blob
|
||||
event.Message = req.Message
|
||||
|
||||
rsp := &CreateResponse{}
|
||||
// Make sure the created by user is accurate
|
||||
//----------------------------------------
|
||||
val := event.Object.GetCreatedBy()
|
||||
if val != "" && val != event.Requester.GetUID().String() {
|
||||
return nil, apierrors.NewBadRequest("created by annotation does not match: metadata.annotations#" + utils.AnnoKeyCreatedBy)
|
||||
}
|
||||
|
||||
// Create can not have updated properties
|
||||
//----------------------------------------
|
||||
if event.Object.GetUpdatedBy() != "" {
|
||||
return nil, apierrors.NewBadRequest("unexpected metadata.annotations#" + utils.AnnoKeyCreatedBy)
|
||||
}
|
||||
|
||||
ts, err := event.Object.GetUpdatedTimestamp()
|
||||
if err != nil {
|
||||
return nil, apierrors.NewBadRequest(fmt.Sprintf("invalid timestamp: %s", err))
|
||||
}
|
||||
if ts != nil {
|
||||
return nil, apierrors.NewBadRequest("unexpected metadata.annotations#" + utils.AnnoKeyUpdatedTimestamp)
|
||||
}
|
||||
|
||||
// Append and set the resource version
|
||||
rsp.ResourceVersion, err = s.store.WriteEvent(ctx, event)
|
||||
rsp.Status, err = errToStatus(err)
|
||||
return rsp, err
|
||||
}
|
||||
|
||||
// Convert golang errors to status result errors that can be returned to a client
|
||||
func errToStatus(err error) (*StatusResult, error) {
|
||||
if err != nil {
|
||||
// TODO... better conversion!!!
|
||||
return &StatusResult{
|
||||
Status: "Failure",
|
||||
Message: err.Error(),
|
||||
}, nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
func (s *server) Update(ctx context.Context, req *UpdateRequest) (*UpdateResponse, error) {
|
||||
ctx, span := s.tracer.Start(ctx, "storage_server.Update")
|
||||
defer span.End()
|
||||
|
||||
if err := s.Init(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rsp := &UpdateResponse{}
|
||||
if req.Key.ResourceVersion < 0 {
|
||||
rsp.Status, _ = errToStatus(apierrors.NewBadRequest("update must include the previous version"))
|
||||
return rsp, nil
|
||||
}
|
||||
|
||||
latest, err := s.store.Read(ctx, &ReadRequest{
|
||||
Key: req.Key.WithoutResourceVersion(),
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if latest.Value == nil {
|
||||
return nil, apierrors.NewBadRequest("current value does not exist")
|
||||
}
|
||||
|
||||
event, err := s.newEvent(ctx, req.Key, req.Value, latest.Value)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
event.Operation = ResourceOperation_UPDATED
|
||||
event.PreviousRV = latest.ResourceVersion
|
||||
event.Message = req.Message
|
||||
|
||||
// Make sure the update user is accurate
|
||||
//----------------------------------------
|
||||
val := event.Object.GetUpdatedBy()
|
||||
if val != "" && val != event.Requester.GetUID().String() {
|
||||
return nil, apierrors.NewBadRequest("updated by annotation does not match: metadata.annotations#" + utils.AnnoKeyUpdatedBy)
|
||||
}
|
||||
|
||||
rsp.ResourceVersion, err = s.store.WriteEvent(ctx, event)
|
||||
rsp.Status, err = errToStatus(err)
|
||||
return rsp, err
|
||||
}
|
||||
|
||||
func (s *server) Delete(ctx context.Context, req *DeleteRequest) (*DeleteResponse, error) {
|
||||
ctx, span := s.tracer.Start(ctx, "storage_server.Delete")
|
||||
defer span.End()
|
||||
|
||||
if err := s.Init(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rsp := &DeleteResponse{}
|
||||
if req.Key.ResourceVersion < 0 {
|
||||
return nil, apierrors.NewBadRequest("update must include the previous version")
|
||||
}
|
||||
|
||||
latest, err := s.store.Read(ctx, &ReadRequest{
|
||||
Key: req.Key.WithoutResourceVersion(),
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if latest.ResourceVersion != req.Key.ResourceVersion {
|
||||
return nil, ErrOptimisticLockingFailed
|
||||
}
|
||||
|
||||
now := metav1.NewTime(time.Now())
|
||||
event := &WriteEvent{
|
||||
EventID: s.nextEventID(),
|
||||
Key: req.Key,
|
||||
Operation: ResourceOperation_DELETED,
|
||||
PreviousRV: latest.ResourceVersion,
|
||||
}
|
||||
event.Requester, err = identity.GetRequester(ctx)
|
||||
if err != nil {
|
||||
return nil, apierrors.NewBadRequest("unable to get user")
|
||||
}
|
||||
marker := &DeletedMarker{}
|
||||
err = json.Unmarshal(latest.Value, marker)
|
||||
if err != nil {
|
||||
return nil, apierrors.NewBadRequest(
|
||||
fmt.Sprintf("unable to read previous object, %v", err))
|
||||
}
|
||||
event.Object, err = utils.MetaAccessor(marker)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
event.Object.SetDeletionTimestamp(&now)
|
||||
event.Object.SetUpdatedTimestamp(&now.Time)
|
||||
event.Object.SetManagedFields(nil)
|
||||
event.Object.SetFinalizers(nil)
|
||||
event.Object.SetUpdatedBy(event.Requester.GetUID().String())
|
||||
marker.TypeMeta = metav1.TypeMeta{
|
||||
Kind: "DeletedMarker",
|
||||
APIVersion: "storage.grafana.app/v0alpha1", // ?? or can we stick this in common?
|
||||
}
|
||||
marker.Annotations["RestoreResourceVersion"] = fmt.Sprintf("%d", event.PreviousRV)
|
||||
event.Value, err = json.Marshal(marker)
|
||||
if err != nil {
|
||||
return nil, apierrors.NewBadRequest(
|
||||
fmt.Sprintf("unable creating deletion marker, %v", err))
|
||||
}
|
||||
|
||||
rsp.ResourceVersion, err = s.store.WriteEvent(ctx, event)
|
||||
rsp.Status, err = errToStatus(err)
|
||||
return rsp, err
|
||||
}
|
||||
|
||||
func (s *server) Read(ctx context.Context, req *ReadRequest) (*ReadResponse, error) {
|
||||
if err := s.Init(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rsp, err := s.store.Read(ctx, req)
|
||||
if err != nil {
|
||||
if rsp == nil {
|
||||
rsp = &ReadResponse{}
|
||||
}
|
||||
rsp.Status, err = errToStatus(err)
|
||||
}
|
||||
return rsp, err
|
||||
}
|
||||
|
||||
func (s *server) List(ctx context.Context, req *ListRequest) (*ListResponse, error) {
|
||||
if err := s.Init(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rsp, err := s.store.List(ctx, req)
|
||||
// Status???
|
||||
return rsp, err
|
||||
}
|
||||
|
||||
func (s *server) Watch(req *WatchRequest, srv ResourceStore_WatchServer) error {
|
||||
if err := s.Init(); err != nil {
|
||||
return err
|
||||
}
|
||||
return s.Watch(req, srv)
|
||||
}
|
116
pkg/storage/unified/resource/server_test.go
Normal file
116
pkg/storage/unified/resource/server_test.go
Normal file
@ -0,0 +1,116 @@
|
||||
package resource
|
||||
|
||||
import (
|
||||
"context"
|
||||
"embed"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hack-pad/hackpadfs"
|
||||
hackos "github.com/hack-pad/hackpadfs/os"
|
||||
"github.com/stretchr/testify/require"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
|
||||
"github.com/grafana/grafana/pkg/apimachinery/identity"
|
||||
"github.com/grafana/grafana/pkg/apimachinery/utils"
|
||||
)
|
||||
|
||||
func TestWriter(t *testing.T) {
|
||||
testUserA := &identity.StaticRequester{
|
||||
Namespace: identity.NamespaceUser,
|
||||
UserID: 123,
|
||||
UserUID: "u123",
|
||||
OrgRole: identity.RoleAdmin,
|
||||
IsGrafanaAdmin: true, // can do anything
|
||||
}
|
||||
ctx := identity.WithRequester(context.Background(), testUserA)
|
||||
|
||||
var root hackpadfs.FS
|
||||
if false {
|
||||
tmp, err := os.MkdirTemp("", "xxx-*")
|
||||
require.NoError(t, err)
|
||||
|
||||
root, err = hackos.NewFS().Sub(tmp[1:])
|
||||
require.NoError(t, err)
|
||||
fmt.Printf("ROOT: %s\n\n", tmp)
|
||||
}
|
||||
tmp, err := NewFileSystemStore(FileSystemOptions{
|
||||
Root: root,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
server, err := NewResourceServer(ResourceServerOptions{
|
||||
Store: tmp,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
t.Run("playlist happy CRUD paths", func(t *testing.T) {
|
||||
raw := testdata(t, "01_create_playlist.json")
|
||||
key := &ResourceKey{
|
||||
Group: "playlist.grafana.app",
|
||||
Resource: "rrrr", // can be anything :(
|
||||
Namespace: "default",
|
||||
Name: "fdgsv37qslr0ga",
|
||||
}
|
||||
created, err := server.Create(ctx, &CreateRequest{
|
||||
Value: raw,
|
||||
Key: key,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.True(t, created.ResourceVersion > 0)
|
||||
|
||||
// The key does not include resource version
|
||||
found, err := server.Read(ctx, &ReadRequest{Key: key})
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, created.ResourceVersion, found.ResourceVersion)
|
||||
|
||||
// Now update the value
|
||||
tmp := &unstructured.Unstructured{}
|
||||
err = json.Unmarshal(raw, tmp)
|
||||
require.NoError(t, err)
|
||||
|
||||
now := time.Now().UnixMilli()
|
||||
obj, err := utils.MetaAccessor(tmp)
|
||||
require.NoError(t, err)
|
||||
obj.SetAnnotation("test", "hello")
|
||||
obj.SetUpdatedTimestampMillis(now)
|
||||
obj.SetUpdatedBy(testUserA.GetUID().String())
|
||||
raw, err = json.Marshal(tmp)
|
||||
require.NoError(t, err)
|
||||
|
||||
key.ResourceVersion = created.ResourceVersion
|
||||
updated, err := server.Update(ctx, &UpdateRequest{Key: key, Value: raw})
|
||||
require.NoError(t, err)
|
||||
require.True(t, updated.ResourceVersion > created.ResourceVersion)
|
||||
|
||||
// We should still get the latest
|
||||
key.ResourceVersion = 0
|
||||
found, err = server.Read(ctx, &ReadRequest{Key: key})
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, updated.ResourceVersion, found.ResourceVersion)
|
||||
|
||||
key.ResourceVersion = updated.ResourceVersion
|
||||
deleted, err := server.Delete(ctx, &DeleteRequest{Key: key})
|
||||
require.NoError(t, err)
|
||||
require.True(t, deleted.ResourceVersion > updated.ResourceVersion)
|
||||
|
||||
// We should get not found when trying to read the latest value
|
||||
key.ResourceVersion = 0
|
||||
found, err = server.Read(ctx, &ReadRequest{Key: key})
|
||||
require.Error(t, err)
|
||||
require.Nil(t, found)
|
||||
})
|
||||
}
|
||||
|
||||
//go:embed testdata/*
|
||||
var testdataFS embed.FS
|
||||
|
||||
func testdata(t *testing.T, filename string) []byte {
|
||||
t.Helper()
|
||||
b, err := testdataFS.ReadFile(`testdata/` + filename)
|
||||
require.NoError(t, err)
|
||||
return b
|
||||
}
|
Loading…
Reference in New Issue
Block a user