mirror of
https://github.com/grafana/grafana.git
synced 2025-01-27 00:37:04 -06:00
61f3d08c3f
* Add origin keys to List request * Check origin keys requirement in the sql entity server's List method * Refactor mode 2 List * Check origin keys before making Storage List call --------- Co-authored-by: Dan Cech <dcech@grafana.com>
798 lines
22 KiB
Go
798 lines
22 KiB
Go
// SPDX-License-Identifier: AGPL-3.0-only
|
|
// Provenance-includes-location: https://github.com/kubernetes-sigs/apiserver-runtime/blob/main/pkg/experimental/storage/filepath/jsonfile_rest.go
|
|
// Provenance-includes-license: Apache-2.0
|
|
// Provenance-includes-copyright: The Kubernetes Authors.
|
|
|
|
package entity
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"reflect"
|
|
"strconv"
|
|
|
|
grpcCodes "google.golang.org/grpc/codes"
|
|
grpcStatus "google.golang.org/grpc/status"
|
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
|
"k8s.io/apimachinery/pkg/api/meta"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/conversion"
|
|
"k8s.io/apimachinery/pkg/runtime"
|
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
|
"k8s.io/apimachinery/pkg/selection"
|
|
"k8s.io/apimachinery/pkg/watch"
|
|
"k8s.io/apiserver/pkg/endpoints/request"
|
|
"k8s.io/apiserver/pkg/storage"
|
|
"k8s.io/apiserver/pkg/storage/storagebackend"
|
|
"k8s.io/apiserver/pkg/storage/storagebackend/factory"
|
|
"k8s.io/klog/v2"
|
|
|
|
entityStore "github.com/grafana/grafana/pkg/services/store/entity"
|
|
)
|
|
|
|
var _ storage.Interface = (*Storage)(nil)
|
|
|
|
// Storage implements storage.Interface and stores resources in unified storage
|
|
type Storage struct {
|
|
config *storagebackend.ConfigForResource
|
|
store entityStore.EntityStoreClient
|
|
gr schema.GroupResource
|
|
codec runtime.Codec
|
|
keyFunc func(obj runtime.Object) (string, error)
|
|
newFunc func() runtime.Object
|
|
newListFunc func() runtime.Object
|
|
getAttrsFunc storage.AttrFunc
|
|
// trigger storage.IndexerFuncs
|
|
// indexers *cache.Indexers
|
|
}
|
|
|
|
func NewStorage(
|
|
config *storagebackend.ConfigForResource,
|
|
gr schema.GroupResource,
|
|
store entityStore.EntityStoreClient,
|
|
codec runtime.Codec,
|
|
keyFunc func(obj runtime.Object) (string, error),
|
|
newFunc func() runtime.Object,
|
|
newListFunc func() runtime.Object,
|
|
getAttrsFunc storage.AttrFunc,
|
|
) (storage.Interface, factory.DestroyFunc, error) {
|
|
return &Storage{
|
|
config: config,
|
|
gr: gr,
|
|
codec: codec,
|
|
store: store,
|
|
keyFunc: keyFunc,
|
|
newFunc: newFunc,
|
|
newListFunc: newListFunc,
|
|
getAttrsFunc: getAttrsFunc,
|
|
}, nil, nil
|
|
}
|
|
|
|
// Create adds a new object at a key unless it already exists. 'ttl' is time-to-live
|
|
// in seconds (0 means forever). If no error is returned and out is not nil, out will be
|
|
// set to the read value from database.
|
|
func (s *Storage) Create(ctx context.Context, key string, obj runtime.Object, out runtime.Object, ttl uint64) error {
|
|
requestInfo, ok := request.RequestInfoFrom(ctx)
|
|
if !ok {
|
|
return apierrors.NewInternalError(fmt.Errorf("could not get request info"))
|
|
}
|
|
|
|
if err := s.Versioner().PrepareObjectForStorage(obj); err != nil {
|
|
return err
|
|
}
|
|
|
|
e, err := resourceToEntity(obj, requestInfo, s.codec)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
req := &entityStore.CreateEntityRequest{
|
|
Entity: e,
|
|
}
|
|
|
|
rsp, err := s.store.Create(ctx, req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if rsp.Status != entityStore.CreateEntityResponse_CREATED {
|
|
return fmt.Errorf("this was not a create operation... (%s)", rsp.Status.String())
|
|
}
|
|
|
|
err = entityToResource(rsp.Entity, out, s.codec)
|
|
if err != nil {
|
|
return apierrors.NewInternalError(err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Delete removes the specified key and returns the value that existed at that spot.
|
|
// If key didn't exist, it will return NotFound storage error.
|
|
// If 'cachedExistingObject' is non-nil, it can be used as a suggestion about the
|
|
// current version of the object to avoid read operation from storage to get it.
|
|
// However, the implementations have to retry in case suggestion is stale.
|
|
func (s *Storage) Delete(ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions, validateDeletion storage.ValidateObjectFunc, cachedExistingObject runtime.Object) error {
|
|
requestInfo, ok := request.RequestInfoFrom(ctx)
|
|
if !ok {
|
|
return apierrors.NewInternalError(fmt.Errorf("could not get request info"))
|
|
}
|
|
|
|
k := &entityStore.Key{
|
|
Group: requestInfo.APIGroup,
|
|
Resource: requestInfo.Resource,
|
|
Namespace: requestInfo.Namespace,
|
|
Name: requestInfo.Name,
|
|
Subresource: requestInfo.Subresource,
|
|
}
|
|
|
|
previousVersion := int64(0)
|
|
if preconditions != nil && preconditions.ResourceVersion != nil {
|
|
previousVersion, _ = strconv.ParseInt(*preconditions.ResourceVersion, 10, 64)
|
|
}
|
|
|
|
rsp, err := s.store.Delete(ctx, &entityStore.DeleteEntityRequest{
|
|
Key: k.String(),
|
|
PreviousVersion: previousVersion,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = entityToResource(rsp.Entity, out, s.codec)
|
|
if err != nil {
|
|
return apierrors.NewInternalError(err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Watch begins watching the specified key. Events are decoded into API objects,
|
|
// and any items selected by 'p' are sent down to returned watch.Interface.
|
|
// resourceVersion may be used to specify what version to begin watching,
|
|
// which should be the current resourceVersion, and no longer rv+1
|
|
// (e.g. reconnecting without missing any updates).
|
|
// If resource version is "0", this interface will get current object at given key
|
|
// and send it in an "ADDED" event, before watch starts.
|
|
func (s *Storage) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
|
|
requestInfo, ok := request.RequestInfoFrom(ctx)
|
|
if !ok {
|
|
return nil, apierrors.NewInternalError(fmt.Errorf("could not get request info"))
|
|
}
|
|
|
|
k := &entityStore.Key{
|
|
Group: requestInfo.APIGroup,
|
|
Resource: requestInfo.Resource,
|
|
Namespace: requestInfo.Namespace,
|
|
Name: requestInfo.Name,
|
|
Subresource: requestInfo.Subresource,
|
|
}
|
|
|
|
if opts.Predicate.Field != nil {
|
|
// check for metadata.name field selector
|
|
if v, ok := opts.Predicate.Field.RequiresExactMatch("metadata.name"); ok && k.Name == "" {
|
|
// just watch the specific key if we have a name field selector
|
|
k.Name = v
|
|
}
|
|
|
|
// check for metadata.namespace field selector
|
|
if v, ok := opts.Predicate.Field.RequiresExactMatch("metadata.namespace"); ok && k.Namespace == "" {
|
|
// just watch the specific namespace if we have a namespace field selector
|
|
k.Namespace = v
|
|
}
|
|
}
|
|
|
|
// translate grafana.app/* label selectors into field requirements
|
|
requirements, newSelector, err := ReadLabelSelectors(opts.Predicate.Label)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Update the selector to remove the unneeded requirements
|
|
opts.Predicate.Label = newSelector
|
|
|
|
// if we got a listHistory label selector, watch the specified resource
|
|
if requirements.ListHistory != "" {
|
|
if k.Name != "" && k.Name != requirements.ListHistory {
|
|
return nil, apierrors.NewBadRequest("name field selector does not match listHistory")
|
|
}
|
|
k.Name = requirements.ListHistory
|
|
}
|
|
|
|
req := &entityStore.EntityWatchRequest{
|
|
Action: entityStore.EntityWatchRequest_START,
|
|
Key: []string{
|
|
k.String(),
|
|
},
|
|
Labels: map[string]string{},
|
|
WithBody: true,
|
|
WithStatus: true,
|
|
SendInitialEvents: false,
|
|
AllowWatchBookmarks: opts.Predicate.AllowWatchBookmarks,
|
|
}
|
|
|
|
if opts.ResourceVersion != "" {
|
|
rv, err := strconv.ParseInt(opts.ResourceVersion, 10, 64)
|
|
if err != nil {
|
|
return nil, apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %s", opts.ResourceVersion))
|
|
}
|
|
|
|
req.Since = rv
|
|
}
|
|
|
|
if opts.SendInitialEvents == nil && req.Since == 0 {
|
|
req.SendInitialEvents = true
|
|
} else if opts.SendInitialEvents != nil {
|
|
req.SendInitialEvents = *opts.SendInitialEvents
|
|
}
|
|
|
|
if requirements.Folder != nil {
|
|
req.Folder = *requirements.Folder
|
|
}
|
|
|
|
// translate "equals" label selectors to storage label conditions
|
|
labelRequirements, selectable := opts.Predicate.Label.Requirements()
|
|
if !selectable {
|
|
return nil, apierrors.NewBadRequest("label selector is not selectable")
|
|
}
|
|
|
|
for _, r := range labelRequirements {
|
|
if r.Operator() == selection.Equals {
|
|
req.Labels[r.Key()] = r.Values().List()[0]
|
|
}
|
|
}
|
|
|
|
client, err := s.store.Watch(ctx)
|
|
if err != nil {
|
|
// if the context was canceled, just return a new empty watch
|
|
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) || errors.Is(err, io.EOF) {
|
|
return watch.NewEmptyWatch(), nil
|
|
}
|
|
|
|
return nil, err
|
|
}
|
|
|
|
err = client.Send(req)
|
|
if err != nil {
|
|
err2 := client.CloseSend()
|
|
if err2 != nil {
|
|
klog.Errorf("watch close failed: %s\n", err2)
|
|
}
|
|
|
|
// if the context was canceled, just return a new empty watch
|
|
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) || errors.Is(err, io.EOF) {
|
|
return watch.NewEmptyWatch(), nil
|
|
}
|
|
|
|
return nil, err
|
|
}
|
|
|
|
reporter := apierrors.NewClientErrorReporter(500, "WATCH", "")
|
|
|
|
decoder := &Decoder{
|
|
client: client,
|
|
newFunc: s.newFunc,
|
|
opts: opts,
|
|
codec: s.codec,
|
|
}
|
|
|
|
w := watch.NewStreamWatcher(decoder, reporter)
|
|
|
|
return w, nil
|
|
}
|
|
|
|
// Get unmarshals object found at key into objPtr. On a not found error, will either
|
|
// return a zero object of the requested type, or an error, depending on 'opts.ignoreNotFound'.
|
|
// Treats empty responses and nil response nodes exactly like a not found error.
|
|
// The returned contents may be delayed, but it is guaranteed that they will
|
|
// match 'opts.ResourceVersion' according 'opts.ResourceVersionMatch'.
|
|
func (s *Storage) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error {
|
|
requestInfo, ok := request.RequestInfoFrom(ctx)
|
|
if !ok {
|
|
return apierrors.NewInternalError(fmt.Errorf("could not get request info"))
|
|
}
|
|
|
|
k := &entityStore.Key{
|
|
Group: requestInfo.APIGroup,
|
|
Resource: requestInfo.Resource,
|
|
Namespace: requestInfo.Namespace,
|
|
Name: requestInfo.Name,
|
|
Subresource: requestInfo.Subresource,
|
|
}
|
|
|
|
resourceVersion := int64(0)
|
|
var err error
|
|
if opts.ResourceVersion != "" {
|
|
resourceVersion, err = strconv.ParseInt(opts.ResourceVersion, 10, 64)
|
|
if err != nil {
|
|
return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %s", opts.ResourceVersion))
|
|
}
|
|
}
|
|
|
|
rsp, err := s.store.Read(ctx, &entityStore.ReadEntityRequest{
|
|
Key: k.String(),
|
|
WithBody: true,
|
|
WithStatus: true,
|
|
ResourceVersion: resourceVersion,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if rsp.Key == "" {
|
|
if opts.IgnoreNotFound {
|
|
return nil
|
|
}
|
|
|
|
return apierrors.NewNotFound(s.gr, k.Name)
|
|
}
|
|
|
|
err = entityToResource(rsp, objPtr, s.codec)
|
|
if err != nil {
|
|
return apierrors.NewInternalError(err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// GetList unmarshalls objects found at key into a *List api object (an object
|
|
// that satisfies runtime.IsList definition).
|
|
// If 'opts.Recursive' is false, 'key' is used as an exact match. If `opts.Recursive'
|
|
// is true, 'key' is used as a prefix.
|
|
// The returned contents may be delayed, but it is guaranteed that they will
|
|
// match 'opts.ResourceVersion' according 'opts.ResourceVersionMatch'.
|
|
func (s *Storage) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
|
|
requestInfo, ok := request.RequestInfoFrom(ctx)
|
|
if !ok {
|
|
return apierrors.NewInternalError(fmt.Errorf("could not get request info"))
|
|
}
|
|
|
|
k := &entityStore.Key{
|
|
Group: requestInfo.APIGroup,
|
|
Resource: requestInfo.Resource,
|
|
Namespace: requestInfo.Namespace,
|
|
Name: requestInfo.Name,
|
|
Subresource: requestInfo.Subresource,
|
|
}
|
|
|
|
listPtr, err := meta.GetItemsPtr(listObj)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
v, err := conversion.EnforcePtr(listPtr)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// translate grafana.app/* label selectors into field requirements
|
|
requirements, newSelector, err := ReadLabelSelectors(opts.Predicate.Label)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Update the selector to remove the unneeded requirements
|
|
opts.Predicate.Label = newSelector
|
|
|
|
if requirements.ListHistory != "" {
|
|
k.Name = requirements.ListHistory
|
|
|
|
req := &entityStore.EntityHistoryRequest{
|
|
Key: k.String(),
|
|
WithBody: true,
|
|
WithStatus: true,
|
|
NextPageToken: opts.Predicate.Continue,
|
|
Limit: opts.Predicate.Limit,
|
|
Sort: requirements.SortBy,
|
|
}
|
|
|
|
rsp, err := s.store.History(ctx, req)
|
|
if err != nil {
|
|
return apierrors.NewInternalError(err)
|
|
}
|
|
|
|
for _, r := range rsp.Versions {
|
|
res := s.newFunc()
|
|
|
|
err := entityToResource(r, res, s.codec)
|
|
if err != nil {
|
|
return apierrors.NewInternalError(err)
|
|
}
|
|
|
|
// apply any predicates not handled in storage
|
|
matches, err := opts.Predicate.Matches(res)
|
|
if err != nil {
|
|
return apierrors.NewInternalError(err)
|
|
}
|
|
if !matches {
|
|
continue
|
|
}
|
|
|
|
v.Set(reflect.Append(v, reflect.ValueOf(res).Elem()))
|
|
}
|
|
|
|
listAccessor, err := meta.ListAccessor(listObj)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if rsp.NextPageToken != "" {
|
|
listAccessor.SetContinue(rsp.NextPageToken)
|
|
}
|
|
|
|
listAccessor.SetResourceVersion(strconv.FormatInt(rsp.ResourceVersion, 10))
|
|
|
|
return nil
|
|
}
|
|
|
|
req := &entityStore.EntityListRequest{
|
|
Key: []string{
|
|
k.String(),
|
|
},
|
|
WithBody: true,
|
|
WithStatus: true,
|
|
NextPageToken: opts.Predicate.Continue,
|
|
Limit: opts.Predicate.Limit,
|
|
Labels: map[string]string{},
|
|
}
|
|
|
|
if requirements.Folder != nil {
|
|
req.Folder = *requirements.Folder
|
|
}
|
|
if len(requirements.SortBy) > 0 {
|
|
req.Sort = requirements.SortBy
|
|
}
|
|
if len(requirements.ListOriginKeys) > 0 {
|
|
req.OriginKeys = requirements.ListOriginKeys
|
|
}
|
|
if requirements.ListDeleted {
|
|
req.Deleted = true
|
|
}
|
|
|
|
// translate "equals" label selectors to storage label conditions
|
|
labelRequirements, selectable := opts.Predicate.Label.Requirements()
|
|
if !selectable {
|
|
return apierrors.NewBadRequest("label selector is not selectable")
|
|
}
|
|
|
|
for _, r := range labelRequirements {
|
|
if r.Operator() == selection.Equals {
|
|
req.Labels[r.Key()] = r.Values().List()[0]
|
|
}
|
|
}
|
|
|
|
rsp, err := s.store.List(ctx, req)
|
|
if err != nil {
|
|
return apierrors.NewInternalError(err)
|
|
}
|
|
|
|
for _, r := range rsp.Results {
|
|
res := s.newFunc()
|
|
|
|
err := entityToResource(r, res, s.codec)
|
|
if err != nil {
|
|
return apierrors.NewInternalError(err)
|
|
}
|
|
|
|
// apply any predicates not handled in storage
|
|
matches, err := opts.Predicate.Matches(res)
|
|
if err != nil {
|
|
return apierrors.NewInternalError(err)
|
|
}
|
|
if !matches {
|
|
continue
|
|
}
|
|
|
|
v.Set(reflect.Append(v, reflect.ValueOf(res).Elem()))
|
|
}
|
|
|
|
listAccessor, err := meta.ListAccessor(listObj)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if rsp.NextPageToken != "" {
|
|
listAccessor.SetContinue(rsp.NextPageToken)
|
|
}
|
|
|
|
listAccessor.SetResourceVersion(strconv.FormatInt(rsp.ResourceVersion, 10))
|
|
|
|
return nil
|
|
}
|
|
|
|
// GuaranteedUpdate keeps calling 'tryUpdate()' to update key 'key' (of type 'destination')
|
|
// retrying the update until success if there is index conflict.
|
|
// Note that object passed to tryUpdate may change across invocations of tryUpdate() if
|
|
// other writers are simultaneously updating it, so tryUpdate() needs to take into account
|
|
// the current contents of the object when deciding how the update object should look.
|
|
// If the key doesn't exist, it will return NotFound storage error if ignoreNotFound=false
|
|
// else `destination` will be set to the zero value of it's type.
|
|
// If the eventual successful invocation of `tryUpdate` returns an output with the same serialized
|
|
// contents as the input, it won't perform any update, but instead set `destination` to an object with those
|
|
// contents.
|
|
// If 'cachedExistingObject' is non-nil, it can be used as a suggestion about the
|
|
// current version of the object to avoid read operation from storage to get it.
|
|
// However, the implementations have to retry in case suggestion is stale.
|
|
func (s *Storage) GuaranteedUpdate(
|
|
ctx context.Context,
|
|
key string,
|
|
destination runtime.Object,
|
|
ignoreNotFound bool,
|
|
preconditions *storage.Preconditions,
|
|
tryUpdate storage.UpdateFunc,
|
|
cachedExistingObject runtime.Object,
|
|
) error {
|
|
requestInfo, ok := request.RequestInfoFrom(ctx)
|
|
if !ok {
|
|
return apierrors.NewInternalError(fmt.Errorf("could not get request info"))
|
|
}
|
|
|
|
k := &entityStore.Key{
|
|
Group: requestInfo.APIGroup,
|
|
Resource: requestInfo.Resource,
|
|
Namespace: requestInfo.Namespace,
|
|
Name: requestInfo.Name,
|
|
Subresource: requestInfo.Subresource,
|
|
}
|
|
|
|
getErr := s.Get(ctx, k.String(), storage.GetOptions{}, destination)
|
|
if getErr != nil {
|
|
if ignoreNotFound && apierrors.IsNotFound(getErr) {
|
|
// destination is already set to zero value
|
|
// we'll create the resource
|
|
} else {
|
|
return getErr
|
|
}
|
|
}
|
|
|
|
accessor, err := meta.Accessor(destination)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
previousVersion, _ := strconv.ParseInt(accessor.GetResourceVersion(), 10, 64)
|
|
if preconditions != nil && preconditions.ResourceVersion != nil {
|
|
previousVersion, _ = strconv.ParseInt(*preconditions.ResourceVersion, 10, 64)
|
|
}
|
|
|
|
res := &storage.ResponseMeta{}
|
|
updatedObj, _, err := tryUpdate(destination, *res)
|
|
if err != nil {
|
|
var statusErr *apierrors.StatusError
|
|
if errors.As(err, &statusErr) {
|
|
// For now, forbidden may come from a mutation handler
|
|
if statusErr.ErrStatus.Reason == metav1.StatusReasonForbidden {
|
|
return statusErr
|
|
}
|
|
}
|
|
|
|
return apierrors.NewInternalError(fmt.Errorf("could not successfully update object. key=%s, err=%s", k.String(), err.Error()))
|
|
}
|
|
|
|
e, err := resourceToEntity(updatedObj, requestInfo, s.codec)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// if we have a non-nil getErr, then we've ignored a not found error
|
|
if getErr != nil {
|
|
// object does not exist, create it
|
|
req := &entityStore.CreateEntityRequest{
|
|
Entity: e,
|
|
}
|
|
|
|
rsp, err := s.store.Create(ctx, req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = entityToResource(rsp.Entity, destination, s.codec)
|
|
if err != nil {
|
|
return apierrors.NewInternalError(err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// update the existing object
|
|
req := &entityStore.UpdateEntityRequest{
|
|
Entity: e,
|
|
PreviousVersion: previousVersion,
|
|
}
|
|
|
|
rsp, err := s.store.Update(ctx, req)
|
|
if err != nil {
|
|
return err // continue???
|
|
}
|
|
|
|
if rsp.Status == entityStore.UpdateEntityResponse_UNCHANGED {
|
|
return nil // destination is already set
|
|
}
|
|
|
|
err = entityToResource(rsp.Entity, destination, s.codec)
|
|
if err != nil {
|
|
return apierrors.NewInternalError(err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Count returns number of different entries under the key (generally being path prefix).
|
|
func (s *Storage) Count(key string) (int64, error) {
|
|
return 0, nil
|
|
}
|
|
|
|
func (s *Storage) Versioner() storage.Versioner {
|
|
return &storage.APIObjectVersioner{}
|
|
}
|
|
|
|
func (s *Storage) RequestWatchProgress(ctx context.Context) error {
|
|
return nil
|
|
}
|
|
|
|
type Decoder struct {
|
|
client entityStore.EntityStore_WatchClient
|
|
newFunc func() runtime.Object
|
|
opts storage.ListOptions
|
|
codec runtime.Codec
|
|
}
|
|
|
|
func (d *Decoder) Decode() (action watch.EventType, object runtime.Object, err error) {
|
|
decode:
|
|
for {
|
|
err := d.client.Context().Err()
|
|
if err != nil {
|
|
klog.Errorf("client: context error: %s\n", err)
|
|
return watch.Error, nil, err
|
|
}
|
|
|
|
resp, err := d.client.Recv()
|
|
if errors.Is(err, io.EOF) {
|
|
return watch.Error, nil, err
|
|
}
|
|
|
|
if grpcStatus.Code(err) == grpcCodes.Canceled {
|
|
return watch.Error, nil, err
|
|
}
|
|
|
|
if err != nil {
|
|
klog.Errorf("client: error receiving result: %s", err)
|
|
return watch.Error, nil, err
|
|
}
|
|
|
|
if resp.Entity == nil {
|
|
klog.Errorf("client: received nil entity\n")
|
|
continue decode
|
|
}
|
|
|
|
obj := d.newFunc()
|
|
|
|
if resp.Entity.Action == entityStore.Entity_BOOKMARK {
|
|
// here k8s expects an empty object with just resource version and k8s.io/initial-events-end annotation
|
|
accessor, err := meta.Accessor(obj)
|
|
if err != nil {
|
|
klog.Errorf("error getting object accessor: %s", err)
|
|
return watch.Error, nil, err
|
|
}
|
|
|
|
accessor.SetResourceVersion(fmt.Sprintf("%d", resp.Entity.ResourceVersion))
|
|
accessor.SetAnnotations(map[string]string{"k8s.io/initial-events-end": "true"})
|
|
return watch.Bookmark, obj, nil
|
|
}
|
|
|
|
err = entityToResource(resp.Entity, obj, d.codec)
|
|
if err != nil {
|
|
klog.Errorf("error decoding entity: %s", err)
|
|
return watch.Error, nil, err
|
|
}
|
|
|
|
var watchAction watch.EventType
|
|
switch resp.Entity.Action {
|
|
case entityStore.Entity_CREATED:
|
|
// apply any predicates not handled in storage
|
|
matches, err := d.opts.Predicate.Matches(obj)
|
|
if err != nil {
|
|
klog.Errorf("error matching object: %s", err)
|
|
return watch.Error, nil, err
|
|
}
|
|
if !matches {
|
|
continue decode
|
|
}
|
|
|
|
watchAction = watch.Added
|
|
case entityStore.Entity_UPDATED:
|
|
watchAction = watch.Modified
|
|
|
|
// apply any predicates not handled in storage
|
|
matches, err := d.opts.Predicate.Matches(obj)
|
|
if err != nil {
|
|
klog.Errorf("error matching object: %s", err)
|
|
return watch.Error, nil, err
|
|
}
|
|
|
|
// if we have a previous object, check if it matches
|
|
prevMatches := false
|
|
prevObj := d.newFunc()
|
|
if resp.Previous != nil {
|
|
err = entityToResource(resp.Previous, prevObj, d.codec)
|
|
if err != nil {
|
|
klog.Errorf("error decoding entity: %s", err)
|
|
return watch.Error, nil, err
|
|
}
|
|
|
|
// apply any predicates not handled in storage
|
|
prevMatches, err = d.opts.Predicate.Matches(prevObj)
|
|
if err != nil {
|
|
klog.Errorf("error matching object: %s", err)
|
|
return watch.Error, nil, err
|
|
}
|
|
}
|
|
|
|
if !matches {
|
|
if !prevMatches {
|
|
continue decode
|
|
}
|
|
|
|
// if the object didn't match, send a Deleted event
|
|
watchAction = watch.Deleted
|
|
|
|
// here k8s expects the previous object but with the new resource version
|
|
obj = prevObj
|
|
|
|
accessor, err := meta.Accessor(obj)
|
|
if err != nil {
|
|
klog.Errorf("error getting object accessor: %s", err)
|
|
return watch.Error, nil, err
|
|
}
|
|
|
|
accessor.SetResourceVersion(fmt.Sprintf("%d", resp.Entity.ResourceVersion))
|
|
} else if !prevMatches {
|
|
// if the object didn't previously match, send an Added event
|
|
watchAction = watch.Added
|
|
}
|
|
case entityStore.Entity_DELETED:
|
|
watchAction = watch.Deleted
|
|
|
|
// if we have a previous object, return that in the deleted event
|
|
if resp.Previous != nil {
|
|
err = entityToResource(resp.Previous, obj, d.codec)
|
|
if err != nil {
|
|
klog.Errorf("error decoding entity: %s", err)
|
|
return watch.Error, nil, err
|
|
}
|
|
|
|
// here k8s expects the previous object but with the new resource version
|
|
accessor, err := meta.Accessor(obj)
|
|
if err != nil {
|
|
klog.Errorf("error getting object accessor: %s", err)
|
|
return watch.Error, nil, err
|
|
}
|
|
|
|
accessor.SetResourceVersion(fmt.Sprintf("%d", resp.Entity.ResourceVersion))
|
|
}
|
|
|
|
// apply any predicates not handled in storage
|
|
matches, err := d.opts.Predicate.Matches(obj)
|
|
if err != nil {
|
|
klog.Errorf("error matching object: %s", err)
|
|
return watch.Error, nil, err
|
|
}
|
|
if !matches {
|
|
continue decode
|
|
}
|
|
default:
|
|
watchAction = watch.Error
|
|
}
|
|
|
|
return watchAction, obj, nil
|
|
}
|
|
}
|
|
|
|
func (d *Decoder) Close() {
|
|
err := d.client.CloseSend()
|
|
if err != nil {
|
|
klog.Errorf("error closing watch stream: %s", err)
|
|
}
|
|
}
|
|
|
|
var _ watch.Decoder = (*Decoder)(nil)
|