Storage: Use our own key format and support unnamespaced objects (#83929)

* use our own key format and support unnamespaced objects

* fix tests
This commit is contained in:
Dan Cech 2024-03-05 16:31:39 -05:00 committed by GitHub
parent 01fb2cff62
commit 7e4badff1d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 217 additions and 171 deletions

View File

@ -30,13 +30,10 @@ import (
"k8s.io/apiserver/pkg/storage/storagebackend/factory"
entityStore "github.com/grafana/grafana/pkg/services/store/entity"
"github.com/grafana/grafana/pkg/util"
)
var _ storage.Interface = (*Storage)(nil)
const MaxUpdateAttempts = 1
// Storage implements storage.Interface and storage resources as JSON files on disk.
type Storage struct {
config *storagebackend.ConfigForResource
@ -88,25 +85,7 @@ func (s *Storage) Create(ctx context.Context, key string, obj runtime.Object, ou
return err
}
metaAccessor, err := meta.Accessor(obj)
if err != nil {
return err
}
// Replace the default name generation strategy
if metaAccessor.GetGenerateName() != "" {
k, err := entityStore.ParseKey(key)
if err != nil {
return err
}
k.Name = util.GenerateShortUID()
key = k.String()
metaAccessor.SetName(k.Name)
metaAccessor.SetGenerateName("")
}
e, err := resourceToEntity(key, obj, requestInfo, s.codec)
e, err := resourceToEntity(obj, requestInfo, s.codec)
if err != nil {
return err
}
@ -128,13 +107,6 @@ func (s *Storage) Create(ctx context.Context, key string, obj runtime.Object, ou
return apierrors.NewInternalError(err)
}
/*
s.watchSet.notifyWatchers(watch.Event{
Object: out.DeepCopyObject(),
Type: watch.Added,
})
*/
return nil
}
@ -144,13 +116,26 @@ func (s *Storage) Create(ctx context.Context, key string, obj runtime.Object, ou
// current version of the object to avoid read operation from storage to get it.
// However, the implementations have to retry in case suggestion is stale.
func (s *Storage) Delete(ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions, validateDeletion storage.ValidateObjectFunc, cachedExistingObject runtime.Object) error {
requestInfo, ok := request.RequestInfoFrom(ctx)
if !ok {
return apierrors.NewInternalError(fmt.Errorf("could not get request info"))
}
k := &entityStore.Key{
Group: requestInfo.APIGroup,
Resource: requestInfo.Resource,
Namespace: requestInfo.Namespace,
Name: requestInfo.Name,
Subresource: requestInfo.Subresource,
}
previousVersion := int64(0)
if preconditions != nil && preconditions.ResourceVersion != nil {
previousVersion, _ = strconv.ParseInt(*preconditions.ResourceVersion, 10, 64)
}
rsp, err := s.store.Delete(ctx, &entityStore.DeleteEntityRequest{
Key: key,
Key: k.String(),
PreviousVersion: previousVersion,
})
if err != nil {
@ -165,72 +150,6 @@ func (s *Storage) Delete(ctx context.Context, key string, out runtime.Object, pr
return nil
}
type Decoder struct {
client entityStore.EntityStore_WatchClient
newFunc func() runtime.Object
opts storage.ListOptions
codec runtime.Codec
}
func (d *Decoder) Decode() (action watch.EventType, object runtime.Object, err error) {
for {
resp, err := d.client.Recv()
if errors.Is(err, io.EOF) {
log.Printf("watch is done")
return watch.Error, nil, err
}
if grpcStatus.Code(err) == grpcCodes.Canceled {
log.Printf("watch was canceled")
return watch.Error, nil, err
}
if err != nil {
log.Printf("error receiving result: %s", err)
return watch.Error, nil, err
}
obj := d.newFunc()
err = entityToResource(resp.Entity, obj, d.codec)
if err != nil {
log.Printf("error decoding entity: %s", err)
return watch.Error, nil, err
}
// apply any predicates not handled in storage
var matches bool
matches, err = d.opts.Predicate.Matches(obj)
if err != nil {
log.Printf("error matching object: %s", err)
return watch.Error, nil, err
}
if !matches {
continue
}
var watchAction watch.EventType
switch resp.Entity.Action {
case entityStore.Entity_CREATED:
watchAction = watch.Added
case entityStore.Entity_UPDATED:
watchAction = watch.Modified
case entityStore.Entity_DELETED:
watchAction = watch.Deleted
default:
watchAction = watch.Error
}
return watchAction, obj, nil
}
}
func (d *Decoder) Close() {
_ = d.client.CloseSend()
}
var _ watch.Decoder = (*Decoder)(nil)
// Watch begins watching the specified key. Events are decoded into API objects,
// and any items selected by 'p' are sent down to returned watch.Interface.
// resourceVersion may be used to specify what version to begin watching,
@ -239,8 +158,23 @@ var _ watch.Decoder = (*Decoder)(nil)
// If resource version is "0", this interface will get current object at given key
// and send it in an "ADDED" event, before watch starts.
func (s *Storage) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
requestInfo, ok := request.RequestInfoFrom(ctx)
if !ok {
return nil, apierrors.NewInternalError(fmt.Errorf("could not get request info"))
}
k := &entityStore.Key{
Group: requestInfo.APIGroup,
Resource: requestInfo.Resource,
Namespace: requestInfo.Namespace,
Name: requestInfo.Name,
Subresource: requestInfo.Subresource,
}
req := &entityStore.EntityWatchRequest{
Key: []string{key},
Key: []string{
k.String(),
},
WithBody: true,
}
@ -278,6 +212,19 @@ func (s *Storage) Watch(ctx context.Context, key string, opts storage.ListOption
// The returned contents may be delayed, but it is guaranteed that they will
// match 'opts.ResourceVersion' according 'opts.ResourceVersionMatch'.
func (s *Storage) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error {
requestInfo, ok := request.RequestInfoFrom(ctx)
if !ok {
return apierrors.NewInternalError(fmt.Errorf("could not get request info"))
}
k := &entityStore.Key{
Group: requestInfo.APIGroup,
Resource: requestInfo.Resource,
Namespace: requestInfo.Namespace,
Name: requestInfo.Name,
Subresource: requestInfo.Subresource,
}
resourceVersion := int64(0)
var err error
if opts.ResourceVersion != "" {
@ -288,7 +235,7 @@ func (s *Storage) Get(ctx context.Context, key string, opts storage.GetOptions,
}
rsp, err := s.store.Read(ctx, &entityStore.ReadEntityRequest{
Key: key,
Key: k.String(),
WithBody: true,
WithStatus: true,
ResourceVersion: resourceVersion,
@ -302,7 +249,7 @@ func (s *Storage) Get(ctx context.Context, key string, opts storage.GetOptions,
return nil
}
return apierrors.NewNotFound(s.gr, key)
return apierrors.NewNotFound(s.gr, k.Name)
}
err = entityToResource(rsp, objPtr, s.codec)
@ -320,6 +267,19 @@ func (s *Storage) Get(ctx context.Context, key string, opts storage.GetOptions,
// The returned contents may be delayed, but it is guaranteed that they will
// match 'opts.ResourceVersion' according 'opts.ResourceVersionMatch'.
func (s *Storage) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
requestInfo, ok := request.RequestInfoFrom(ctx)
if !ok {
return apierrors.NewInternalError(fmt.Errorf("could not get request info"))
}
k := &entityStore.Key{
Group: requestInfo.APIGroup,
Resource: requestInfo.Resource,
Namespace: requestInfo.Namespace,
Name: requestInfo.Name,
Subresource: requestInfo.Subresource,
}
listPtr, err := meta.GetItemsPtr(listObj)
if err != nil {
return err
@ -330,7 +290,9 @@ func (s *Storage) GetList(ctx context.Context, key string, opts storage.ListOpti
}
req := &entityStore.EntityListRequest{
Key: []string{key},
Key: []string{
k.String(),
},
WithBody: true,
WithStatus: true,
NextPageToken: opts.Predicate.Continue,
@ -425,33 +387,21 @@ func (s *Storage) GuaranteedUpdate(
preconditions *storage.Preconditions,
tryUpdate storage.UpdateFunc,
cachedExistingObject runtime.Object,
) error {
var err error
for attempt := 1; attempt <= MaxUpdateAttempts; attempt = attempt + 1 {
err = s.guaranteedUpdate(ctx, key, destination, ignoreNotFound, preconditions, tryUpdate, cachedExistingObject)
if err == nil {
return nil
}
}
return err
}
func (s *Storage) guaranteedUpdate(
ctx context.Context,
key string,
destination runtime.Object,
ignoreNotFound bool,
preconditions *storage.Preconditions,
tryUpdate storage.UpdateFunc,
cachedExistingObject runtime.Object,
) error {
requestInfo, ok := request.RequestInfoFrom(ctx)
if !ok {
return apierrors.NewInternalError(fmt.Errorf("could not get request info"))
}
err := s.Get(ctx, key, storage.GetOptions{}, destination)
k := &entityStore.Key{
Group: requestInfo.APIGroup,
Resource: requestInfo.Resource,
Namespace: requestInfo.Namespace,
Name: requestInfo.Name,
Subresource: requestInfo.Subresource,
}
err := s.Get(ctx, k.String(), storage.GetOptions{}, destination)
if err != nil {
return err
}
@ -476,10 +426,10 @@ func (s *Storage) guaranteedUpdate(
}
}
return apierrors.NewInternalError(fmt.Errorf("could not successfully update object. key=%s, err=%s", key, err.Error()))
return apierrors.NewInternalError(fmt.Errorf("could not successfully update object. key=%s, err=%s", k.String(), err.Error()))
}
e, err := resourceToEntity(key, updatedObj, requestInfo, s.codec)
e, err := resourceToEntity(updatedObj, requestInfo, s.codec)
if err != nil {
return err
}
@ -503,13 +453,6 @@ func (s *Storage) guaranteedUpdate(
return apierrors.NewInternalError(err)
}
/*
s.watchSet.notifyWatchers(watch.Event{
Object: destination.DeepCopyObject(),
Type: watch.Modified,
})
*/
return nil
}
@ -525,3 +468,69 @@ func (s *Storage) Versioner() storage.Versioner {
func (s *Storage) RequestWatchProgress(ctx context.Context) error {
return nil
}
type Decoder struct {
client entityStore.EntityStore_WatchClient
newFunc func() runtime.Object
opts storage.ListOptions
codec runtime.Codec
}
func (d *Decoder) Decode() (action watch.EventType, object runtime.Object, err error) {
for {
resp, err := d.client.Recv()
if errors.Is(err, io.EOF) {
log.Printf("watch is done")
return watch.Error, nil, err
}
if grpcStatus.Code(err) == grpcCodes.Canceled {
log.Printf("watch was canceled")
return watch.Error, nil, err
}
if err != nil {
log.Printf("error receiving result: %s", err)
return watch.Error, nil, err
}
obj := d.newFunc()
err = entityToResource(resp.Entity, obj, d.codec)
if err != nil {
log.Printf("error decoding entity: %s", err)
return watch.Error, nil, err
}
// apply any predicates not handled in storage
var matches bool
matches, err = d.opts.Predicate.Matches(obj)
if err != nil {
log.Printf("error matching object: %s", err)
return watch.Error, nil, err
}
if !matches {
continue
}
var watchAction watch.EventType
switch resp.Entity.Action {
case entityStore.Entity_CREATED:
watchAction = watch.Added
case entityStore.Entity_UPDATED:
watchAction = watch.Modified
case entityStore.Entity_DELETED:
watchAction = watch.Deleted
default:
watchAction = watch.Error
}
return watchAction, obj, nil
}
}
func (d *Decoder) Close() {
_ = d.client.CloseSend()
}
var _ watch.Decoder = (*Decoder)(nil)

View File

@ -19,7 +19,6 @@ import (
entityStore "github.com/grafana/grafana/pkg/services/store/entity"
)
// this is terrible... but just making it work!!!!
func entityToResource(rsp *entityStore.Entity, res runtime.Object, codec runtime.Codec) error {
var err error
@ -99,7 +98,7 @@ func entityToResource(rsp *entityStore.Entity, res runtime.Object, codec runtime
return nil
}
func resourceToEntity(key string, res runtime.Object, requestInfo *request.RequestInfo, codec runtime.Codec) (*entityStore.Entity, error) {
func resourceToEntity(res runtime.Object, requestInfo *request.RequestInfo, codec runtime.Codec) (*entityStore.Entity, error) {
metaAccessor, err := meta.Accessor(res)
if err != nil {
return nil, err
@ -111,14 +110,22 @@ func resourceToEntity(key string, res runtime.Object, requestInfo *request.Reque
}
rv, _ := strconv.ParseInt(metaAccessor.GetResourceVersion(), 10, 64)
k := &entityStore.Key{
Group: requestInfo.APIGroup,
Resource: requestInfo.Resource,
Namespace: requestInfo.Namespace,
Name: metaAccessor.GetName(),
Subresource: requestInfo.Subresource,
}
rsp := &entityStore.Entity{
Group: requestInfo.APIGroup,
Group: k.Group,
GroupVersion: requestInfo.APIVersion,
Resource: requestInfo.Resource,
Subresource: requestInfo.Subresource,
Namespace: metaAccessor.GetNamespace(),
Key: key,
Name: metaAccessor.GetName(),
Resource: k.Resource,
Subresource: k.Subresource,
Namespace: k.Namespace,
Key: k.String(),
Name: k.Name,
Guid: string(metaAccessor.GetUID()),
ResourceVersion: rv,
Folder: grafanaAccessor.GetFolder(),

View File

@ -24,22 +24,18 @@ func TestResourceToEntity(t *testing.T) {
updatedAt := createdAt.Add(time.Hour).Truncate(time.Second)
updatedAtStr := updatedAt.UTC().Format(time.RFC3339)
apiVersion := "v0alpha1"
requestInfo := &request.RequestInfo{
APIVersion: apiVersion,
}
Scheme := runtime.NewScheme()
Scheme.AddKnownTypes(v0alpha1.PlaylistResourceInfo.GroupVersion(), &v0alpha1.Playlist{})
Codecs := serializer.NewCodecFactory(Scheme)
testCases := []struct {
key string
requestInfo *request.RequestInfo
resource runtime.Object
codec runtime.Codec
expectedKey string
expectedGroupVersion string
expectedName string
expectedNamespace string
expectedTitle string
expectedGuid string
expectedVersion string
@ -55,11 +51,14 @@ func TestResourceToEntity(t *testing.T) {
expectedBody []byte
}{
{
key: "/playlist.grafana.app/playlists/default/test-uid",
requestInfo: &request.RequestInfo{
APIGroup: "playlist.grafana.app",
APIVersion: "v0alpha1",
Resource: "playlists",
Namespace: "default",
Name: "test-name",
},
resource: &v0alpha1.Playlist{
TypeMeta: metav1.TypeMeta{
APIVersion: apiVersion,
},
ObjectMeta: metav1.ObjectMeta{
CreationTimestamp: createdAt,
Labels: map[string]string{"label1": "value1", "label2": "value2"},
@ -83,9 +82,10 @@ func TestResourceToEntity(t *testing.T) {
},
},
},
expectedKey: "/playlist.grafana.app/playlists/default/test-uid",
expectedGroupVersion: apiVersion,
expectedKey: "/playlist.grafana.app/playlists/namespaces/default/test-name",
expectedGroupVersion: "v0alpha1",
expectedName: "test-name",
expectedNamespace: "default",
expectedTitle: "A playlist",
expectedGuid: "test-uid",
expectedVersion: "1",
@ -104,10 +104,11 @@ func TestResourceToEntity(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.resource.GetObjectKind().GroupVersionKind().Kind+" to entity conversion should succeed", func(t *testing.T) {
entity, err := resourceToEntity(tc.key, tc.resource, requestInfo, Codecs.LegacyCodec(v0alpha1.PlaylistResourceInfo.GroupVersion()))
entity, err := resourceToEntity(tc.resource, tc.requestInfo, Codecs.LegacyCodec(v0alpha1.PlaylistResourceInfo.GroupVersion()))
require.NoError(t, err)
assert.Equal(t, tc.expectedKey, entity.Key)
assert.Equal(t, tc.expectedName, entity.Name)
assert.Equal(t, tc.expectedNamespace, entity.Namespace)
assert.Equal(t, tc.expectedTitle, entity.Title)
assert.Equal(t, tc.expectedGroupVersion, entity.GroupVersion)
assert.Equal(t, tc.expectedName, entity.Name)
@ -152,7 +153,7 @@ func TestEntityToResource(t *testing.T) {
}{
{
entity: &entityStore.Entity{
Key: "/playlist.grafana.app/playlists/default/test-uid",
Key: "/playlist.grafana.app/playlists/namespaces/default/test-uid",
GroupVersion: "v0alpha1",
Name: "test-uid",
Title: "A playlist",

View File

@ -14,10 +14,10 @@ type Key struct {
}
func ParseKey(key string) (*Key, error) {
// /<group>/<resource>/<namespace>(/<name>(/<subresource>))
parts := strings.SplitN(key, "/", 6)
if len(parts) < 4 {
return nil, fmt.Errorf("invalid key (expecting at least 3 parts): %s", key)
// /<group>/<resource>[/namespaces/<namespace>][/<name>[/<subresource>]]
parts := strings.Split(key, "/")
if len(parts) < 3 {
return nil, fmt.Errorf("invalid key (expecting at least 2 parts): %s", key)
}
if parts[0] != "" {
@ -25,24 +25,45 @@ func ParseKey(key string) (*Key, error) {
}
k := &Key{
Group: parts[1],
Resource: parts[2],
Namespace: parts[3],
Group: parts[1],
Resource: parts[2],
}
if len(parts) > 4 {
k.Name = parts[4]
if len(parts) == 3 {
return k, nil
}
if len(parts) > 5 {
k.Subresource = parts[5]
if parts[3] != "namespaces" {
k.Name = parts[3]
if len(parts) > 4 {
k.Subresource = strings.Join(parts[4:], "/")
}
return k, nil
}
if len(parts) < 5 {
return nil, fmt.Errorf("invalid key (expecting namespace after 'namespaces'): %s", key)
}
k.Namespace = parts[4]
if len(parts) == 5 {
return k, nil
}
k.Name = parts[5]
if len(parts) > 6 {
k.Subresource = strings.Join(parts[6:], "/")
}
return k, nil
}
func (k *Key) String() string {
s := "/" + k.Group + "/" + k.Resource + "/" + k.Namespace
s := "/" + k.Group + "/" + k.Resource
if len(k.Namespace) > 0 {
s += "/namespaces/" + k.Namespace
}
if len(k.Name) > 0 {
s += "/" + k.Name
if len(k.Subresource) > 0 {

View File

@ -1099,8 +1099,12 @@ func (s *sqlEntityServer) List(ctx context.Context, r *entity.EntityListRequest)
return nil, err
}
args = append(args, key.Namespace, key.Group, key.Resource)
whereclause := "(" + s.dialect.Quote("namespace") + "=? AND " + s.dialect.Quote("group") + "=? AND " + s.dialect.Quote("resource") + "=?"
args = append(args, key.Group, key.Resource)
whereclause := "(" + s.dialect.Quote("group") + "=? AND " + s.dialect.Quote("resource") + "=?"
if key.Namespace != "" {
args = append(args, key.Namespace)
whereclause += " AND " + s.dialect.Quote("namespace") + "=?"
}
if key.Name != "" {
args = append(args, key.Name)
whereclause += " AND " + s.dialect.Quote("name") + "=?"
@ -1257,8 +1261,12 @@ func (s *sqlEntityServer) watchInit(ctx context.Context, r *entity.EntityWatchRe
return err
}
args = append(args, key.Namespace, key.Group, key.Resource)
whereclause := "(" + s.dialect.Quote("namespace") + "=? AND " + s.dialect.Quote("group") + "=? AND " + s.dialect.Quote("resource") + "=?"
args = append(args, key.Group, key.Resource)
whereclause := "(" + s.dialect.Quote("group") + "=? AND " + s.dialect.Quote("resource") + "=?"
if key.Namespace != "" {
args = append(args, key.Namespace)
whereclause += " AND " + s.dialect.Quote("namespace") + "=?"
}
if key.Name != "" {
args = append(args, key.Name)
whereclause += " AND " + s.dialect.Quote("name") + "=?"
@ -1465,7 +1473,7 @@ func watchMatches(r *entity.EntityWatchRequest, result *entity.Entity) bool {
return false
}
if key.Namespace == result.Namespace && key.Group == result.Group && key.Resource == result.Resource && (key.Name == "" || key.Name == result.Name) {
if key.Group == result.Group && key.Resource == result.Resource && (key.Namespace == "" || key.Namespace == result.Namespace) && (key.Name == "" || key.Name == result.Name) {
matched = true
break
}

View File

@ -34,7 +34,7 @@ func TestCreate(t *testing.T) {
Resource: "playlists",
Namespace: "default",
Name: "set-minimum-uid",
Key: "/playlist.grafana.app/playlists/default/set-minimum-uid",
Key: "/playlist.grafana.app/playlists/namespaces/default/set-minimum-uid",
CreatedBy: "set-minimum-creator",
Origin: &entity.EntityOriginInfo{},
},
@ -44,7 +44,7 @@ func TestCreate(t *testing.T) {
{
"request with no entity creator",
&entity.Entity{
Key: "/playlist.grafana.app/playlists/default/set-only-key",
Key: "/playlist.grafana.app/playlists/namespaces/default/set-only-key",
},
true,
false,