make list preparelist

This commit is contained in:
Ryan McKinley 2024-07-02 14:37:28 -07:00
parent a3d9f8aab2
commit 0db0e13248
10 changed files with 57 additions and 330 deletions

View File

@ -145,7 +145,7 @@ func (a *dashboardSqlAccess) Read(ctx context.Context, req *resource.ReadRequest
}
// List implements AppendingStore.
func (a *dashboardSqlAccess) List(ctx context.Context, req *resource.ListRequest) (*resource.ListResponse, error) {
func (a *dashboardSqlAccess) PrepareList(ctx context.Context, req *resource.ListRequest) (*resource.ListResponse, error) {
opts := req.Options
info, err := request.ParseNamespace(opts.Key.Namespace)
if err == nil {

View File

@ -24,7 +24,7 @@ type DashboardQuery struct {
}
type DashboardAccess interface {
resource.AppendingStore
resource.StorageBackend
resource.BlobStore
resource.ResourceIndexServer

View File

@ -24,9 +24,9 @@ type dashboardStorage struct {
func (s *dashboardStorage) newStore(scheme *runtime.Scheme, defaultOptsGetter generic.RESTOptionsGetter) (grafanarest.LegacyStorage, error) {
server, err := resource.NewResourceServer(resource.ResourceServerOptions{
Store: s.access,
Search: s.access,
Blob: s.access,
Backend: s.access,
Search: s.access,
Blob: s.access,
// WriteAccess: resource.WriteAccessHooks{
// Folder: func(ctx context.Context, user identity.Requester, uid string) bool {
// // ???

View File

@ -72,7 +72,7 @@ func ProvideResourceServer(db db.DB, cfg *setting.Cfg, features featuremgmt.Feat
server: server,
client: client,
}
opts.Store = bridge
opts.Backend = bridge
opts.Diagnostics = bridge
opts.Lifecycle = bridge
} else {
@ -88,7 +88,7 @@ func ProvideResourceServer(db db.DB, cfg *setting.Cfg, features featuremgmt.Feat
if err != nil {
return nil, err
}
opts.Store, err = resource.NewCDKAppendingStore(context.Background(), resource.CDKAppenderOptions{
opts.Backend, err = resource.NewCDKBackend(context.Background(), resource.CDKBackendOptions{
Tracer: tracer,
Bucket: bucket,
})
@ -262,7 +262,7 @@ func (b *entityBridge) Read(ctx context.Context, req *resource.ReadRequest) (*re
}
// List implements ResourceServer.
func (b *entityBridge) List(ctx context.Context, req *resource.ListRequest) (*resource.ListResponse, error) {
func (b *entityBridge) PrepareList(ctx context.Context, req *resource.ListRequest) (*resource.ListResponse, error) {
key := req.Options.Key
query := &entity.EntityListRequest{
NextPageToken: req.NextPageToken,

View File

@ -1,298 +0,0 @@
package resource
import (
"bytes"
context "context"
"fmt"
"io"
"sort"
"strconv"
"strings"
"sync"
"time"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/noop"
"gocloud.dev/blob"
_ "gocloud.dev/blob/fileblob"
_ "gocloud.dev/blob/memblob"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
)
type CDKAppenderOptions struct {
Tracer trace.Tracer
Bucket *blob.Bucket
RootFolder string
NextResourceVersion NextResourceVersion
}
func NewCDKAppendingStore(ctx context.Context, opts CDKAppenderOptions) (AppendingStore, error) {
if opts.Tracer == nil {
opts.Tracer = noop.NewTracerProvider().Tracer("cdk-appending-store")
}
if opts.Bucket == nil {
return nil, fmt.Errorf("missing bucket")
}
found, _, err := opts.Bucket.ListPage(ctx, blob.FirstPageToken, 1, &blob.ListOptions{
Prefix: opts.RootFolder,
Delimiter: "/",
})
if err != nil {
return nil, err
}
if found == nil {
return nil, fmt.Errorf("the root folder does not exist")
}
// This is not safe when running in HA!
if opts.NextResourceVersion == nil {
opts.NextResourceVersion = newResourceVersionCounter(time.Now().UnixMilli())
}
return &cdkAppender{
tracer: opts.Tracer,
bucket: opts.Bucket,
root: opts.RootFolder,
nextRV: opts.NextResourceVersion,
}, nil
}
type cdkAppender struct {
tracer trace.Tracer
bucket *blob.Bucket
root string
nextRV NextResourceVersion
mutex sync.Mutex
// Typically one... the server wrapper
subscribers []chan *WrittenEvent
}
func (s *cdkAppender) getPath(key *ResourceKey, rv int64) string {
var buffer bytes.Buffer
buffer.WriteString(s.root)
if key.Group == "" {
return buffer.String()
}
buffer.WriteString(key.Group)
if key.Resource == "" {
return buffer.String()
}
buffer.WriteString("/")
buffer.WriteString(key.Resource)
if key.Namespace == "" {
if key.Name == "" {
return buffer.String()
}
buffer.WriteString("/__cluster__")
} else {
buffer.WriteString("/")
buffer.WriteString(key.Namespace)
}
if key.Name == "" {
return buffer.String()
}
buffer.WriteString("/")
buffer.WriteString(key.Name)
if rv > 0 {
buffer.WriteString(fmt.Sprintf("/%d.json", rv))
}
return buffer.String()
}
func (s *cdkAppender) WriteEvent(ctx context.Context, event WriteEvent) (rv int64, err error) {
// Scope the lock
{
s.mutex.Lock()
defer s.mutex.Unlock()
rv = s.nextRV()
err = s.bucket.WriteAll(ctx, s.getPath(event.Key, rv), event.Value, &blob.WriterOptions{
ContentType: "application/json",
})
}
// Async notify all subscribers
if s.subscribers != nil {
go func() {
write := &WrittenEvent{
WriteEvent: event,
Timestamp: time.Now().UnixMilli(),
ResourceVersion: rv,
}
for _, sub := range s.subscribers {
sub <- write
}
}()
}
return rv, err
}
// Read implements ResourceStoreServer.
func (s *cdkAppender) Read(ctx context.Context, req *ReadRequest) (*ReadResponse, error) {
rv := req.ResourceVersion
path := s.getPath(req.Key, req.ResourceVersion)
if rv < 1 {
iter := s.bucket.List(&blob.ListOptions{Prefix: path + "/", Delimiter: "/"})
for {
obj, err := iter.Next(ctx)
if err == io.EOF {
break
}
if strings.HasSuffix(obj.Key, ".json") {
idx := strings.LastIndex(obj.Key, "/") + 1
edx := strings.LastIndex(obj.Key, ".")
if idx > 0 {
v, err := strconv.ParseInt(obj.Key[idx:edx], 10, 64)
if err == nil && v > rv {
rv = v
path = obj.Key // find the path with biggest resource version
}
}
}
}
}
raw, err := s.bucket.ReadAll(ctx, path)
if err == nil && bytes.Contains(raw, []byte(`"DeletedMarker"`)) {
tmp := &unstructured.Unstructured{}
err = tmp.UnmarshalJSON(raw)
if err == nil && tmp.GetKind() == "DeletedMarker" {
return nil, apierrors.NewNotFound(schema.GroupResource{
Group: req.Key.Group,
Resource: req.Key.Resource,
}, req.Key.Name)
}
}
return &ReadResponse{
ResourceVersion: rv,
Value: raw,
}, err
}
// List implements AppendingStore.
func (s *cdkAppender) List(ctx context.Context, req *ListRequest) (*ListResponse, error) {
resources, err := buildTree(ctx, s, req.Options.Key)
if err != nil {
return nil, err
}
rsp := &ListResponse{}
for _, item := range resources {
latest := item.versions[0]
raw, err := s.bucket.ReadAll(ctx, latest.key)
if err != nil {
return nil, err
}
rsp.Items = append(rsp.Items, &ResourceWrapper{
ResourceVersion: latest.rv,
Value: raw,
})
}
return rsp, nil
}
// Watch implements AppendingStore.
func (s *cdkAppender) WatchWriteEvents(ctx context.Context) (<-chan *WrittenEvent, error) {
stream := make(chan *WrittenEvent, 10)
{
s.mutex.Lock()
defer s.mutex.Unlock()
// Add the event stream
s.subscribers = append(s.subscribers, stream)
}
// Wait for context done
go func() {
// Wait till the context is done
<-ctx.Done()
// Then remove the subscription
s.mutex.Lock()
defer s.mutex.Unlock()
// Copy all streams without our listener
subs := []chan *WrittenEvent{}
for _, sub := range s.subscribers {
if sub != stream {
subs = append(subs, sub)
}
}
s.subscribers = subs
}()
return stream, nil
}
// group > resource > namespace > name > versions
type cdkResource struct {
prefix string
versions []cdkVersion
}
type cdkVersion struct {
rv int64
key string
}
func buildTree(ctx context.Context, s *cdkAppender, key *ResourceKey) ([]cdkResource, error) {
byPrefix := make(map[string]*cdkResource)
path := s.getPath(key, 0)
iter := s.bucket.List(&blob.ListOptions{Prefix: path, Delimiter: ""}) // "" is recursive
for {
obj, err := iter.Next(ctx)
if err == io.EOF {
break
}
if strings.HasSuffix(obj.Key, ".json") {
idx := strings.LastIndex(obj.Key, "/") + 1
edx := strings.LastIndex(obj.Key, ".")
if idx > 0 {
rv, err := strconv.ParseInt(obj.Key[idx:edx], 10, 64)
if err == nil {
prefix := obj.Key[:idx]
res, ok := byPrefix[prefix]
if !ok {
res = &cdkResource{prefix: prefix}
byPrefix[prefix] = res
}
res.versions = append(res.versions, cdkVersion{
rv: rv,
key: obj.Key,
})
}
}
}
}
// Now sort all versions
resources := make([]cdkResource, 0, len(byPrefix))
for _, res := range byPrefix {
sort.Slice(res.versions, func(i, j int) bool {
return res.versions[i].rv > res.versions[j].rv
})
resources = append(resources, *res)
}
sort.Slice(resources, func(i, j int) bool {
a := resources[i].versions[0].rv
b := resources[j].versions[0].rv
return a > b
})
return resources, nil
}

View File

@ -444,7 +444,15 @@ service ResourceStore {
rpc Create(CreateRequest) returns (CreateResponse);
rpc Update(UpdateRequest) returns (UpdateResponse);
rpc Delete(DeleteRequest) returns (DeleteResponse);
// The results *may* include values that should not be returned to the user
// This will perform best-effort filtering to increase performace.
// NOTE: storage.Interface is ultimatly responsible for the final filtering
rpc List(ListRequest) returns (ListResponse);
// The results *may* include values that should not be returned to the user
// This will perform best-effort filtering to increase performace.
// NOTE: storage.Interface is ultimatly responsible for the final filtering
rpc Watch(WatchRequest) returns (stream WatchEvent);
}

View File

@ -40,7 +40,13 @@ type ResourceStoreClient interface {
Create(ctx context.Context, in *CreateRequest, opts ...grpc.CallOption) (*CreateResponse, error)
Update(ctx context.Context, in *UpdateRequest, opts ...grpc.CallOption) (*UpdateResponse, error)
Delete(ctx context.Context, in *DeleteRequest, opts ...grpc.CallOption) (*DeleteResponse, error)
// The results *may* include values that should not be returned to the user
// This will perform best-effort filtering to increase performace.
// NOTE: storage.Interface is ultimatly responsible for the final filtering
List(ctx context.Context, in *ListRequest, opts ...grpc.CallOption) (*ListResponse, error)
// The results *may* include values that should not be returned to the user
// This will perform best-effort filtering to increase performace.
// NOTE: storage.Interface is ultimatly responsible for the final filtering
Watch(ctx context.Context, in *WatchRequest, opts ...grpc.CallOption) (ResourceStore_WatchClient, error)
}
@ -148,7 +154,13 @@ type ResourceStoreServer interface {
Create(context.Context, *CreateRequest) (*CreateResponse, error)
Update(context.Context, *UpdateRequest) (*UpdateResponse, error)
Delete(context.Context, *DeleteRequest) (*DeleteResponse, error)
// The results *may* include values that should not be returned to the user
// This will perform best-effort filtering to increase performace.
// NOTE: storage.Interface is ultimatly responsible for the final filtering
List(context.Context, *ListRequest) (*ListResponse, error)
// The results *may* include values that should not be returned to the user
// This will perform best-effort filtering to increase performace.
// NOTE: storage.Interface is ultimatly responsible for the final filtering
Watch(*WatchRequest, ResourceStore_WatchServer) error
}

View File

@ -40,19 +40,24 @@ type ResourceServer interface {
LifecycleHooks
}
// This store is not exposed directly to users, it is a helper to implement writing
// resources as a stream of WriteEvents
type AppendingStore interface {
// The StorageBackend is an internal abstraction that supports interacting with
// the underlying raw storage medium. This interface is never exposed directly,
// it is provided by concrete instances that actually write values.
type StorageBackend interface {
// Write a Create/Update/Delete,
// NOTE: the contents of WriteEvent have been validated
// Return the revisionVersion for this event or error
WriteEvent(context.Context, WriteEvent) (int64, error)
// Read a value from storage
// Read a value from storage optionally at an explicit version
Read(context.Context, *ReadRequest) (*ReadResponse, error)
// Implement List -- this expects the read after write semantics
List(context.Context, *ListRequest) (*ListResponse, error)
// When the ResourceServer executes a List request, it will first
// query the backend for potential results. All results will be
// checked against the kubernetes requirements before finally returning
// results. The list options can be used to improve performance
// but are the the final answer.
PrepareList(context.Context, *ListRequest) (*ListResponse, error)
// Get all events from the store
// For HA setups, this will be more events than the local WriteEvent above!
@ -81,7 +86,7 @@ type ResourceServerOptions struct {
Tracer trace.Tracer
// Real storage backend
Store AppendingStore
Backend StorageBackend
// The blob storage engine
Blob BlobStore
@ -108,8 +113,8 @@ func NewResourceServer(opts ResourceServerOptions) (ResourceServer, error) {
opts.Tracer = noop.NewTracerProvider().Tracer("resource-server")
}
if opts.Store == nil {
return nil, fmt.Errorf("missing AppendingStore implementation")
if opts.Backend == nil {
return nil, fmt.Errorf("missing Backend implementation")
}
if opts.Search == nil {
opts.Search = &noopService{}
@ -137,7 +142,7 @@ func NewResourceServer(opts ResourceServerOptions) (ResourceServer, error) {
return &server{
tracer: opts.Tracer,
log: slog.Default().With("logger", "resource-server"),
store: opts.Store,
backend: opts.Backend,
search: opts.Search,
diagnostics: opts.Diagnostics,
access: opts.WriteAccess,
@ -153,7 +158,7 @@ var _ ResourceServer = &server{}
type server struct {
tracer trace.Tracer
log *slog.Logger
store AppendingStore
backend StorageBackend
search ResourceIndexServer
blob BlobStore
diagnostics DiagnosticsServer
@ -323,7 +328,7 @@ func (s *server) Create(ctx context.Context, req *CreateRequest) (*CreateRespons
return rsp, err
}
rsp.ResourceVersion, err = s.store.WriteEvent(ctx, event)
rsp.ResourceVersion, err = s.backend.WriteEvent(ctx, event)
if err == nil {
rsp.Value = event.Value // with mutated fields
} else {
@ -370,7 +375,7 @@ func (s *server) Update(ctx context.Context, req *UpdateRequest) (*UpdateRespons
return rsp, nil
}
latest, err := s.store.Read(ctx, &ReadRequest{
latest, err := s.backend.Read(ctx, &ReadRequest{
Key: req.Key,
})
if err != nil {
@ -399,7 +404,7 @@ func (s *server) Update(ctx context.Context, req *UpdateRequest) (*UpdateRespons
event.Type = WatchEvent_MODIFIED
event.PreviousRV = latest.ResourceVersion
rsp.ResourceVersion, err = s.store.WriteEvent(ctx, event)
rsp.ResourceVersion, err = s.backend.WriteEvent(ctx, event)
rsp.Status, err = errToStatus(err)
if err == nil {
rsp.Value = event.Value // with mutated fields
@ -422,7 +427,7 @@ func (s *server) Delete(ctx context.Context, req *DeleteRequest) (*DeleteRespons
return nil, apierrors.NewBadRequest("update must include the previous version")
}
latest, err := s.store.Read(ctx, &ReadRequest{
latest, err := s.backend.Read(ctx, &ReadRequest{
Key: req.Key,
})
if err != nil {
@ -468,7 +473,7 @@ func (s *server) Delete(ctx context.Context, req *DeleteRequest) (*DeleteRespons
fmt.Sprintf("unable creating deletion marker, %v", err))
}
rsp.ResourceVersion, err = s.store.WriteEvent(ctx, event)
rsp.ResourceVersion, err = s.backend.WriteEvent(ctx, event)
rsp.Status, err = errToStatus(err)
return rsp, err
}
@ -487,7 +492,7 @@ func (s *server) Read(ctx context.Context, req *ReadRequest) (*ReadResponse, err
return &ReadResponse{Status: status}, nil
}
rsp, err := s.store.Read(ctx, req)
rsp, err := s.backend.Read(ctx, req)
if err != nil {
if rsp == nil {
rsp = &ReadResponse{}
@ -502,7 +507,7 @@ func (s *server) List(ctx context.Context, req *ListRequest) (*ListResponse, err
return nil, err
}
rsp, err := s.store.List(ctx, req)
rsp, err := s.backend.PrepareList(ctx, req)
// Status???
return rsp, err
}
@ -510,7 +515,7 @@ func (s *server) List(ctx context.Context, req *ListRequest) (*ListResponse, err
func (s *server) initWatcher() error {
var err error
s.broadcaster, err = NewBroadcaster(s.ctx, func(out chan<- *WrittenEvent) error {
events, err := s.store.WatchWriteEvents(s.ctx)
events, err := s.backend.WatchWriteEvents(s.ctx)
if err != nil {
return err
}
@ -595,7 +600,7 @@ func (s *server) PutBlob(ctx context.Context, req *PutBlobRequest) (*PutBlobResp
}
func (s *server) getPartialObject(ctx context.Context, key *ResourceKey, rv int64) (utils.GrafanaMetaAccessor, *StatusResult) {
rsp, err := s.store.Read(ctx, &ReadRequest{
rsp, err := s.backend.Read(ctx, &ReadRequest{
Key: key,
ResourceVersion: rv,
})

View File

@ -41,13 +41,13 @@ func TestSimpleServer(t *testing.T) {
fmt.Printf("ROOT: %s\n\n", tmp)
}
store, err := NewCDKAppendingStore(ctx, CDKAppenderOptions{
store, err := NewCDKBackend(ctx, CDKBackendOptions{
Bucket: bucket,
})
require.NoError(t, err)
server, err := NewResourceServer(ResourceServerOptions{
Store: store,
Backend: store,
})
require.NoError(t, err)

View File

@ -43,7 +43,7 @@ func ProvideSQLResourceServer(db db.EntityDBInterface, tracer tracing.Tracer) (r
return resource.NewResourceServer(resource.ResourceServerOptions{
Tracer: tracer,
Store: store,
Backend: store,
Diagnostics: store,
Lifecycle: store,
})
@ -152,7 +152,7 @@ func (s *sqlResourceStore) Read(ctx context.Context, req *resource.ReadRequest)
return nil, ErrNotImplementedYet
}
func (s *sqlResourceStore) List(ctx context.Context, req *resource.ListRequest) (*resource.ListResponse, error) {
func (s *sqlResourceStore) PrepareList(ctx context.Context, req *resource.ListRequest) (*resource.ListResponse, error) {
_, span := s.tracer.Start(ctx, "storage_server.List")
defer span.End()