Storage: Add cdk blob support to ResourceStore (#89408)

This commit is contained in:
Ryan McKinley 2024-06-19 22:01:10 +03:00 committed by GitHub
parent 3e85f87db6
commit 040f392018
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
25 changed files with 2115 additions and 1601 deletions

View File

@ -376,8 +376,8 @@ devenv-mysql:
.PHONY: protobuf
protobuf: ## Compile protobuf definitions
bash scripts/protobuf-check.sh
go install google.golang.org/protobuf/cmd/protoc-gen-go
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
# go install google.golang.org/protobuf/cmd/protoc-gen-go
# go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
# buf generate pkg/plugins/backendplugin/pluginextensionv2 --template pkg/plugins/backendplugin/pluginextensionv2/buf.gen.yaml
# buf generate pkg/plugins/backendplugin/secretsmanagerplugin --template pkg/plugins/backendplugin/secretsmanagerplugin/buf.gen.yaml
# buf generate pkg/services/store/entity --template pkg/services/store/entity/buf.gen.yaml

View File

@ -1,9 +1,12 @@
package utils
import (
"bytes"
"fmt"
"mime"
"reflect"
"strconv"
"strings"
"time"
"k8s.io/apimachinery/pkg/api/meta"
@ -20,6 +23,7 @@ const AnnoKeyUpdatedTimestamp = "grafana.app/updatedTimestamp"
const AnnoKeyUpdatedBy = "grafana.app/updatedBy"
const AnnoKeyFolder = "grafana.app/folder"
const AnnoKeySlug = "grafana.app/slug"
const AnnoKeyBlob = "grafana.app/blob"
const AnnoKeyMessage = "grafana.app/message"
// Identify where values came from
@ -49,6 +53,85 @@ type ResourceOriginInfo struct {
_ any `json:"-"`
}
type BlobInfo struct {
UID string `json:"uid"`
Size int64 `json:"size,omitempty"`
Hash string `json:"hash,omitempty"`
MimeType string `json:"mime,omitempty"`
Charset string `json:"charset,omitempty"` // content type = mime+charset
}
// Content type is mime + charset
func (b *BlobInfo) SetContentType(v string) {
var params map[string]string
var err error
b.Charset = ""
b.MimeType, params, err = mime.ParseMediaType(v)
if err != nil {
return
}
b.Charset = params["charset"]
}
// Content type is mime + charset
func (b *BlobInfo) ContentType() string {
sb := bytes.NewBufferString(b.MimeType)
if b.Charset != "" {
sb.WriteString("; charset=")
sb.WriteString(b.Charset)
}
return sb.String()
}
func (b *BlobInfo) String() string {
sb := bytes.NewBufferString(b.UID)
if b.Size > 0 {
sb.WriteString(fmt.Sprintf("; size=%d", b.Size))
}
if b.Hash != "" {
sb.WriteString("; hash=")
sb.WriteString(b.Hash)
}
if b.MimeType != "" {
sb.WriteString("; mime=")
sb.WriteString(b.MimeType)
}
if b.Charset != "" {
sb.WriteString("; charset=")
sb.WriteString(b.Charset)
}
return sb.String()
}
func ParseBlobInfo(v string) *BlobInfo {
if v == "" {
return nil
}
info := &BlobInfo{}
for i, part := range strings.Split(v, ";") {
if i == 0 {
info.UID = part
continue
}
kv := strings.Split(strings.TrimSpace(part), "=")
if len(kv) == 2 {
val := kv[1]
switch kv[0] {
case "size":
info.Size, _ = strconv.ParseInt(val, 10, 64)
case "hash":
info.Hash = val
case "mime":
info.MimeType = val
case "charset":
info.Charset = val
}
}
}
return info
}
// Accessor functions for k8s objects
type GrafanaMetaAccessor interface {
metav1.Object
@ -76,6 +159,9 @@ type GrafanaMetaAccessor interface {
GetSlug() string
SetSlug(v string)
SetBlob(v *BlobInfo)
GetBlob() *BlobInfo
GetOriginInfo() (*ResourceOriginInfo, error)
SetOriginInfo(info *ResourceOriginInfo)
GetOriginName() string
@ -195,6 +281,17 @@ func (m *grafanaMetaAccessor) SetUpdatedBy(user string) {
m.SetAnnotation(AnnoKeyUpdatedBy, user)
}
func (m *grafanaMetaAccessor) GetBlob() *BlobInfo {
return ParseBlobInfo(m.get(AnnoKeyBlob))
}
func (m *grafanaMetaAccessor) SetBlob(info *BlobInfo) {
if info == nil {
m.SetAnnotation(AnnoKeyBlob, "") // delete
}
m.SetAnnotation(AnnoKeyBlob, info.String())
}
func (m *grafanaMetaAccessor) GetFolder() string {
return m.get(AnnoKeyFolder)
}

View File

@ -172,6 +172,14 @@ func TestMetaAccessor(t *testing.T) {
require.Equal(t, int64(12345), rv)
})
t.Run("blob info", func(t *testing.T) {
info := &utils.BlobInfo{UID: "AAA", Size: 123, Hash: "xyz", MimeType: "application/json", Charset: "utf-8"}
anno := info.String()
require.Equal(t, "AAA; size=123; hash=xyz; mime=application/json; charset=utf-8", anno)
copy := utils.ParseBlobInfo(anno)
require.Equal(t, info, copy)
})
t.Run("find titles", func(t *testing.T) {
// with a k8s object that has Spec.Title
obj := &TestResource{

View File

@ -25,6 +25,7 @@ import (
"github.com/grafana/grafana/pkg/services/store/entity"
"github.com/grafana/grafana/pkg/services/store/entity/db"
"github.com/grafana/grafana/pkg/services/store/entity/sqlstash/sqltemplate"
"github.com/grafana/grafana/pkg/storage/unified/resource"
)
const entityTable = "entity"
@ -75,7 +76,7 @@ type sqlEntityServer struct {
db db.EntityDBInterface // needed to keep xorm engine in scope
sess *session.SessionDB
dialect migrator.Dialect
broadcaster Broadcaster[*entity.EntityWatchResponse]
broadcaster resource.Broadcaster[*entity.EntityWatchResponse]
ctx context.Context // TODO: remove
cancel context.CancelFunc
tracer trace.Tracer
@ -141,7 +142,7 @@ func (s *sqlEntityServer) init() error {
s.dialect = migrator.NewDialect(engine.DriverName())
// set up the broadcaster
s.broadcaster, err = NewBroadcaster(s.ctx, func(stream chan<- *entity.EntityWatchResponse) error {
s.broadcaster, err = resource.NewBroadcaster(s.ctx, func(stream chan<- *entity.EntityWatchResponse) error {
// start the poller
go s.poller(stream)

View File

@ -5,8 +5,7 @@ import (
"fmt"
"os"
"github.com/hack-pad/hackpadfs"
hackos "github.com/hack-pad/hackpadfs/os"
"gocloud.dev/blob/fileblob"
"github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/infra/tracing"
@ -16,32 +15,35 @@ import (
"github.com/grafana/grafana/pkg/services/store/entity/sqlstash"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/util"
)
// Creates a ResourceServer using the existing entity tables
// NOTE: most of the field values are ignored
func ProvideResourceServer(db db.DB, cfg *setting.Cfg, features featuremgmt.FeatureToggles, tracer tracing.Tracer) (resource.ResourceServer, error) {
if true {
var root hackpadfs.FS
if false {
tmp, err := os.MkdirTemp("", "xxx-*")
if err != nil {
return nil, err
}
tmp, err := os.MkdirTemp("", "xxx-*")
if err != nil {
return nil, err
}
root, err = hackos.NewFS().Sub(tmp[1:])
if err != nil {
return nil, err
}
bucket, err := fileblob.OpenBucket(tmp, &fileblob.Options{
CreateDir: true,
Metadata: fileblob.MetadataDontWrite, // skip
})
if err != nil {
return nil, err
}
fmt.Printf("ROOT: %s\n", tmp)
fmt.Printf("ROOT: %s\n\n", tmp)
store, err := resource.NewCDKAppendingStore(context.Background(), resource.CDKAppenderOptions{
Bucket: bucket,
})
if err != nil {
return nil, err
}
return resource.NewResourceServer(resource.ResourceServerOptions{
Store: resource.NewFileSystemStore(resource.FileSystemOptions{
Root: root,
}),
Store: store,
})
}
@ -96,7 +98,7 @@ func (b *entityBridge) WriteEvent(ctx context.Context, event resource.WriteEvent
key := toEntityKey(event.Key)
// Delete does not need to create an entity first
if event.Event == resource.WatchEvent_DELETED {
if event.Type == resource.WatchEvent_DELETED {
rsp, err := b.entity.Delete(ctx, &entity.DeleteEntityRequest{
Key: key,
PreviousVersion: event.PreviousRV,
@ -117,15 +119,14 @@ func (b *entityBridge) WriteEvent(ctx context.Context, event resource.WriteEvent
Guid: string(event.Object.GetUID()),
// Key: fmt.Sprint("%s/%s/%s/%s", ),
Folder: obj.GetFolder(),
Body: event.Value,
Message: event.Message,
Folder: obj.GetFolder(),
Body: event.Value,
Labels: obj.GetLabels(),
Size: int64(len(event.Value)),
}
switch event.Event {
switch event.Type {
case resource.WatchEvent_ADDED:
msg.Action = entity.Entity_CREATED
rsp, err := b.entity.Create(ctx, &entity.CreateEntityRequest{Entity: msg})
@ -148,15 +149,10 @@ func (b *entityBridge) WriteEvent(ctx context.Context, event resource.WriteEvent
default:
}
return 0, fmt.Errorf("unsupported operation: %s", event.Event.String())
return 0, fmt.Errorf("unsupported operation: %s", event.Type.String())
}
// Create new name for a given resource
func (b *entityBridge) GenerateName(_ context.Context, _ *resource.ResourceKey, _ string) (string, error) {
return util.GenerateShortUID(), nil
}
func (b *entityBridge) Watch(_ context.Context, _ *resource.WatchRequest) (chan *resource.WatchEvent, error) {
func (b *entityBridge) WatchWriteEvents(ctx context.Context) (<-chan *resource.WrittenEvent, error) {
return nil, resource.ErrNotImplementedYet
}

View File

@ -1,4 +1,4 @@
package sqlstash
package resource
import (
"context"

View File

@ -1,4 +1,4 @@
package sqlstash
package resource
import (
"context"

View File

@ -0,0 +1,315 @@
package resource
import (
"bytes"
context "context"
"fmt"
"io"
"math/rand"
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/bwmarrin/snowflake"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/noop"
"gocloud.dev/blob"
_ "gocloud.dev/blob/fileblob"
_ "gocloud.dev/blob/memblob"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
)
type CDKAppenderOptions struct {
Tracer trace.Tracer
Bucket *blob.Bucket
RootFolder string
// When running in a cluster, each node should have a different ID
// This is used for snowflake generation and log identification
NodeID int64
// Get the next ResourceVersion. When not set, this will default to snowflake IDs
NextResourceVersion func() int64
}
func NewCDKAppendingStore(ctx context.Context, opts CDKAppenderOptions) (AppendingStore, error) {
if opts.Tracer == nil {
opts.Tracer = noop.NewTracerProvider().Tracer("cdk-appending-store")
}
if opts.Bucket == nil {
return nil, fmt.Errorf("missing bucket")
}
found, _, err := opts.Bucket.ListPage(ctx, blob.FirstPageToken, 1, &blob.ListOptions{
Prefix: opts.RootFolder,
Delimiter: "/",
})
if err != nil {
return nil, err
}
if found == nil {
return nil, fmt.Errorf("the root folder does not exist")
}
// This is not totally safe when running in HA
if opts.NextResourceVersion == nil {
if opts.NodeID == 0 {
opts.NodeID = rand.Int63n(1024)
}
eventNode, err := snowflake.NewNode(opts.NodeID)
if err != nil {
return nil, apierrors.NewInternalError(
fmt.Errorf("error initializing snowflake id generator :: %w", err))
}
opts.NextResourceVersion = func() int64 {
return eventNode.Generate().Int64()
}
}
return &cdkAppender{
tracer: opts.Tracer,
bucket: opts.Bucket,
root: opts.RootFolder,
nextRV: opts.NextResourceVersion,
}, nil
}
type cdkAppender struct {
tracer trace.Tracer
bucket *blob.Bucket
root string
nextRV func() int64
mutex sync.Mutex
// Typically one... the server wrapper
subscribers []chan *WrittenEvent
}
func (s *cdkAppender) getPath(key *ResourceKey, rv int64) string {
var buffer bytes.Buffer
buffer.WriteString(s.root)
if key.Group == "" {
return buffer.String()
}
buffer.WriteString(key.Group)
if key.Resource == "" {
return buffer.String()
}
buffer.WriteString("/")
buffer.WriteString(key.Resource)
if key.Namespace == "" {
if key.Name == "" {
return buffer.String()
}
buffer.WriteString("/__cluster__")
} else {
buffer.WriteString("/")
buffer.WriteString(key.Namespace)
}
if key.Name == "" {
return buffer.String()
}
buffer.WriteString("/")
buffer.WriteString(key.Name)
if rv > 0 {
buffer.WriteString(fmt.Sprintf("/%d.json", rv))
}
return buffer.String()
}
func (s *cdkAppender) WriteEvent(ctx context.Context, event WriteEvent) (rv int64, err error) {
// Scope the lock
{
s.mutex.Lock()
defer s.mutex.Unlock()
rv = s.nextRV()
err = s.bucket.WriteAll(ctx, s.getPath(event.Key, rv), event.Value, &blob.WriterOptions{
ContentType: "application/json",
})
}
// Async notify all subscribers
if s.subscribers != nil {
go func() {
write := &WrittenEvent{
WriteEvent: event,
Timestamp: time.Now().UnixMilli(),
ResourceVersion: rv,
}
for _, sub := range s.subscribers {
sub <- write
}
}()
}
return rv, err
}
// Read implements ResourceStoreServer.
func (s *cdkAppender) Read(ctx context.Context, req *ReadRequest) (*ReadResponse, error) {
rv := req.ResourceVersion
path := s.getPath(req.Key, req.ResourceVersion)
if rv < 1 {
iter := s.bucket.List(&blob.ListOptions{Prefix: path + "/", Delimiter: "/"})
for {
obj, err := iter.Next(ctx)
if err == io.EOF {
break
}
if strings.HasSuffix(obj.Key, ".json") {
idx := strings.LastIndex(obj.Key, "/") + 1
edx := strings.LastIndex(obj.Key, ".")
if idx > 0 {
v, err := strconv.ParseInt(obj.Key[idx:edx], 10, 64)
if err == nil && v > rv {
rv = v
path = obj.Key // find the path with biggest resource version
}
}
}
}
}
raw, err := s.bucket.ReadAll(ctx, path)
if err == nil && bytes.Contains(raw, []byte(`"DeletedMarker"`)) {
tmp := &unstructured.Unstructured{}
err = tmp.UnmarshalJSON(raw)
if err == nil && tmp.GetKind() == "DeletedMarker" {
return nil, apierrors.NewNotFound(schema.GroupResource{
Group: req.Key.Group,
Resource: req.Key.Resource,
}, req.Key.Name)
}
}
return &ReadResponse{
ResourceVersion: rv,
Value: raw,
}, err
}
// List implements AppendingStore.
func (s *cdkAppender) List(ctx context.Context, req *ListRequest) (*ListResponse, error) {
resources, err := buildTree(ctx, s, req.Options.Key)
if err != nil {
return nil, err
}
rsp := &ListResponse{}
for _, item := range resources {
latest := item.versions[0]
raw, err := s.bucket.ReadAll(ctx, latest.key)
if err != nil {
return nil, err
}
rsp.Items = append(rsp.Items, &ResourceWrapper{
ResourceVersion: latest.rv,
Value: raw,
})
}
return rsp, nil
}
// Watch implements AppendingStore.
func (s *cdkAppender) WatchWriteEvents(ctx context.Context) (<-chan *WrittenEvent, error) {
stream := make(chan *WrittenEvent, 10)
{
s.mutex.Lock()
defer s.mutex.Unlock()
// Add the event stream
s.subscribers = append(s.subscribers, stream)
}
// Wait for context done
go func() {
// Wait till the context is done
<-ctx.Done()
// Then remove the subscription
s.mutex.Lock()
defer s.mutex.Unlock()
// Copy all streams without our listener
subs := []chan *WrittenEvent{}
for _, sub := range s.subscribers {
if sub != stream {
subs = append(subs, sub)
}
}
s.subscribers = subs
}()
return stream, nil
}
// group > resource > namespace > name > versions
type cdkResource struct {
prefix string
versions []cdkVersion
}
type cdkVersion struct {
rv int64
key string
}
func buildTree(ctx context.Context, s *cdkAppender, key *ResourceKey) ([]cdkResource, error) {
byPrefix := make(map[string]*cdkResource)
path := s.getPath(key, 0)
iter := s.bucket.List(&blob.ListOptions{Prefix: path, Delimiter: ""}) // "" is recursive
for {
obj, err := iter.Next(ctx)
if err == io.EOF {
break
}
if strings.HasSuffix(obj.Key, ".json") {
idx := strings.LastIndex(obj.Key, "/") + 1
edx := strings.LastIndex(obj.Key, ".")
if idx > 0 {
rv, err := strconv.ParseInt(obj.Key[idx:edx], 10, 64)
if err == nil {
prefix := obj.Key[:idx]
res, ok := byPrefix[prefix]
if !ok {
res = &cdkResource{prefix: prefix}
byPrefix[prefix] = res
}
res.versions = append(res.versions, cdkVersion{
rv: rv,
key: obj.Key,
})
}
}
}
}
// Now sort all versions
resources := make([]cdkResource, 0, len(byPrefix))
for _, res := range byPrefix {
sort.Slice(res.versions, func(i, j int) bool {
return res.versions[i].rv > res.versions[j].rv
})
resources = append(resources, *res)
}
sort.Slice(resources, func(i, j int) bool {
a := resources[i].versions[0].rv
b := resources[j].versions[0].rv
return a > b
})
return resources, nil
}

View File

@ -0,0 +1,176 @@
package resource
import (
"bytes"
context "context"
"crypto/md5"
"encoding/hex"
"fmt"
"mime"
"time"
"github.com/google/uuid"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/noop"
"gocloud.dev/blob"
_ "gocloud.dev/blob/fileblob"
_ "gocloud.dev/blob/memblob"
"github.com/grafana/grafana/pkg/apimachinery/utils"
)
type CDKBlobStoreOptions struct {
Tracer trace.Tracer
Bucket *blob.Bucket
RootFolder string
URLExpiration time.Duration
}
func NewCDKBlobStore(ctx context.Context, opts CDKBlobStoreOptions) (BlobStore, error) {
if opts.Tracer == nil {
opts.Tracer = noop.NewTracerProvider().Tracer("cdk-blob-store")
}
if opts.Bucket == nil {
return nil, fmt.Errorf("missing bucket")
}
if opts.URLExpiration < 1 {
opts.URLExpiration = time.Minute * 10 // 10 min default
}
found, _, err := opts.Bucket.ListPage(ctx, blob.FirstPageToken, 1, &blob.ListOptions{
Prefix: opts.RootFolder,
Delimiter: "/",
})
if err != nil {
return nil, err
}
if found == nil {
return nil, fmt.Errorf("the root folder does not exist")
}
return &cdkBlobStore{
tracer: opts.Tracer,
bucket: opts.Bucket,
root: opts.RootFolder,
cansignurls: false, // TODO depends on the implementation
expiration: opts.URLExpiration,
}, nil
}
type cdkBlobStore struct {
tracer trace.Tracer
bucket *blob.Bucket
root string
cansignurls bool
expiration time.Duration
}
func (s *cdkBlobStore) getBlobPath(key *ResourceKey, info *utils.BlobInfo) (string, error) {
var buffer bytes.Buffer
buffer.WriteString(s.root)
if key.Namespace == "" {
buffer.WriteString("__cluster__/")
} else {
buffer.WriteString(key.Namespace)
buffer.WriteString("/")
}
if key.Group == "" {
return "", fmt.Errorf("missing group")
}
buffer.WriteString(key.Group)
buffer.WriteString("/")
if key.Resource == "" {
return "", fmt.Errorf("missing resource")
}
buffer.WriteString(key.Resource)
buffer.WriteString("/")
if key.Name == "" {
return "", fmt.Errorf("missing name")
}
buffer.WriteString(key.Name)
buffer.WriteString("/")
buffer.WriteString(info.UID)
ext, err := mime.ExtensionsByType(info.MimeType)
if err != nil {
return "", err
}
if len(ext) > 0 {
buffer.WriteString(ext[0])
}
return buffer.String(), nil
}
func (s *cdkBlobStore) SupportsSignedURLs() bool {
return s.cansignurls
}
func (s *cdkBlobStore) PutBlob(ctx context.Context, req *PutBlobRequest) (*PutBlobResponse, error) {
info := &utils.BlobInfo{
UID: uuid.New().String(),
}
info.SetContentType(req.ContentType)
path, err := s.getBlobPath(req.Resource, info)
if err != nil {
return nil, err
}
rsp := &PutBlobResponse{Uid: info.UID, MimeType: info.MimeType, Charset: info.Charset}
if req.Method == PutBlobRequest_HTTP {
rsp.Url, err = s.bucket.SignedURL(ctx, path, &blob.SignedURLOptions{
Method: "PUT",
Expiry: s.expiration,
ContentType: req.ContentType,
})
return rsp, err
}
if len(req.Value) < 1 {
return nil, fmt.Errorf("missing content value")
}
// Write the value
err = s.bucket.WriteAll(ctx, path, req.Value, &blob.WriterOptions{
ContentType: req.ContentType,
})
if err != nil {
return nil, err
}
attrs, err := s.bucket.Attributes(ctx, path)
if err != nil {
return nil, err
}
rsp.Size = attrs.Size
// Set the MD5 hash if missing
if len(attrs.MD5) == 0 {
h := md5.New()
_, _ = h.Write(req.Value)
attrs.MD5 = h.Sum(nil)
}
rsp.Hash = hex.EncodeToString(attrs.MD5[:])
return rsp, err
}
func (s *cdkBlobStore) GetBlob(ctx context.Context, resource *ResourceKey, info *utils.BlobInfo, mustProxy bool) (*GetBlobResponse, error) {
path, err := s.getBlobPath(resource, info)
if err != nil {
return nil, err
}
rsp := &GetBlobResponse{ContentType: info.ContentType()}
if mustProxy || !s.cansignurls {
rsp.Value, err = s.bucket.ReadAll(ctx, path)
return rsp, err
}
rsp.Url, err = s.bucket.SignedURL(ctx, path, &blob.SignedURLOptions{
Method: "GET",
Expiry: s.expiration,
ContentType: rsp.ContentType,
})
return rsp, err
}

View File

@ -0,0 +1,67 @@
package resource
import (
"context"
"fmt"
"os"
"testing"
"github.com/stretchr/testify/require"
"gocloud.dev/blob/fileblob"
"gocloud.dev/blob/memblob"
"github.com/grafana/grafana/pkg/apimachinery/utils"
)
func TestCDKBlobStore(t *testing.T) {
bucket := memblob.OpenBucket(nil)
if false {
tmp, err := os.MkdirTemp("", "xxx-*")
require.NoError(t, err)
bucket, err = fileblob.OpenBucket(tmp, &fileblob.Options{
CreateDir: true,
Metadata: fileblob.MetadataDontWrite, // skip
})
require.NoError(t, err)
fmt.Printf("ROOT: %s\n\n", tmp)
}
ctx := context.Background()
store, err := NewCDKBlobStore(ctx, CDKBlobStoreOptions{
Bucket: bucket,
//RootFolder: "xyz",
})
require.NoError(t, err)
t.Run("can write then read a blob", func(t *testing.T) {
raw := testdata(t, "01_create_playlist.json")
key := &ResourceKey{
Group: "playlist.grafana.app",
Resource: "rrrr", // can be anything
Namespace: "default",
Name: "fdgsv37qslr0ga",
}
rsp, err := store.PutBlob(ctx, &PutBlobRequest{
Resource: key,
Method: PutBlobRequest_GRPC,
ContentType: "application/json",
Value: raw,
})
require.NoError(t, err)
require.Equal(t, "4933beea0c6d6dfd73150451098c70f0", rsp.Hash)
found, err := store.GetBlob(ctx, key, &utils.BlobInfo{
UID: rsp.Uid,
Size: rsp.Size,
Hash: rsp.Hash,
MimeType: rsp.MimeType,
Charset: rsp.Charset,
}, false)
require.NoError(t, err)
require.Equal(t, raw, found.Value)
require.Equal(t, "application/json", found.ContentType)
})
}

View File

@ -11,16 +11,26 @@ import (
type WriteEvent struct {
EventID int64
Event WatchEvent_Type // ADDED, MODIFIED, DELETED
Type WatchEvent_Type // ADDED, MODIFIED, DELETED
Key *ResourceKey // the request key
PreviousRV int64 // only for Update+Delete
Message string // commit message
// Access to raw metadata
Object utils.GrafanaMetaAccessor
// The json payload (without resourceVersion)
Value []byte
// Access real fields
Object utils.GrafanaMetaAccessor
}
// WriteEvents after they include a resource version
type WrittenEvent struct {
WriteEvent
// The resource version
ResourceVersion int64
// Timestamp when the event is created
Timestamp int64
}
// A function to write events
@ -29,7 +39,7 @@ type EventAppender = func(context.Context, *WriteEvent) (int64, error)
type writeEventBuilder struct {
EventID int64
Key *ResourceKey // the request key
Event WatchEvent_Type
Type WatchEvent_Type
Requester identity.Requester
Object *unstructured.Unstructured
@ -53,9 +63,9 @@ func newEventFromBytes(value, oldValue []byte) (*writeEventBuilder, error) {
}
if oldValue == nil {
builder.Event = WatchEvent_ADDED
builder.Type = WatchEvent_ADDED
} else {
builder.Event = WatchEvent_MODIFIED
builder.Type = WatchEvent_MODIFIED
temp := &unstructured.Unstructured{}
err = temp.UnmarshalJSON(oldValue)
@ -73,7 +83,7 @@ func newEventFromBytes(value, oldValue []byte) (*writeEventBuilder, error) {
func (b *writeEventBuilder) toEvent() (event WriteEvent, err error) {
event.EventID = b.EventID
event.Key = b.Key
event.Event = b.Event
event.Type = b.Type
event.Object = b.Meta
event.Value, err = b.Object.MarshalJSON()
return // includes the named values

View File

@ -1,159 +0,0 @@
package resource
import (
"context"
"encoding/json"
"fmt"
"path/filepath"
"sort"
"strconv"
"strings"
"github.com/google/uuid"
"github.com/hack-pad/hackpadfs"
"github.com/hack-pad/hackpadfs/mem"
"go.opentelemetry.io/otel/trace"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"github.com/grafana/grafana/pkg/apimachinery/utils"
)
type FileSystemOptions struct {
// OTel tracer
Tracer trace.Tracer
// Root file system -- null will be in memory
Root hackpadfs.FS
}
func NewFileSystemStore(opts FileSystemOptions) AppendingStore {
root := opts.Root
if root == nil {
root, _ = mem.NewFS()
}
return &fsStore{
root: root,
keys: &simpleConverter{}, // not tenant isolated
}
}
type fsStore struct {
root hackpadfs.FS
keys KeyConversions
}
// The only write command
func (f *fsStore) WriteEvent(ctx context.Context, event WriteEvent) (int64, error) {
// For this case, we will treat them the same
dir, err := f.keys.KeyToPath(event.Key, 0)
if err != nil {
return 0, err
}
err = hackpadfs.MkdirAll(f.root, dir, 0750)
if err != nil {
return 0, err
}
fpath := filepath.Join(dir, fmt.Sprintf("%d.json", event.EventID))
file, err := hackpadfs.OpenFile(f.root, fpath, hackpadfs.FlagWriteOnly|hackpadfs.FlagCreate, 0750)
if err != nil {
return 0, err
}
_, err = hackpadfs.WriteFile(file, event.Value)
return event.EventID, err
}
// Create new name for a given resource
func (f *fsStore) GenerateName(ctx context.Context, key *ResourceKey, prefix string) (string, error) {
// TODO... shorter and make sure it does not exist
return prefix + "x" + uuid.New().String(), nil
}
// Read implements ResourceStoreServer.
func (f *fsStore) Read(ctx context.Context, req *ReadRequest) (*ReadResponse, error) {
rv := req.ResourceVersion
fname := "--x--"
dir, err := f.keys.KeyToPath(req.Key, 0)
if err != nil {
return nil, err
}
if rv > 0 {
fname = fmt.Sprintf("%d.json", rv)
} else {
files, err := hackpadfs.ReadDir(f.root, dir)
if err != nil {
return nil, err
}
// Sort by name
sort.Slice(files, func(i, j int) bool {
a := files[i].Name()
b := files[j].Name()
return a > b // ?? should we parse the numbers ???
})
// The first matching file
for _, v := range files {
fname = v.Name()
if strings.HasSuffix(fname, ".json") {
// The RV is encoded in the path
rv, err = strconv.ParseInt(strings.TrimSuffix(fname, ".json"), 10, 64)
if err != nil {
return nil, err
}
break
}
}
}
raw, err := hackpadfs.ReadFile(f.root, filepath.Join(dir, fname))
if err != nil {
return nil, err
}
if req.ResourceVersion < 1 {
// The operation is inside the body, so need to inspect :(
tmp := &metav1.PartialObjectMetadata{}
err = json.Unmarshal(raw, tmp)
if err != nil {
return nil, err
}
meta, err := utils.MetaAccessor(tmp)
if err != nil {
return nil, err
}
if meta.GetGroupVersionKind().Kind == "DeletedMarker" {
return nil, apierrors.NewNotFound(schema.GroupResource{
Group: req.Key.Group,
Resource: req.Key.Resource,
}, req.Key.Name)
}
}
return &ReadResponse{
ResourceVersion: rv,
Value: raw,
}, nil
}
// List implements AppendingStore.
func (f *fsStore) List(ctx context.Context, req *ListRequest) (*ListResponse, error) {
tree := eventTree{
group: req.Options.Key.Group,
resource: req.Options.Key.Resource,
}
_ = tree.read(f.root, req.Options.Key)
// if err != nil {
// return nil, err
// }
return tree.list(f, req.ResourceVersion)
}
// Watch implements AppendingStore.
func (f *fsStore) Watch(context.Context, *WatchRequest) (chan *WatchEvent, error) {
panic("unimplemented")
}

View File

@ -1,195 +0,0 @@
package resource
import (
"fmt"
"io/fs"
"sort"
"strconv"
"strings"
"github.com/hack-pad/hackpadfs"
)
// VERY VERY early, hacky file system reader
type eventTree struct {
path string
group string
resource string
namespaces []namespaceEvents
}
func (t *eventTree) list(fs *fsStore, rv int64) (*ListResponse, error) {
rsp := &ListResponse{}
for idx, ns := range t.namespaces {
if idx == 0 {
rsp.ResourceVersion = ns.version()
}
err := ns.append(fs, rv, rsp)
if err != nil {
return rsp, err
}
}
return rsp, nil
}
func (t *eventTree) read(root fs.FS, key *ResourceKey) error {
t.group = key.Group
t.resource = key.Resource
t.path = fmt.Sprintf("%s/%s", t.group, t.resource)
// Cluster scoped, with an explicit name
if key.Namespace == "" {
if key.Name != "" {
ns := namespaceEvents{
path: t.path + "/__cluster__",
namespace: "",
}
err := ns.read(root, key)
if err == nil {
t.namespaces = append(t.namespaces, ns)
}
return err
}
}
files, err := hackpadfs.ReadDir(root, t.path)
if err != nil {
return err
}
for _, file := range files {
ns := namespaceEvents{
path: t.path + "/" + file.Name(),
namespace: file.Name(),
}
err = ns.read(root, key)
if err != nil {
return err
}
t.namespaces = append(t.namespaces, ns)
}
return nil
}
type namespaceEvents struct {
path string
namespace string
names []nameEvents
}
func (t *namespaceEvents) version() int64 {
if len(t.names) > 0 {
return t.names[0].version()
}
return 0
}
func (t *namespaceEvents) append(fs *fsStore, rv int64, rsp *ListResponse) error {
for _, name := range t.names {
err := name.append(fs, rv, rsp)
if err != nil {
return err
}
}
return nil
}
func (t *namespaceEvents) read(root fs.FS, key *ResourceKey) error {
if key.Name != "" {
vv := nameEvents{
path: t.path + "/" + key.Name,
name: key.Name,
}
err := vv.read(root)
if err != nil {
return err
}
t.names = []nameEvents{vv}
}
files, err := hackpadfs.ReadDir(root, t.path)
if err != nil {
return err
}
for _, file := range files {
ns := nameEvents{
path: t.path + "/" + file.Name(),
name: file.Name(),
}
err = ns.read(root)
if err != nil {
return err
}
t.names = append(t.names, ns)
}
return nil
}
type nameEvents struct {
path string
name string
versions []resourceEvent
}
func (t *nameEvents) version() int64 {
if len(t.versions) > 0 {
return t.versions[0].rv
}
return 0
}
func (t *nameEvents) append(fs *fsStore, rv int64, rsp *ListResponse) error {
if rv > 0 {
fmt.Printf("TODO... check explicit rv")
}
for _, rev := range t.versions {
raw, err := hackpadfs.ReadFile(fs.root, t.path+"/"+rev.file)
if err != nil {
return err
}
wrapper := &ResourceWrapper{
ResourceVersion: rev.rv,
Value: raw,
// Operation: val.Operation,
}
rsp.Items = append(rsp.Items, wrapper)
if true {
return nil
}
}
return nil
}
func (t *nameEvents) read(root fs.FS) error {
var err error
files, err := hackpadfs.ReadDir(root, t.path)
if err != nil {
return err
}
for _, file := range files {
p := file.Name()
if file.IsDir() || !strings.HasSuffix(p, ".json") {
continue
}
base := strings.TrimSuffix(p, ".json")
base = strings.TrimPrefix(base, "rv")
rr := resourceEvent{file: p}
rr.rv, err = strconv.ParseInt(base, 10, 64)
if err != nil {
return err
}
t.versions = append(t.versions, rr)
}
sort.Slice(t.versions, func(i int, j int) bool {
return t.versions[i].rv > t.versions[j].rv
})
return err
}
type resourceEvent struct {
file string // path to the actual file
rv int64
}

View File

@ -5,27 +5,32 @@ go 1.21.10
require (
github.com/bwmarrin/snowflake v0.3.0
github.com/fullstorydev/grpchan v1.1.1
github.com/google/uuid v1.6.0
github.com/grafana/grafana/pkg/apimachinery v0.0.0-20240613114114-5e2f08de316d
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0
github.com/prometheus/client_golang v1.19.0
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/otel/trace v1.26.0
gocloud.dev v0.25.0
google.golang.org/grpc v1.64.0
google.golang.org/protobuf v1.34.1
k8s.io/apimachinery v0.29.3
github.com/hack-pad/hackpadfs v0.2.1
)
require (
cloud.google.com/go/compute/metadata v0.3.0 // indirect
cloud.google.com/go v0.112.1 // indirect
cloud.google.com/go/storage v1.38.0 // indirect
github.com/aws/aws-sdk-go v1.51.31 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bufbuild/protocompile v0.4.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/googleapis/gax-go/v2 v2.12.3 // indirect
github.com/jhump/protoreflect v1.15.1 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/kr/text v0.2.0 // indirect
@ -35,11 +40,18 @@ require (
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.53.0 // indirect
github.com/prometheus/procfs v0.14.0 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.51.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.51.0 // indirect
go.opentelemetry.io/otel v1.26.0 // indirect
golang.org/x/net v0.26.0 // indirect
golang.org/x/oauth2 v0.20.0 // indirect
golang.org/x/sys v0.21.0 // indirect
golang.org/x/text v0.16.0 // indirect
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
google.golang.org/api v0.176.0 // indirect
google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240604185151-ef581f913117 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240604185151-ef581f913117 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
@ -48,4 +60,5 @@ require (
k8s.io/utils v0.0.0-20230726121419-3b25d923346b // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect
sigs.k8s.io/yaml v1.4.0 // indirect
)

View File

@ -1,19 +1,51 @@
cloud.google.com/go v0.112.1 h1:uJSeirPke5UNZHIb4SxfZklVSiWWVqW4oXlETwZziwM=
cloud.google.com/go/auth v0.2.2 h1:gmxNJs4YZYcw6YvKRtVBaF2fyUE6UrWPyzU8jHvYfmI=
cloud.google.com/go/auth/oauth2adapt v0.2.1 h1:VSPmMmUlT8CkIZ2PzD9AlLN+R3+D1clXMWHHa6vG/Ag=
cloud.google.com/go/compute v1.25.1 h1:ZRpHJedLtTpKgr3RV1Fx23NuaAEN1Zfx9hw1u4aJdjU=
cloud.google.com/go/compute/metadata v0.3.0 h1:Tz+eQXMEqDIKRsmY3cHTL6FVaynIjX2QxYC4trgAKZc=
cloud.google.com/go/iam v1.1.6 h1:bEa06k05IO4f4uJonbB5iAgKTPpABy1ayxaIZV/GHVc=
cloud.google.com/go/storage v1.38.0 h1:Az68ZRGlnNTpIBbLjSMIV2BDcwwXYlRlQzis0llkpJg=
github.com/aws/aws-sdk-go v1.51.31 h1:4TM+sNc+Dzs7wY1sJ0+J8i60c6rkgnKP1pvPx8ghsSY=
github.com/aws/aws-sdk-go-v2 v1.16.2 h1:fqlCk6Iy3bnCumtrLz9r3mJ/2gUT0pJ0wLFVIdWh+JA=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.1 h1:SdK4Ppk5IzLs64ZMvr6MrSficMtjY2oS0WOORXTlxwU=
github.com/aws/aws-sdk-go-v2/config v1.15.3 h1:5AlQD0jhVXlGzwo+VORKiUuogkG7pQcLJNzIzK7eodw=
github.com/aws/aws-sdk-go-v2/credentials v1.11.2 h1:RQQ5fzclAKJyY5TvF+fkjJEwzK4hnxQCLOu5JXzDmQo=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.3 h1:LWPg5zjHV9oz/myQr4wMs0gi4CjnDN/ILmyZUFYXZsU=
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.3 h1:ir7iEq78s4txFGgwcLqD6q9IIPzTQNRJXulJd9h/zQo=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.9 h1:onz/VaaxZ7Z4V+WIN9Txly9XLTmoOh1oJ8XcAC3pako=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.3 h1:9stUQR/u2KXU6HkFJYlqnZEjBnbgrVbG6I5HN09xZh0=
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.10 h1:by9P+oy3P/CwggN4ClnW2D4oL91QV7pBzBICi1chZvQ=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.1 h1:T4pFel53bkHjL2mMo+4DKE6r6AuoZnM0fg7k1/ratr4=
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.3 h1:I0dcwWitE752hVSMrsLCxqNQ+UdEp3nACx2bYNMQq+k=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.3 h1:Gh1Gpyh01Yvn7ilO/b/hr01WgNpaszfbKMUgqM186xQ=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.13.3 h1:BKjwCJPnANbkwQ8vzSbaZDKawwagDubrH/z/c0X+kbQ=
github.com/aws/aws-sdk-go-v2/service/s3 v1.26.3 h1:rMPtwA7zzkSQZhhz9U3/SoIDz/NZ7Q+iRn4EIO8rSyU=
github.com/aws/aws-sdk-go-v2/service/sso v1.11.3 h1:frW4ikGcxfAEDfmQqWgMLp+F1n4nRo9sF39OcIb5BkQ=
github.com/aws/aws-sdk-go-v2/service/sts v1.16.3 h1:cJGRyzCSVwZC7zZZ1xbx9m32UnrKydRYhOvcD1NYP9Q=
github.com/aws/smithy-go v1.11.2 h1:eG/N+CcUMAvsdffgMvjMKwfyDzIkjM6pfxMJ8Mzc6mE=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/bufbuild/protocompile v0.4.0 h1:LbFKd2XowZvQ/kajzguUp2DC9UEIQhIq77fZZlaQsNA=
github.com/bwmarrin/snowflake v0.3.0 h1:xm67bEhkKh6ij1790JB83OujPR5CzNe8QuQqAgISZN0=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
github.com/fullstorydev/grpchan v1.1.1 h1:heQqIJlAv5Cnks9a70GRL2EJke6QQoUB25VGR6TZQas=
github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0=
github.com/google/s2a-go v0.1.7 h1:60BLSyTrOV4/haCDW4zb1guZItoSq8foHCXrAnjBo/o=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/wire v0.5.0 h1:I7ELFeVBr3yfPIcc8+MWvrjk+3VjbcSzoXm3JVa+jD8=
github.com/googleapis/enterprise-certificate-proxy v0.3.2 h1:Vie5ybvEvT75RniqhfFxPRy3Bf7vr3h0cechB90XaQs=
github.com/googleapis/gax-go/v2 v2.12.3 h1:5/zPPDvw8Q1SuXjrqrZslrqT7dL/uJT2CQii/cLCKqA=
github.com/grafana/grafana/pkg/apimachinery v0.0.0-20240613114114-5e2f08de316d h1:/UE5JdF+0hxll7EuuO7zRzAxXrvAxQo5M9eqOepc2mQ=
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0 h1:pRhl55Yx1eC7BZ1N+BBWwnKaMyD8uC+34TLdndZMAKk=
github.com/hack-pad/hackpadfs v0.2.1 h1:FelFhIhv26gyjujoA/yeFO+6YGlqzmc9la/6iKMIxMw=
github.com/jhump/protoreflect v1.15.1 h1:HUMERORf3I3ZdX05WaQ6MIpd/NJ434hTp5YiKgfCL6c=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
@ -27,13 +59,24 @@ github.com/prometheus/procfs v0.14.0 h1:Lw4VdGGoKEZilJsayHf0B+9YgLGREba2C6xr+Fdf
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.51.0 h1:A3SayB3rNyt+1S6qpI9mHPkeHTZbD7XILEqWnYZb2l0=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.51.0 h1:Xs2Ncz0gNihqu9iosIZ5SkBbWo5T8JhhLJFMQL1qmLI=
go.opentelemetry.io/otel v1.26.0 h1:LQwgL5s/1W7YiiRwxf03QGnWLb2HW4pLiAhaA5cZXBs=
go.opentelemetry.io/otel/metric v1.26.0 h1:7S39CLuY5Jgg9CrnA9HHiEjGMF/X2VHvoXGgSllRz30=
go.opentelemetry.io/otel/trace v1.26.0 h1:1ieeAUb4y0TE26jUFrCIXKpTuVK7uJGN9/Z/2LP5sQA=
gocloud.dev v0.25.0 h1:Y7vDq8xj7SyM848KXf32Krda2e6jQ4CLh/mTeCSqXtk=
golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI=
golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ=
golang.org/x/oauth2 v0.20.0 h1:4mQdhULixXKP1rwYBW0vAijoXnkTG0BLCDRzfe1idMo=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws=
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 h1:+cNy6SZtPcJQH3LJVLOSmiC7MMxXNOb3PU/VUEz+EhU=
google.golang.org/api v0.176.0 h1:dHj1/yv5Dm/eQTXiP9hNCRT3xzJHWXeNdRq29XbMxoE=
google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de h1:F6qOa9AZTYJXOUEr4jDysRDLrm4PHePlge4v4TGAlxY=
google.golang.org/genproto/googleapis/api v0.0.0-20240604185151-ef581f913117 h1:+rdxYoE3E5htTEWIe15GlN6IfvbURM//Jt0mmkmm6ZU=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240604185151-ef581f913117 h1:1GBuWVLM/KMVUv1t1En5Gs+gFZCNd360GGb4sSxtrhU=
google.golang.org/grpc v1.64.0 h1:KH3VH9y/MgNQg1dE7b3XfVK0GsPSIzJwdF617gUSbvY=
google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg=
@ -46,4 +89,4 @@ k8s.io/klog/v2 v2.120.1 h1:QXU6cPEOIslTGvZaXvFWiP9VKyeet3sawzTOvdXb4Vw=
k8s.io/utils v0.0.0-20230726121419-3b25d923346b h1:sgn3ZU783SCgtaSJjpcVVlRqd6GSnlTLKgpAAttJvpI=
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd h1:EDPBXCAspyGV4jQlpZSudPeMmr1bNJefnuqLsRAsHZo=
sigs.k8s.io/structured-merge-diff/v4 v4.4.1 h1:150L+0vs/8DA78h1u02ooW1/fFq/Lwr+sGiqlzvrtq4=
sigs.k8s.io/yaml v1.3.0 h1:a2VclLzOGrwOHDiV8EfBGhvjHvP46CtW5j6POvhYGGo=
sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E=

View File

@ -1,109 +1,17 @@
package resource
import (
"bytes"
"fmt"
"strconv"
"strings"
)
type KeyConversions interface {
KeyToPath(k *ResourceKey, rv int64) (string, error)
PathToKey(p string) (k *ResourceKey, rv int64, err error)
PathPrefix(k *ResourceKey) string
}
var _ KeyConversions = &simpleConverter{}
// group/resource/namespace/name
type simpleConverter struct{}
// KeyToPath implements KeyConversions.
func (s *simpleConverter) KeyToPath(x *ResourceKey, rv int64) (string, error) {
var buffer bytes.Buffer
if x.Group == "" {
return "", fmt.Errorf("missing group")
}
buffer.WriteString(x.Group)
buffer.WriteString("/")
if x.Resource == "" {
return "", fmt.Errorf("missing resource")
}
buffer.WriteString(x.Resource)
buffer.WriteString("/")
if x.Namespace == "" {
buffer.WriteString("__cluster__")
} else {
buffer.WriteString(x.Namespace)
}
if x.Name == "" {
return buffer.String(), nil
}
buffer.WriteString("/")
buffer.WriteString(x.Name)
if rv > 0 {
buffer.WriteString("/")
buffer.WriteString(fmt.Sprintf("%.20d", rv))
}
return buffer.String(), nil
}
// KeyToPath implements KeyConversions.
func (s *simpleConverter) PathPrefix(x *ResourceKey) string {
var buffer bytes.Buffer
if x.Group == "" {
return ""
}
buffer.WriteString(x.Group)
if x.Resource == "" {
return buffer.String()
}
buffer.WriteString("/")
buffer.WriteString(x.Resource)
if x.Namespace == "" {
if x.Name == "" {
return buffer.String()
}
buffer.WriteString("/__cluster__")
} else {
buffer.WriteString("/")
buffer.WriteString(x.Namespace)
}
if x.Name == "" {
return buffer.String()
}
buffer.WriteString("/")
buffer.WriteString(x.Name)
return buffer.String()
}
// PathToKey implements KeyConversions.
func (s *simpleConverter) PathToKey(p string) (k *ResourceKey, rv int64, err error) {
key := &ResourceKey{}
parts := strings.Split(p, "/")
if len(parts) < 2 {
return nil, 0, fmt.Errorf("expecting at least group/resource")
}
key.Group = parts[0]
key.Resource = parts[1]
if len(parts) > 2 {
key.Namespace = parts[2]
}
if len(parts) > 3 {
key.Name = parts[3]
}
if len(parts) > 4 {
parts = strings.Split(parts[4], ".")
rv, err = strconv.ParseInt(parts[0], 10, 64)
}
return key, rv, err
func matchesQueryKey(query *ResourceKey, key *ResourceKey) bool {
if query.Group != key.Group {
return false
}
if query.Resource != key.Resource {
return false
}
if query.Namespace != "" && query.Namespace != key.Namespace {
return false
}
if query.Name != "" && query.Name != key.Name {
return false
}
return true
}

View File

@ -6,25 +6,16 @@ import (
"github.com/stretchr/testify/require"
)
func TestKeyConversions(t *testing.T) {
t.Run("key namespaced path", func(t *testing.T) {
conv := &simpleConverter{}
key := &ResourceKey{
func TestKeyMatching(t *testing.T) {
t.Run("key matching", func(t *testing.T) {
require.True(t, matchesQueryKey(&ResourceKey{
Group: "ggg",
Resource: "rrr",
Namespace: "ns",
}, &ResourceKey{
Group: "ggg",
Resource: "rrr",
Namespace: "ns",
}
p, err := conv.KeyToPath(key, 0)
require.NoError(t, err)
require.Equal(t, "ggg/rrr/ns", p)
key.Name = "name"
p, err = conv.KeyToPath(key, 0)
require.NoError(t, err)
require.Equal(t, "ggg/rrr/ns/name", p)
require.Equal(t, "ggg/rrr", conv.PathPrefix(&ResourceKey{
Group: "ggg",
Resource: "rrr",
}))
})
}

View File

@ -1,6 +1,10 @@
package resource
import "context"
import (
"context"
"github.com/grafana/grafana/pkg/apimachinery/utils"
)
var (
_ ResourceSearchServer = &noopService{}
@ -39,11 +43,6 @@ func (n *noopService) List(context.Context, *ListRequest) (*ListResponse, error)
return nil, ErrNotImplementedYet
}
// GetBlob implements ResourceServer.
func (n *noopService) GetBlob(context.Context, *GetBlobRequest) (*GetBlobResponse, error) {
return nil, ErrNotImplementedYet
}
// History implements ResourceServer.
func (n *noopService) History(context.Context, *HistoryRequest) (*HistoryResponse, error) {
return nil, ErrNotImplementedYet
@ -53,3 +52,15 @@ func (n *noopService) History(context.Context, *HistoryRequest) (*HistoryRespons
func (n *noopService) Origin(context.Context, *OriginRequest) (*OriginResponse, error) {
return nil, ErrNotImplementedYet
}
func (n *noopService) SupportsSignedURLs() bool {
return false
}
func (n *noopService) PutBlob(context.Context, *PutBlobRequest) (*PutBlobResponse, error) {
return nil, ErrNotImplementedYet
}
func (n *noopService) GetBlob(ctx context.Context, resource *ResourceKey, info *utils.BlobInfo, mustProxy bool) (*GetBlobResponse, error) {
return nil, ErrNotImplementedYet
}

File diff suppressed because it is too large Load Diff

View File

@ -20,9 +20,6 @@ message ResourceWrapper {
// Full kubernetes json bytes (although the resource version may not be accurate)
bytes value = 2;
// The resource has an attached blob
bool has_blob = 4;
}
// The history and trash commands need access to commit messages
@ -39,24 +36,6 @@ message ResourceMeta {
// The kubernetes metadata section (not the full resource)
// https://github.com/kubernetes/kubernetes/blob/v1.30.2/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/types.go#L1496
bytes partial_object_meta = 6;
// The resource has an attached blob
bool has_blob = 7;
}
// Basic blob metadata
message BlobInfo {
// System identifier
string path = 1;
// Content Length
int64 size = 2;
// Content type header
string content_type = 3;
// Content hash used for an etag
string hash = 4;
}
// Status structure is copied from:
@ -81,25 +60,6 @@ message StatusResult {
int32 code = 4;
}
message LinkBlob {
enum Action {
UNKNOWN = 0;
// Upload raw bytes
UPLOAD = 1;
// Keep the existing blob (valid for updates)
KEEP = 2;
// Do not keep the existing version (same as not sending LinkBlob, only valid for updates)
REMOVE = 3;
// TODO... support presigned uploads
}
// Content type header
string content_type = 1;
// Raw value to write
bytes value = 2;
}
// ----------------------------------
// CRUD Objects
// ----------------------------------
@ -112,9 +72,6 @@ message CreateRequest {
// The resource JSON.
bytes value = 2;
// Optionally include a large binary object
LinkBlob blob = 4;
}
message CreateResponse {
@ -137,9 +94,6 @@ message UpdateRequest {
// The resource JSON.
bytes value = 3;
// Optionally link a resource object
LinkBlob blob = 5;
}
message UpdateResponse {
@ -180,9 +134,6 @@ message ReadRequest {
// Optionally pick an explicit resource version
int64 resource_version = 3;
// Include a signed blob_url (if exists)
bool include_blob_url = 2;
}
message ReadResponse {
@ -194,28 +145,6 @@ message ReadResponse {
// The properties
bytes value = 3;
// A Signed URL that will let you fetch the blob
// If this value starts with # you must read the bytes using the GetResourceBlob request
string blob_url = 5;
}
message GetBlobRequest {
ResourceKey key = 1;
// The new resource version
int64 resource_version = 2;
}
message GetBlobResponse {
// Status code
StatusResult status = 1;
// Headers
BlobInfo info = 2;
// The raw object value
bytes value = 3;
}
// ----------------------------------
@ -426,6 +355,85 @@ message HealthCheckResponse {
ServingStatus status = 1;
}
//----------------------------
// Blob Support
//----------------------------
message PutBlobRequest {
enum Method {
// Use the inline raw []byte
GRPC = 0;
// Get a signed URL and PUT the value
HTTP = 1;
}
// The resource that will use this blob
// NOTE: the name may not yet exist, but group+resource are required
ResourceKey resource = 1;
// How to upload
Method method = 2;
// Content type header
string content_type = 3;
// Raw value to write
// Not valid when method == HTTP
bytes value = 4;
}
message PutBlobResponse {
// Status code
StatusResult status = 1;
// The blob uid. This must be saved into the resource to support access
string uid = 2;
// The URL where this value can be PUT
string url = 3;
// Size of the uploaded blob
int64 size = 4;
// Content hash used for an etag
string hash = 5;
// Validated mimetype (from content_type)
string mime_type = 6;
// Validated charset (from content_type)
string charset = 7;
}
message GetBlobRequest {
ResourceKey resource = 1;
// The new resource version
int64 resource_version = 2;
// Do not return a pre-signed URL (when possible)
bool must_proxy_bytes = 3;
}
message GetBlobResponse {
// Status code sent on errors
StatusResult status = 1;
// (optional) When possible, the system will return a presigned URL
// that can be used to actually read the full blob+metadata
// When this is set, neither info nor value will be set
string url = 2;
// Content type
string content_type = 3;
// The raw object value
bytes value = 4;
}
// This provides the CRUD+List+Watch support needed for a k8s apiserver
// The semantics and behaviors of this service are constrained by kubernetes
// This does not understand the resource schemas, only deals with json bytes
@ -437,6 +445,16 @@ service ResourceStore {
rpc Delete(DeleteRequest) returns (DeleteResponse);
rpc List(ListRequest) returns (ListResponse);
rpc Watch(WatchRequest) returns (stream WatchEvent);
// Upload a blob that will be saved in a resource
rpc PutBlob(PutBlobRequest) returns (PutBlobResponse);
// Get blob contents. When possible, this will return a signed URL
// For large payloads, signed URLs are required to avoid protobuf message size limits
rpc GetBlob(GetBlobRequest) returns (GetBlobResponse);
// NOTE: there is no direct access to delete blobs
// >> cleanup will be managed via garbage collection or direct access to the underlying storage
}
// Clients can use this service directly
@ -446,9 +464,6 @@ service ResourceSearch {
rpc Read(ReadRequest) returns (ReadResponse); // Duplicated -- for client read only usage
// Get the raw blob bytes and metadata
rpc GetBlob(GetBlobRequest) returns (GetBlobResponse);
// Show resource history (and trash)
rpc History(HistoryRequest) returns (HistoryResponse);

View File

@ -19,12 +19,14 @@ import (
const _ = grpc.SupportPackageIsVersion8
const (
ResourceStore_Read_FullMethodName = "/resource.ResourceStore/Read"
ResourceStore_Create_FullMethodName = "/resource.ResourceStore/Create"
ResourceStore_Update_FullMethodName = "/resource.ResourceStore/Update"
ResourceStore_Delete_FullMethodName = "/resource.ResourceStore/Delete"
ResourceStore_List_FullMethodName = "/resource.ResourceStore/List"
ResourceStore_Watch_FullMethodName = "/resource.ResourceStore/Watch"
ResourceStore_Read_FullMethodName = "/resource.ResourceStore/Read"
ResourceStore_Create_FullMethodName = "/resource.ResourceStore/Create"
ResourceStore_Update_FullMethodName = "/resource.ResourceStore/Update"
ResourceStore_Delete_FullMethodName = "/resource.ResourceStore/Delete"
ResourceStore_List_FullMethodName = "/resource.ResourceStore/List"
ResourceStore_Watch_FullMethodName = "/resource.ResourceStore/Watch"
ResourceStore_PutBlob_FullMethodName = "/resource.ResourceStore/PutBlob"
ResourceStore_GetBlob_FullMethodName = "/resource.ResourceStore/GetBlob"
)
// ResourceStoreClient is the client API for ResourceStore service.
@ -42,6 +44,11 @@ type ResourceStoreClient interface {
Delete(ctx context.Context, in *DeleteRequest, opts ...grpc.CallOption) (*DeleteResponse, error)
List(ctx context.Context, in *ListRequest, opts ...grpc.CallOption) (*ListResponse, error)
Watch(ctx context.Context, in *WatchRequest, opts ...grpc.CallOption) (ResourceStore_WatchClient, error)
// Upload a blob that will be saved in a resource
PutBlob(ctx context.Context, in *PutBlobRequest, opts ...grpc.CallOption) (*PutBlobResponse, error)
// Get blob contents. When possible, this will return a signed URL
// For large payloads, signed URLs are required to avoid protobuf message size limits
GetBlob(ctx context.Context, in *GetBlobRequest, opts ...grpc.CallOption) (*GetBlobResponse, error)
}
type resourceStoreClient struct {
@ -135,6 +142,26 @@ func (x *resourceStoreWatchClient) Recv() (*WatchEvent, error) {
return m, nil
}
func (c *resourceStoreClient) PutBlob(ctx context.Context, in *PutBlobRequest, opts ...grpc.CallOption) (*PutBlobResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(PutBlobResponse)
err := c.cc.Invoke(ctx, ResourceStore_PutBlob_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *resourceStoreClient) GetBlob(ctx context.Context, in *GetBlobRequest, opts ...grpc.CallOption) (*GetBlobResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(GetBlobResponse)
err := c.cc.Invoke(ctx, ResourceStore_GetBlob_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
// ResourceStoreServer is the server API for ResourceStore service.
// All implementations should embed UnimplementedResourceStoreServer
// for forward compatibility
@ -150,6 +177,11 @@ type ResourceStoreServer interface {
Delete(context.Context, *DeleteRequest) (*DeleteResponse, error)
List(context.Context, *ListRequest) (*ListResponse, error)
Watch(*WatchRequest, ResourceStore_WatchServer) error
// Upload a blob that will be saved in a resource
PutBlob(context.Context, *PutBlobRequest) (*PutBlobResponse, error)
// Get blob contents. When possible, this will return a signed URL
// For large payloads, signed URLs are required to avoid protobuf message size limits
GetBlob(context.Context, *GetBlobRequest) (*GetBlobResponse, error)
}
// UnimplementedResourceStoreServer should be embedded to have forward compatible implementations.
@ -174,6 +206,12 @@ func (UnimplementedResourceStoreServer) List(context.Context, *ListRequest) (*Li
func (UnimplementedResourceStoreServer) Watch(*WatchRequest, ResourceStore_WatchServer) error {
return status.Errorf(codes.Unimplemented, "method Watch not implemented")
}
func (UnimplementedResourceStoreServer) PutBlob(context.Context, *PutBlobRequest) (*PutBlobResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method PutBlob not implemented")
}
func (UnimplementedResourceStoreServer) GetBlob(context.Context, *GetBlobRequest) (*GetBlobResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetBlob not implemented")
}
// UnsafeResourceStoreServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to ResourceStoreServer will
@ -297,6 +335,42 @@ func (x *resourceStoreWatchServer) Send(m *WatchEvent) error {
return x.ServerStream.SendMsg(m)
}
func _ResourceStore_PutBlob_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(PutBlobRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ResourceStoreServer).PutBlob(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: ResourceStore_PutBlob_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ResourceStoreServer).PutBlob(ctx, req.(*PutBlobRequest))
}
return interceptor(ctx, in, info, handler)
}
func _ResourceStore_GetBlob_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(GetBlobRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ResourceStoreServer).GetBlob(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: ResourceStore_GetBlob_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ResourceStoreServer).GetBlob(ctx, req.(*GetBlobRequest))
}
return interceptor(ctx, in, info, handler)
}
// ResourceStore_ServiceDesc is the grpc.ServiceDesc for ResourceStore service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
@ -324,6 +398,14 @@ var ResourceStore_ServiceDesc = grpc.ServiceDesc{
MethodName: "List",
Handler: _ResourceStore_List_Handler,
},
{
MethodName: "PutBlob",
Handler: _ResourceStore_PutBlob_Handler,
},
{
MethodName: "GetBlob",
Handler: _ResourceStore_GetBlob_Handler,
},
},
Streams: []grpc.StreamDesc{
{
@ -337,7 +419,6 @@ var ResourceStore_ServiceDesc = grpc.ServiceDesc{
const (
ResourceSearch_Read_FullMethodName = "/resource.ResourceSearch/Read"
ResourceSearch_GetBlob_FullMethodName = "/resource.ResourceSearch/GetBlob"
ResourceSearch_History_FullMethodName = "/resource.ResourceSearch/History"
ResourceSearch_Origin_FullMethodName = "/resource.ResourceSearch/Origin"
)
@ -350,8 +431,6 @@ const (
// NOTE: This is read only, and no read afer write guarantees
type ResourceSearchClient interface {
Read(ctx context.Context, in *ReadRequest, opts ...grpc.CallOption) (*ReadResponse, error)
// Get the raw blob bytes and metadata
GetBlob(ctx context.Context, in *GetBlobRequest, opts ...grpc.CallOption) (*GetBlobResponse, error)
// Show resource history (and trash)
History(ctx context.Context, in *HistoryRequest, opts ...grpc.CallOption) (*HistoryResponse, error)
// Used for efficient provisioning
@ -376,16 +455,6 @@ func (c *resourceSearchClient) Read(ctx context.Context, in *ReadRequest, opts .
return out, nil
}
func (c *resourceSearchClient) GetBlob(ctx context.Context, in *GetBlobRequest, opts ...grpc.CallOption) (*GetBlobResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(GetBlobResponse)
err := c.cc.Invoke(ctx, ResourceSearch_GetBlob_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *resourceSearchClient) History(ctx context.Context, in *HistoryRequest, opts ...grpc.CallOption) (*HistoryResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(HistoryResponse)
@ -414,8 +483,6 @@ func (c *resourceSearchClient) Origin(ctx context.Context, in *OriginRequest, op
// NOTE: This is read only, and no read afer write guarantees
type ResourceSearchServer interface {
Read(context.Context, *ReadRequest) (*ReadResponse, error)
// Get the raw blob bytes and metadata
GetBlob(context.Context, *GetBlobRequest) (*GetBlobResponse, error)
// Show resource history (and trash)
History(context.Context, *HistoryRequest) (*HistoryResponse, error)
// Used for efficient provisioning
@ -429,9 +496,6 @@ type UnimplementedResourceSearchServer struct {
func (UnimplementedResourceSearchServer) Read(context.Context, *ReadRequest) (*ReadResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Read not implemented")
}
func (UnimplementedResourceSearchServer) GetBlob(context.Context, *GetBlobRequest) (*GetBlobResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetBlob not implemented")
}
func (UnimplementedResourceSearchServer) History(context.Context, *HistoryRequest) (*HistoryResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method History not implemented")
}
@ -468,24 +532,6 @@ func _ResourceSearch_Read_Handler(srv interface{}, ctx context.Context, dec func
return interceptor(ctx, in, info, handler)
}
func _ResourceSearch_GetBlob_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(GetBlobRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ResourceSearchServer).GetBlob(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: ResourceSearch_GetBlob_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ResourceSearchServer).GetBlob(ctx, req.(*GetBlobRequest))
}
return interceptor(ctx, in, info, handler)
}
func _ResourceSearch_History_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(HistoryRequest)
if err := dec(in); err != nil {
@ -533,10 +579,6 @@ var ResourceSearch_ServiceDesc = grpc.ServiceDesc{
MethodName: "Read",
Handler: _ResourceSearch_Read_Handler,
},
{
MethodName: "GetBlob",
Handler: _ResourceSearch_GetBlob_Handler,
},
{
MethodName: "History",
Handler: _ResourceSearch_History_Handler,

View File

@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"log/slog"
"math/rand"
"sync"
"time"
@ -40,23 +41,40 @@ type ResourceServer interface {
LifecycleHooks
}
// This store is not exposed directly to users, it is a helper to implement writing
// resources as a stream of WriteEvents
type AppendingStore interface {
// Write a Create/Update/Delete,
// NOTE: the contents of WriteEvent have been validated
// Return the revisionVersion for this event or error
WriteEvent(context.Context, WriteEvent) (int64, error)
// Create new name for a given resource
GenerateName(ctx context.Context, key *ResourceKey, prefix string) (string, error)
// Read a value from storage
Read(context.Context, *ReadRequest) (*ReadResponse, error)
// Implement List -- this expects the read after write semantics
List(context.Context, *ListRequest) (*ListResponse, error)
// Watch for events
Watch(context.Context, *WatchRequest) (chan *WatchEvent, error)
// Get all events from the store
// For HA setups, this will be more events than the local WriteEvent above!
WatchWriteEvents(ctx context.Context) (<-chan *WrittenEvent, error)
}
// This interface is not exposed to end users directly
// Access to this interface is already gated by access control
type BlobStore interface {
// Indicates if storage layer supports signed urls
SupportsSignedURLs() bool
// Get the raw blob bytes and metadata -- limited to protobuf message size
// For larger payloads, we should use presigned URLs to upload from the client
PutBlob(context.Context, *PutBlobRequest) (*PutBlobResponse, error)
// Get blob contents. When possible, this will return a signed URL
// For large payloads, signed URLs are required to avoid protobuf message size limits
GetBlob(ctx context.Context, resource *ResourceKey, info *utils.BlobInfo, mustProxy bool) (*GetBlobResponse, error)
// TODO? List+Delete? This is for admin access
}
type ResourceServerOptions struct {
@ -73,6 +91,9 @@ type ResourceServerOptions struct {
// Real storage backend
Store AppendingStore
// The blob storage engine
Blob BlobStore
// Real storage backend
Search ResourceSearchServer
@ -115,6 +136,9 @@ func NewResourceServer(opts ResourceServerOptions) (ResourceServer, error) {
if opts.Search == nil {
opts.Search = &noopService{}
}
if opts.Blob == nil {
opts.Blob = &noopService{}
}
if opts.Diagnostics == nil {
opts.Search = &noopService{}
}
@ -124,8 +148,11 @@ func NewResourceServer(opts ResourceServerOptions) (ResourceServer, error) {
}
}
// Make this cancelable
ctx, cancel := context.WithCancel(context.Background())
return &server{
tracer: opts.Tracer,
log: slog.Default().With("logger", "resource-server"),
nextEventID: opts.NextEventID,
store: opts.Store,
search: opts.Search,
@ -133,6 +160,8 @@ func NewResourceServer(opts ResourceServerOptions) (ResourceServer, error) {
access: opts.WriteAccess,
lifecycle: opts.Lifecycle,
now: opts.Now,
ctx: ctx,
cancel: cancel,
}, nil
}
@ -140,14 +169,21 @@ var _ ResourceServer = &server{}
type server struct {
tracer trace.Tracer
log *slog.Logger
nextEventID func() int64
store AppendingStore
search ResourceSearchServer
blob BlobStore
diagnostics DiagnosticsServer
access WriteAccessHooks
lifecycle LifecycleHooks
now func() int64
// Background watch task
ctx context.Context
cancel context.CancelFunc
broadcaster Broadcaster[*WrittenEvent]
// init checking
once sync.Once
initErr error
@ -156,8 +192,6 @@ type server struct {
// Init implements ResourceServer.
func (s *server) Init() error {
s.once.Do(func() {
// TODO, setup a broadcaster for watch
// Call lifecycle hooks
if s.lifecycle != nil {
err := s.lifecycle.Init()
@ -165,15 +199,24 @@ func (s *server) Init() error {
s.initErr = fmt.Errorf("initialize Resource Server: %w", err)
}
}
// Start listening for changes
s.initWatcher()
})
return s.initErr
}
func (s *server) Stop() {
s.initErr = fmt.Errorf("service is stopping")
if s.lifecycle != nil {
s.lifecycle.Stop()
}
// Stops the streaming
s.cancel()
// mark the value as done
s.initErr = fmt.Errorf("service is stopped")
}
@ -191,9 +234,6 @@ func (s *server) newEventBuilder(ctx context.Context, key *ResourceKey, value, o
}
obj := event.Meta
if key.Name != obj.GetName() {
return nil, apierrors.NewBadRequest("key/name do not match")
}
if key.Namespace != obj.GetNamespace() {
return nil, apierrors.NewBadRequest("key/namespace do not match")
}
@ -213,27 +253,20 @@ func (s *server) newEventBuilder(ctx context.Context, key *ResourceKey, value, o
// This needs to be a create function
if key.Name == "" {
prefix := obj.GetGenerateName()
if prefix == "" {
return nil, apierrors.NewBadRequest("must have name or generate name set")
if obj.GetName() == "" {
return nil, apierrors.NewBadRequest("missing name")
}
key.Name, err = s.store.GenerateName(ctx, key, prefix)
if err != nil {
return nil, err
}
obj.SetName(key.Name)
obj.SetGenerateName("")
} else if obj.GetGenerateName() != "" {
return nil, apierrors.NewBadRequest("values with a name must not include generate name")
key.Name = obj.GetName()
} else if key.Name != obj.GetName() {
return nil, apierrors.NewBadRequest(
fmt.Sprintf("key/name do not match (key: %s, name: %s)", key.Name, obj.GetName()))
}
obj.SetGenerateName("")
err = validateName(obj.GetName())
if err != nil {
return nil, err
}
if obj.GetName() != key.Name {
return nil, apierrors.NewBadRequest("key name does not match the name in the body")
}
folder := obj.GetFolder()
if folder != "" {
err = s.access.CanWriteFolder(ctx, event.Requester, folder)
@ -376,7 +409,7 @@ func (s *server) Update(ctx context.Context, req *UpdateRequest) (*UpdateRespons
return rsp, err
}
event.Event = WatchEvent_MODIFIED
event.Type = WatchEvent_MODIFIED
event.PreviousRV = latest.ResourceVersion
rsp.ResourceVersion, err = s.store.WriteEvent(ctx, event)
@ -416,7 +449,7 @@ func (s *server) Delete(ctx context.Context, req *DeleteRequest) (*DeleteRespons
event := WriteEvent{
EventID: s.nextEventID(),
Key: req.Key,
Event: WatchEvent_DELETED,
Type: WatchEvent_DELETED,
PreviousRV: latest.ResourceVersion,
}
requester, err := identity.GetRequester(ctx)
@ -440,7 +473,7 @@ func (s *server) Delete(ctx context.Context, req *DeleteRequest) (*DeleteRespons
obj.SetUpdatedBy(requester.GetUID().String())
marker.TypeMeta = metav1.TypeMeta{
Kind: "DeletedMarker",
APIVersion: "storage.grafana.app/v0alpha1", // ?? or can we stick this in common?
APIVersion: "common.grafana.app/v0alpha1", // ?? or can we stick this in common?
}
marker.Annotations["RestoreResourceVersion"] = fmt.Sprintf("%d", event.PreviousRV)
event.Value, err = json.Marshal(marker)
@ -488,20 +521,110 @@ func (s *server) List(ctx context.Context, req *ListRequest) (*ListResponse, err
return rsp, err
}
func (s *server) initWatcher() error {
var err error
s.broadcaster, err = NewBroadcaster(s.ctx, func(out chan<- *WrittenEvent) error {
events, err := s.store.WatchWriteEvents(s.ctx)
if err != nil {
return err
}
go func() {
for {
// pipe all events
v := <-events
out <- v
}
}()
return nil
})
return err
}
func (s *server) Watch(req *WatchRequest, srv ResourceStore_WatchServer) error {
if err := s.Init(); err != nil {
return err
}
// TODO??? can we move any of the common processing here?
stream, err := s.store.Watch(srv.Context(), req)
fmt.Printf("WATCH %v\n", req.Options.Key)
ctx := srv.Context()
// Start listening -- this will buffer any changes that happen while we backfill
stream, err := s.broadcaster.Subscribe(ctx)
if err != nil {
return err
}
for event := range stream {
srv.Send(event)
defer s.broadcaster.Unsubscribe(stream)
since := req.Since
if req.SendInitialEvents {
fmt.Printf("TODO... query\n")
if req.AllowWatchBookmarks {
fmt.Printf("TODO... send bookmark\n")
}
}
return nil
for {
select {
case <-ctx.Done():
return nil
case event, ok := <-stream:
if !ok {
s.log.Debug("watch events closed")
return nil
}
if event.ResourceVersion > since && matchesQueryKey(req.Options.Key, event.Key) {
srv.Send(&WatchEvent{
Timestamp: event.Timestamp,
Type: event.Type,
Resource: &WatchEvent_Resource{
Value: event.Value,
Version: event.ResourceVersion,
},
// TODO... previous???
})
}
}
}
}
// GetBlob implements ResourceServer.
func (s *server) PutBlob(ctx context.Context, req *PutBlobRequest) (*PutBlobResponse, error) {
if err := s.Init(); err != nil {
return nil, err
}
rsp, err := s.blob.PutBlob(ctx, req)
rsp.Status, err = errToStatus(err)
return rsp, err
}
func (s *server) getPartialObject(ctx context.Context, key *ResourceKey, rv int64) (utils.GrafanaMetaAccessor, *StatusResult) {
rsp, err := s.store.Read(ctx, &ReadRequest{
Key: key,
ResourceVersion: rv,
})
if err != nil {
rsp.Status, _ = errToStatus(err)
}
if rsp.Status != nil {
return nil, rsp.Status
}
partial := &metav1.PartialObjectMetadata{}
err = json.Unmarshal(rsp.Value, partial)
if err != nil {
rsp.Status, _ = errToStatus(fmt.Errorf("error reading body %w", err))
return nil, rsp.Status
}
obj, err := utils.MetaAccessor(partial)
if err != nil {
rsp.Status, _ = errToStatus(fmt.Errorf("error getting accessor %w", err))
return nil, rsp.Status
}
return obj, nil
}
// GetBlob implements ResourceServer.
@ -509,7 +632,23 @@ func (s *server) GetBlob(ctx context.Context, req *GetBlobRequest) (*GetBlobResp
if err := s.Init(); err != nil {
return nil, err
}
rsp, err := s.search.GetBlob(ctx, req)
// NOTE: in SQL... this could be simple select rather than a full fetch and extract
obj, status := s.getPartialObject(ctx, req.Resource, req.ResourceVersion)
if status != nil {
return &GetBlobResponse{Status: status}, nil
}
info := obj.GetBlob()
if info == nil || info.UID == "" {
return &GetBlobResponse{Status: &StatusResult{
Status: "Failure",
Message: "Resource does not have a linked blob",
Code: 404,
}}, nil
}
rsp, err := s.blob.GetBlob(ctx, req.Resource, info, req.MustProxyBytes)
rsp.Status, err = errToStatus(err)
return rsp, err
}

View File

@ -9,9 +9,9 @@ import (
"testing"
"time"
"github.com/hack-pad/hackpadfs"
hackos "github.com/hack-pad/hackpadfs/os"
"github.com/stretchr/testify/require"
"gocloud.dev/blob/fileblob"
"gocloud.dev/blob/memblob"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"github.com/grafana/grafana/pkg/apimachinery/identity"
@ -28,20 +28,26 @@ func TestSimpleServer(t *testing.T) {
}
ctx := identity.WithRequester(context.Background(), testUserA)
var root hackpadfs.FS
if true {
bucket := memblob.OpenBucket(nil)
if false {
tmp, err := os.MkdirTemp("", "xxx-*")
require.NoError(t, err)
root, err = hackos.NewFS().Sub(tmp[1:])
bucket, err = fileblob.OpenBucket(tmp, &fileblob.Options{
CreateDir: true,
Metadata: fileblob.MetadataDontWrite, // skip
})
require.NoError(t, err)
fmt.Printf("ROOT: %s\n\n", tmp)
}
store, err := NewCDKAppendingStore(ctx, CDKAppenderOptions{
Bucket: bucket,
})
require.NoError(t, err)
server, err := NewResourceServer(ResourceServerOptions{
Store: NewFileSystemStore(FileSystemOptions{
Root: root,
}),
Store: store,
})
require.NoError(t, err)

View File

@ -7,7 +7,7 @@ import (
)
func InitResourceTables(mg *migrator.Migrator) string {
marker := "Initialize resource tables (v0)" // changing this key wipe+rewrite everything
marker := "Initialize resource tables (vX)" // changing this key wipe+rewrite everything
mg.AddMigration(marker, &migrator.RawSQLMigration{})
tables := []migrator.Table{}
@ -65,7 +65,7 @@ func InitResourceTables(mg *migrator.Migrator) string {
{Name: "hash", Type: migrator.DB_NVarchar, Length: 32, Nullable: false},
// Path to linked blob (or null). This blob may be saved in SQL, or in an object store
{Name: "blob_path_hash", Type: migrator.DB_NVarchar, Length: 60, Nullable: true},
{Name: "blob_uid", Type: migrator.DB_NVarchar, Length: 60, Nullable: true},
},
Indices: []*migrator.Index{
{Cols: []string{"rv"}, Type: migrator.UniqueIndex},
@ -74,7 +74,7 @@ func InitResourceTables(mg *migrator.Migrator) string {
{Cols: []string{"operation"}, Type: migrator.IndexType},
{Cols: []string{"namespace"}, Type: migrator.IndexType},
{Cols: []string{"group", "resource", "name"}, Type: migrator.IndexType},
{Cols: []string{"blob_path_hash"}, Type: migrator.IndexType},
{Cols: []string{"blob_uid"}, Type: migrator.IndexType},
},
})
@ -111,19 +111,28 @@ func InitResourceTables(mg *migrator.Migrator) string {
},
})
// This table is optional -- values can be saved in blob storage
// This table is optional, blobs can also be saved to object store or disk
// This is an append only store
tables = append(tables, migrator.Table{
Name: "resource_blob", // even things that failed?
Columns: []*migrator.Column{
{Name: "path_hash", Type: migrator.DB_NVarchar, Length: 60, Nullable: false, IsPrimaryKey: true},
{Name: "path", Type: migrator.DB_Text, Nullable: false},
{Name: "body", Type: migrator.DB_Blob, Nullable: true},
{Name: "uid", Type: migrator.DB_NVarchar, Length: 60, Nullable: false, IsPrimaryKey: true},
{Name: "value", Type: migrator.DB_Blob, Nullable: true},
{Name: "etag", Type: migrator.DB_NVarchar, Length: 64, Nullable: false},
{Name: "size", Type: migrator.DB_BigInt, Nullable: false},
{Name: "content_type", Type: migrator.DB_NVarchar, Length: 255, Nullable: false},
// These is used for auditing and cleanup (could be path?)
{Name: "namespace", Type: migrator.DB_NVarchar, Length: 63, Nullable: true},
{Name: "group", Type: migrator.DB_NVarchar, Length: 190, Nullable: false},
{Name: "resource", Type: migrator.DB_NVarchar, Length: 190, Nullable: false},
{Name: "name", Type: migrator.DB_NVarchar, Length: 190, Nullable: false},
},
Indices: []*migrator.Index{
{Cols: []string{"path_hash"}, Type: migrator.UniqueIndex},
{Cols: []string{"uid"}, Type: migrator.UniqueIndex},
// Used for auditing
{Cols: []string{"namespace", "group", "resource", "name"}, Type: migrator.IndexType},
},
})

View File

@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"net/http"
"strings"
"github.com/prometheus/client_golang/prometheus"
@ -139,15 +140,10 @@ func (s *sqlResourceStore) WriteEvent(ctx context.Context, event resource.WriteE
return 0, ErrNotImplementedYet
}
func (s *sqlResourceStore) Watch(context.Context, *resource.WatchRequest) (chan *resource.WatchEvent, error) {
func (s *sqlResourceStore) WatchWriteEvents(ctx context.Context) (<-chan *resource.WrittenEvent, error) {
return nil, ErrNotImplementedYet
}
// Create new name for a given resource
func (s *sqlResourceStore) GenerateName(_ context.Context, _ *resource.ResourceKey, _ string) (string, error) {
return util.GenerateShortUID(), nil
}
func (s *sqlResourceStore) Read(ctx context.Context, req *resource.ReadRequest) (*resource.ReadResponse, error) {
_, span := s.tracer.Start(ctx, "storage_server.GetResource")
defer span.End()
@ -166,13 +162,25 @@ func (s *sqlResourceStore) List(ctx context.Context, req *resource.ListRequest)
return nil, ErrNotImplementedYet
}
// Get the raw blob bytes and metadata
func (s *sqlResourceStore) GetBlob(ctx context.Context, req *resource.GetBlobRequest) (*resource.GetBlobResponse, error) {
_, span := s.tracer.Start(ctx, "storage_server.List")
defer span.End()
func (s *sqlResourceStore) PutBlob(ctx context.Context, req *resource.PutBlobRequest) (*resource.PutBlobResponse, error) {
if req.Method == resource.PutBlobRequest_HTTP {
return &resource.PutBlobResponse{
Status: &resource.StatusResult{
Status: "Failure",
Message: "http upload not supported",
Code: http.StatusNotImplemented,
},
}, nil
}
fmt.Printf("TODO, GET BLOB: %+v", req.Key)
uid := util.GenerateShortUID()
fmt.Printf("TODO, UPLOAD: %s // %+v", uid, req)
return nil, ErrNotImplementedYet
}
func (s *sqlResourceStore) GetBlob(ctx context.Context, uid string, mustProxy bool) (*resource.GetBlobResponse, error) {
return nil, ErrNotImplementedYet
}