more body prep

This commit is contained in:
Ryan McKinley 2024-06-13 22:12:55 +03:00
parent 1540c98fe1
commit 79aed54c2c
2 changed files with 108 additions and 33 deletions

View File

@ -3,6 +3,7 @@ package utils
import (
"fmt"
"reflect"
"strconv"
"time"
"k8s.io/apimachinery/pkg/api/meta"
@ -52,6 +53,8 @@ type GrafanaMetaAccessor interface {
metav1.Object
GetGroupVersionKind() schema.GroupVersionKind
GetResourceVersionInt64() (int64, error)
SetResourceVersionInt64(int64)
GetUpdatedTimestamp() (*time.Time, error)
SetUpdatedTimestamp(v *time.Time)
@ -101,7 +104,7 @@ func MetaAccessor(raw interface{}) (GrafanaMetaAccessor, error) {
return nil, err
}
// look for Spec.Title or Spec.Name
// reflection to find title and other non object properties
r := reflect.ValueOf(raw)
if r.Kind() == reflect.Ptr || r.Kind() == reflect.Interface {
r = r.Elem()
@ -109,8 +112,16 @@ func MetaAccessor(raw interface{}) (GrafanaMetaAccessor, error) {
return &grafanaMetaAccessor{raw, obj, r}, nil
}
func (m *grafanaMetaAccessor) Object() metav1.Object {
return m.obj
func (m *grafanaMetaAccessor) GetResourceVersionInt64() (int64, error) {
v := m.obj.GetResourceVersion()
if v == "" {
return 0, nil
}
return strconv.ParseInt(v, 10, 64)
}
func (m *grafanaMetaAccessor) SetResourceVersionInt64(rv int64) {
m.obj.SetResourceVersion(strconv.FormatInt(rv, 10))
}
func (m *grafanaMetaAccessor) SetAnnotation(key string, val string) {

View File

@ -13,6 +13,8 @@ import (
"io"
"reflect"
"strconv"
"strings"
"time"
grpcCodes "google.golang.org/grpc/codes"
grpcStatus "google.golang.org/grpc/status"
@ -22,6 +24,7 @@ import (
"k8s.io/apimachinery/pkg/conversion"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/storage"
@ -29,6 +32,8 @@ import (
"k8s.io/apiserver/pkg/storage/storagebackend/factory"
"k8s.io/klog/v2"
"github.com/grafana/grafana/pkg/apimachinery/utils"
"github.com/grafana/grafana/pkg/infra/appcontext"
"github.com/grafana/grafana/pkg/storage/unified/resource"
)
@ -72,12 +77,9 @@ func NewStorage(
}, nil, nil
}
func errorWrap(err error, status *resource.StatusResult) error {
if err != nil {
return err
}
func errorWrap(status *resource.StatusResult) error {
if status != nil {
return &apierrors.StatusError{metav1.Status{
return &apierrors.StatusError{ErrStatus: metav1.Status{
Status: metav1.StatusFailure,
Code: status.Code,
Reason: metav1.StatusReason(status.Reason),
@ -116,11 +118,27 @@ func (s *Storage) Create(ctx context.Context, _ string, obj runtime.Object, out
return err
}
user, err := appcontext.User(ctx)
if err != nil {
return err
}
err = s.Versioner().PrepareObjectForStorage(obj)
if err != nil {
return err
}
meta, err := utils.MetaAccessor(obj)
if err != nil {
return err
}
meta.SetCreatedBy(user.GetUID().String())
origin, err := meta.GetOriginInfo()
if err != nil {
return err
}
meta.SetOriginInfo(origin)
var buf bytes.Buffer
err = s.codec.Encode(obj, &buf)
if err != nil {
@ -135,7 +153,10 @@ func (s *Storage) Create(ctx context.Context, _ string, obj runtime.Object, out
// TODO?? blob from context?
rsp, err := s.store.Create(ctx, cmd)
err = errorWrap(err, rsp.Status)
if err != nil {
return err
}
err = errorWrap(rsp.Status)
if err != nil {
return err
}
@ -144,13 +165,17 @@ func (s *Storage) Create(ctx context.Context, _ string, obj runtime.Object, out
return fmt.Errorf("error in status %+v", rsp.Status)
}
// Copy the output bits
out = obj
after, err := meta.Accessor(out)
// Create into the out value
// can we copy it?
_, _, err = s.codec.Decode(cmd.Value, nil, out)
if err != nil {
return err
}
after.SetResourceVersion(strconv.FormatInt(rsp.ResourceVersion, 10))
after, err := utils.MetaAccessor(out)
if err != nil {
return err
}
after.SetResourceVersionInt64(rsp.ResourceVersion)
return nil
}
@ -183,7 +208,10 @@ func (s *Storage) Delete(ctx context.Context, _ string, out runtime.Object, prec
}
rsp, err := s.store.Delete(ctx, cmd)
err = errorWrap(err, rsp.Status)
if err != nil {
return err
}
err = errorWrap(rsp.Status)
if err != nil {
return err
}
@ -239,7 +267,7 @@ func (s *Storage) Watch(ctx context.Context, _ string, opts storage.ListOptions)
return watch.NewStreamWatcher(decoder, reporter), nil
}
// Get unmarshals object found at key into objPtr. On a not found error, will either
// Get decodes object found at key into objPtr. On a not found error, will either
// return a zero object of the requested type, or an error, depending on 'opts.ignoreNotFound'.
// Treats empty responses and nil response nodes exactly like a not found error.
// The returned contents may be delayed, but it is guaranteed that they will
@ -251,7 +279,10 @@ func (s *Storage) Get(ctx context.Context, _ string, opts storage.GetOptions, ob
}
rsp, err := s.store.Read(ctx, &resource.ReadRequest{Key: key, IgnoreBlob: true})
err = errorWrap(err, rsp.Status)
if err != nil {
return err
}
err = errorWrap(rsp.Status)
if err != nil {
return err
}
@ -260,11 +291,11 @@ func (s *Storage) Get(ctx context.Context, _ string, opts storage.GetOptions, ob
if err != nil {
return err
}
obj, err := meta.Accessor(objPtr)
obj, err := utils.MetaAccessor(objPtr)
if err != nil {
return err
}
obj.SetResourceVersion(strconv.FormatInt(rsp.ResourceVersion, 10))
obj.SetResourceVersionInt64(rsp.ResourceVersion)
return nil
}
@ -290,7 +321,23 @@ func toListRequest(ctx context.Context, opts storage.ListOptions) (*resource.Lis
for _, r := range requirements {
v := r.Key()
if v == SortByKey {
if r.Operator() != selection.Equals {
return nil, predicate, apierrors.NewBadRequest("invalid sort operation // " + r.String())
}
parts := strings.Split(v, " ")
if len(parts) != 2 {
return nil, predicate, apierrors.NewBadRequest("invalid sort operation // " + r.String())
}
sort := &resource.Sort{Field: parts[0]}
switch parts[1] {
case "ASC":
sort.Order = resource.Sort_ASC
case "DESC":
sort.Order = resource.Sort_DESC
default:
return nil, predicate, apierrors.NewBadRequest("invalid sort order // " + r.String())
}
req.Sort = append(req.Sort, sort)
// TODO! Must update the predicate!
continue
}
@ -310,6 +357,7 @@ func toListRequest(ctx context.Context, opts storage.ListOptions) (*resource.Lis
}
req.ResourceVersion = rv
}
switch opts.ResourceVersionMatch {
case metav1.ResourceVersionMatchNotOlderThan:
req.VersionMatch = resource.ResourceVersionMatch_NotOlderThan
@ -357,11 +405,11 @@ func (s *Storage) GetList(ctx context.Context, _ string, opts storage.ListOption
if err != nil {
return err
}
obj, err := meta.Accessor(tmp)
obj, err := utils.MetaAccessor(tmp)
if err != nil {
return err
}
obj.SetResourceVersion(strconv.FormatInt(item.ResourceVersion, 10))
obj.SetResourceVersionInt64(item.ResourceVersion)
// apply any predicates not handled in storage
matches, err := predicate.Matches(tmp)
@ -416,6 +464,11 @@ func (s *Storage) GuaranteedUpdate(
return err
}
user, err := appcontext.User(ctx)
if err != nil {
return err
}
// Get the current version
err = s.Get(ctx, "<ignored>", storage.GetOptions{}, destination)
if err != nil {
@ -427,7 +480,7 @@ func (s *Storage) GuaranteedUpdate(
}
}
accessor, err := meta.Accessor(destination)
accessor, err := utils.MetaAccessor(destination)
if err != nil {
return err
}
@ -467,10 +520,18 @@ func (s *Storage) GuaranteedUpdate(
)
}
accessor, err = meta.Accessor(updatedObj)
now := time.Now()
accessor, err = utils.MetaAccessor(updatedObj)
if err != nil {
return apierrors.NewInternalError(err)
}
accessor.SetUpdatedTimestamp(&now)
accessor.SetUpdatedBy(user.GetUID().String())
origin, err := accessor.GetOriginInfo()
if err != nil {
return err
}
accessor.SetOriginInfo(origin)
var buf bytes.Buffer
err = s.codec.Encode(updatedObj, &buf)
@ -481,11 +542,14 @@ func (s *Storage) GuaranteedUpdate(
req := &resource.UpdateRequest{Key: key, Value: buf.Bytes()}
// TODO... message
rsp, err := s.store.Update(ctx, req)
err = errorWrap(err, rsp.Status)
if err != nil {
return err
}
accessor.SetResourceVersion(strconv.FormatInt(rsp.ResourceVersion, 10))
err = errorWrap(rsp.Status)
if err != nil {
return err
}
accessor.SetResourceVersionInt64(rsp.ResourceVersion)
return nil
}
@ -513,11 +577,11 @@ type Decoder struct {
func (d *Decoder) toObject(w *resource.ResourceWrapper) (runtime.Object, error) {
obj, _, err := d.codec.Decode(w.Value, nil, d.newFunc())
if err == nil {
accessor, err := meta.Accessor(obj)
accessor, err := utils.MetaAccessor(obj)
if err != nil {
return nil, err
}
accessor.SetResourceVersion(strconv.FormatInt(w.ResourceVersion, 10))
accessor.SetResourceVersionInt64(w.ResourceVersion)
}
return obj, err
}
@ -554,13 +618,13 @@ decode:
obj := d.newFunc()
// here k8s expects an empty object with just resource version and k8s.io/initial-events-end annotation
accessor, err := meta.Accessor(obj)
accessor, err := utils.MetaAccessor(obj)
if err != nil {
klog.Errorf("error getting object accessor: %s", err)
return watch.Error, nil, err
}
accessor.SetResourceVersion(strconv.FormatInt(resp.Resource.ResourceVersion, 10))
accessor.SetResourceVersionInt64(resp.Resource.ResourceVersion)
accessor.SetAnnotations(map[string]string{"k8s.io/initial-events-end": "true"})
return watch.Bookmark, obj, nil
}
@ -624,12 +688,12 @@ decode:
// here k8s expects the previous object but with the new resource version
obj = prevObj
accessor, err := meta.Accessor(obj)
accessor, err := utils.MetaAccessor(obj)
if err != nil {
klog.Errorf("error getting object accessor: %s", err)
return watch.Error, nil, err
}
accessor.SetResourceVersion(strconv.FormatInt(resp.Previous.ResourceVersion, 10))
accessor.SetResourceVersionInt64(resp.Previous.ResourceVersion)
} else if !prevMatches {
// if the object didn't previously match, send an Added event
watchAction = watch.Added
@ -641,13 +705,13 @@ decode:
// if we have a previous object, return that in the deleted event
if resp.Previous != nil {
// here k8s expects the previous object but with the new resource version
accessor, err := meta.Accessor(obj)
accessor, err := utils.MetaAccessor(obj)
if err != nil {
klog.Errorf("error getting object accessor: %s", err)
return watch.Error, nil, err
}
accessor.SetResourceVersion(strconv.FormatInt(resp.Previous.ResourceVersion, 10))
accessor.SetResourceVersionInt64(resp.Previous.ResourceVersion)
}
// apply any predicates not handled in storage