grafana/pkg/services/grafana-apiserver/storage/file/file.go
2023-11-17 14:20:54 -05:00

523 lines
15 KiB
Go

// SPDX-License-Identifier: AGPL-3.0-only
// Provenance-includes-location: https://github.com/kubernetes-sigs/apiserver-runtime/blob/main/pkg/experimental/storage/filepath/jsonfile_rest.go
// Provenance-includes-license: Apache-2.0
// Provenance-includes-copyright: The Kubernetes Authors.
package file
import (
"context"
"errors"
"fmt"
"path/filepath"
"reflect"
"strings"
"time"
"github.com/bwmarrin/snowflake"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/conversion"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/storagebackend"
"k8s.io/apiserver/pkg/storage/storagebackend/factory"
"k8s.io/client-go/tools/cache"
)
// MaxUpdateAttempts is the upper bound on iterations of the retry loop in
// GuaranteedUpdate for a single call.
const MaxUpdateAttempts = 30
// Compile-time assertion that *Storage satisfies storage.Interface.
var _ storage.Interface = (*Storage)(nil)
// errResourceVersionSetOnCreate is returned by Create when the object to be
// created already carries a resourceVersion.
// Replace with: https://github.com/kubernetes/kubernetes/blob/v1.29.0-alpha.3/staging/src/k8s.io/apiserver/pkg/storage/errors.go#L28
// when we upgrade to 1.29.
var errResourceVersionSetOnCreate = errors.New("resourceVersion should not be set on objects to be created")
// Storage implements storage.Interface and stores resources as JSON files on disk.
// Instances are built by NewStorage, which fills every field below.
type Storage struct {
// root is the resourcePrefix given to NewStorage; nameFromKey strips
// root+"/" from a key to derive the name used in NotFound errors.
root string
// gr is the group/resource taken from the storage config; it is used when
// building API NotFound errors.
gr schema.GroupResource
// codec is handed to the file helpers (readFile, writeFile,
// readDirRecursive) and to isUnchanged.
codec runtime.Codec
// keyFunc is stored as supplied to NewStorage; the methods in this part of
// the file do not call it.
keyFunc func(obj runtime.Object) (string, error)
// newFunc produces the object passed to the read helpers as the decode
// target, and the fallback object in GuaranteedUpdate.
newFunc func() runtime.Object
// newListFunc produces the list object Watch fills to build its initial events.
newListFunc func() runtime.Object
// getAttrsFunc, trigger and indexers are stored as supplied to NewStorage;
// the methods in this part of the file do not read them.
getAttrsFunc storage.AttrFunc
trigger storage.IndexerFuncs
indexers *cache.Indexers
// watchSet hands out watchers (newWatch) and receives every change event
// (notifyWatchers); the destroy func returned by NewStorage cleans it up.
watchSet *WatchSet
}
// Sentinel errors describing missing on-disk state. Both are plain
// errors.New values so callers can match them with errors.Is.
var (
	// ErrFileNotExists means the file doesn't actually exist.
	ErrFileNotExists = errors.New("file doesn't exist")
	// ErrNamespaceNotExists means the directory for the namespace doesn't actually exist.
	ErrNamespaceNotExists = errors.New("namespace does not exist")
)
// resourceVersionNode is the single snowflake generator behind every
// resource version produced by this package; resourceVersionNodeErr holds the
// construction error, if any, so it can be reported on each call.
//
// The node must be shared: a snowflake node tracks the last timestamp and a
// per-millisecond sequence internally, so a node created fresh for every call
// always starts its sequence at zero and two calls within the same
// millisecond would yield the same ID.
var resourceVersionNode, resourceVersionNodeErr = snowflake.NewNode(1)

// getResourceVersion returns a newly generated resource version.
//
// It returns the node construction error when the shared generator could not
// be created; otherwise the pointer is non-nil and refers to a value owned by
// the caller.
func getResourceVersion() (*uint64, error) {
	if resourceVersionNodeErr != nil {
		return nil, resourceVersionNodeErr
	}
	resourceVersion := uint64(resourceVersionNode.Generate().Int64())
	return &resourceVersion, nil
}
// NewStorage instantiates a new Storage rooted at resourcePrefix.
//
// The directory at resourcePrefix is ensured first; if that fails the
// returned error wraps the underlying cause and the destroy func is a no-op.
// On success the returned destroy func cleans up the watchers registered with
// the storage's watch set.
//
// The group/resource and codec are taken from config; the remaining arguments
// are stored on the Storage unchanged.
func NewStorage(
	config *storagebackend.ConfigForResource,
	resourcePrefix string,
	keyFunc func(obj runtime.Object) (string, error),
	newFunc func() runtime.Object,
	newListFunc func() runtime.Object,
	getAttrsFunc storage.AttrFunc,
	trigger storage.IndexerFuncs,
	indexers *cache.Indexers,
) (storage.Interface, factory.DestroyFunc, error) {
	if err := ensureDir(resourcePrefix); err != nil {
		// Keep the cause in the chain so callers can see why the directory
		// could not be prepared (permissions, read-only filesystem, ...).
		return nil, func() {}, fmt.Errorf("could not establish a writable directory at path=%s: %w", resourcePrefix, err)
	}

	ws := NewWatchSet()
	s := &Storage{
		root:         resourcePrefix,
		gr:           config.GroupResource,
		codec:        config.Codec,
		keyFunc:      keyFunc,
		newFunc:      newFunc,
		newListFunc:  newListFunc,
		getAttrsFunc: getAttrsFunc,
		trigger:      trigger,
		indexers:     indexers,
		watchSet:     ws,
	}
	destroy := func() {
		ws.cleanupWatchers()
	}
	return s, destroy, nil
}
// Versioner returns the Versioner associated with this storage. Every call
// hands back a newly allocated storage.APIObjectVersioner.
func (s *Storage) Versioner() storage.Versioner {
	return new(storage.APIObjectVersioner)
}
// Create adds a new object at a key unless it already exists. 'ttl' is time-to-live
// in seconds (0 means forever). If no error is returned and out is not nil, out will be
// set to the read value from database.
//
// The object is validated before anything is written: an object that already
// carries a resourceVersion is rejected with errResourceVersionSetOnCreate and
// leaves neither the filesystem nor obj modified. On success obj has its
// selfLink cleared and a freshly generated resourceVersion applied, the file
// is written, and an Added event carrying the stored object is sent to all
// watchers.
//
// Returns a key-exists storage error when a file for key is already present,
// or any error from preparing the directory, generating the version, writing
// or reading back the file.
func (s *Storage) Create(ctx context.Context, key string, obj runtime.Object, out runtime.Object, ttl uint64) error {
	// Validate the input first so a rejected request has no side effects.
	metaObj, err := meta.Accessor(obj)
	if err != nil {
		return err
	}
	if metaObj.GetResourceVersion() != "" {
		return errResourceVersionSetOnCreate
	}
	metaObj.SetSelfLink("")

	filename := s.filePath(key)
	if exists(filename) {
		return storage.NewKeyExistsError(key, 0)
	}
	if err := ensureDir(filepath.Dir(filename)); err != nil {
		return err
	}

	generatedRV, err := getResourceVersion()
	if err != nil {
		return err
	}
	if err := s.Versioner().UpdateObject(obj, *generatedRV); err != nil {
		return err
	}
	if err := writeFile(s.codec, filename, obj); err != nil {
		return err
	}

	// Schedule removal of the file once ttl seconds have elapsed.
	if ttl > 0 {
		time.AfterFunc(time.Duration(ttl)*time.Second, func() {
			// The request context is finished long before the timer fires,
			// so the expiry runs on its own background context.
			//
			// Expiry is best effort: the object may already have been deleted
			// by then, and there is no caller to report a failure to. A panic
			// here would take down the whole process, so the error is dropped.
			_ = s.Delete(
				context.Background(),
				key,
				s.newFunc(),
				&storage.Preconditions{},
				func(context.Context, runtime.Object) error { return nil },
				obj,
			)
		})
	}

	// out is optional; read into a scratch object when the caller passed nil
	// so watchers still receive the stored state.
	stored := out
	if stored == nil {
		stored = s.newFunc()
	}
	if err := s.Get(ctx, key, storage.GetOptions{}, stored); err != nil {
		return err
	}

	s.watchSet.notifyWatchers(watch.Event{
		Object: stored.DeepCopyObject(),
		Type:   watch.Added,
	})
	return nil
}
// Delete removes the specified key and returns the value that existed at that spot.
// If key didn't exist, it will return NotFound storage error.
//
// The stored object is always read fresh from disk into out, and both the
// preconditions and validateDeletion are evaluated against that object.
// 'cachedExistingObject' is only a hint in the storage.Interface contract; it
// is not consulted here because the file has to be read anyway to populate
// out, which also means there is no stale-suggestion case to retry.
//
// On success out carries the deleted object with a freshly generated
// resourceVersion, the file is removed, and a Deleted event with a copy of out
// is sent to all watchers.
//
// Returns the error from Get when the key cannot be read, the precondition or
// validation error when the deletion is rejected, or any error from generating
// the version or removing the file.
func (s *Storage) Delete(
	ctx context.Context,
	key string,
	out runtime.Object,
	preconditions *storage.Preconditions,
	validateDeletion storage.ValidateObjectFunc,
	cachedExistingObject runtime.Object,
) error {
	filename := s.filePath(key)

	// Load the current state; everything below must judge the real object,
	// not an empty placeholder.
	if err := s.Get(ctx, key, storage.GetOptions{}, out); err != nil {
		return err
	}

	if preconditions != nil {
		if err := preconditions.Check(key, out); err != nil {
			return err
		}
	}
	if err := validateDeletion(ctx, out); err != nil {
		return err
	}

	// The returned object is stamped with a new version so the Deleted event
	// is ordered after the last stored state.
	generatedRV, err := getResourceVersion()
	if err != nil {
		return err
	}
	if err := s.Versioner().UpdateObject(out, *generatedRV); err != nil {
		return err
	}

	if err := deleteFile(filename); err != nil {
		return err
	}

	s.watchSet.notifyWatchers(watch.Event{
		Object: out.DeepCopyObject(),
		Type:   watch.Deleted,
	})
	return nil
}
// Watch begins watching the specified key. Events are decoded into API objects,
// and any items selected by opts.Predicate are sent down to returned watch.Interface.
// resourceVersion may be used to specify what version to begin watching,
// which should be the current resourceVersion, and no longer rv+1
// (e.g. reconnecting without missing any updates).
// If resource version is "0", the current objects under key are listed and
// delivered as "ADDED" events before the watch starts.
//
// Returns an error when the initial list cannot be read or its items cannot
// be extracted as runtime objects.
func (s *Storage) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
	// NOTE(review): the watcher is obtained before the fallible list step and
	// is not released on the error returns below — confirm whether WatchSet
	// needs an explicit cleanup for a watcher that was never started.
	jw := s.watchSet.newWatch()

	listObj := s.newListFunc()
	if opts.ResourceVersion == "0" {
		if err := s.GetList(ctx, key, opts, listObj); err != nil {
			return nil, err
		}
	}

	// ExtractList handles typed item slices (e.g. []Foo) by taking the address
	// of each element; reflecting on the slice by hand and asserting it to
	// []runtime.Object does not work for such lists.
	items, err := meta.ExtractList(listObj)
	if err != nil {
		return nil, err
	}

	initEvents := make([]watch.Event, 0, len(items))
	for _, item := range items {
		initEvents = append(initEvents, watch.Event{
			Type:   watch.Added,
			Object: item.DeepCopyObject(),
		})
	}

	jw.Start(opts.Predicate, initEvents)
	return jw, nil
}
// Get reads the file for key and decodes it into objPtr.
//
// Any failure to read the file is treated as "not found": with
// opts.IgnoreNotFound set, objPtr is reset to its zero value; otherwise a
// key-not-found storage error is returned, carrying the resource version
// parsed from opts.ResourceVersion (a parse failure is returned as is).
//
// When the read succeeds, the object's resource version is checked against
// opts.ResourceVersion as a minimum and the outcome of that check is returned.
func (s *Storage) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error {
	path := s.filePath(key)
	into := func() runtime.Object { return objPtr }

	stored, readErr := readFile(s.codec, path, into)
	if readErr == nil {
		storedRV, err := s.Versioner().ObjectResourceVersion(stored)
		if err != nil {
			return err
		}
		return s.validateMinimumResourceVersion(opts.ResourceVersion, storedRV)
	}

	// Read failed: report it the way the caller asked for.
	if opts.IgnoreNotFound {
		return runtime.SetZeroValue(objPtr)
	}
	requestedRV, err := s.Versioner().ParseResourceVersion(opts.ResourceVersion)
	if err != nil {
		return err
	}
	return storage.NewKeyNotFoundError(key, int64(requestedRV))
}
// GetList fills listObj (an object that satisfies the runtime.IsList
// definition) with the objects read recursively from under key.
//
// The list's resource version is first set to a freshly generated value and,
// when opts.ResourceVersion is non-empty, overwritten with that requested
// version. Each object read is then appended to the list's items if its own
// resource version satisfies opts.ResourceVersion as a minimum and
// opts.Predicate matches it; objects failing either test, or for which the
// predicate returns an error, are left out.
//
// Returns an error when a version cannot be generated or parsed, the
// directory cannot be read, listObj has no usable items pointer, or an
// object's resource version cannot be determined.
func (s *Storage) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
	versioner := s.Versioner()

	freshRV, err := getResourceVersion()
	if err != nil {
		return err
	}

	var remaining int64
	if err := versioner.UpdateList(listObj, *freshRV, "", &remaining); err != nil {
		return err
	}

	if requested := opts.ResourceVersion; requested != "" {
		parsedRV, err := versioner.ParseResourceVersion(requested)
		if err != nil {
			return err
		}
		if err := versioner.UpdateList(listObj, parsedRV, "", &remaining); err != nil {
			return err
		}
	}

	stored, err := readDirRecursive(s.codec, key, s.newFunc)
	if err != nil {
		return err
	}

	itemsPtr, err := meta.GetItemsPtr(listObj)
	if err != nil {
		return err
	}
	items, err := conversion.EnforcePtr(itemsPtr)
	if err != nil {
		return err
	}

	for _, candidate := range stored {
		candidateRV, err := versioner.ObjectResourceVersion(candidate)
		if err != nil {
			return err
		}
		// Objects older than the requested minimum are skipped, not reported.
		if s.validateMinimumResourceVersion(opts.ResourceVersion, candidateRV) != nil {
			continue
		}
		matched, err := opts.Predicate.Matches(candidate)
		if err != nil || !matched {
			continue
		}
		items.Set(reflect.Append(items, reflect.ValueOf(candidate).Elem()))
	}
	return nil
}
// GuaranteedUpdate keeps calling 'tryUpdate()' to update key 'key' (of type 'destination')
// until it succeeds or MaxUpdateAttempts attempts have been made.
// Note that object passed to tryUpdate may change across invocations of tryUpdate() if
// other writers are simultaneously updating it, so tryUpdate() needs to take into account
// the current contents of the object when deciding how the update object should look.
//
// If the key doesn't exist, a NotFound API error is returned when
// ignoreNotFound=false; otherwise tryUpdate receives a new empty object and a
// successful write is announced as an Added event instead of Modified.
//
// If the successful invocation of `tryUpdate` returns an output with the same serialized
// contents as the input, no write is performed and `destination` is set to that object.
// Otherwise the output gets a freshly generated resourceVersion, is written to
// disk, copied into `destination`, and announced to watchers exactly once.
//
// A failing precondition or tryUpdate is retried after re-reading the file;
// the error of the final attempt is returned (precondition errors wrapped
// with "precondition failed"). Errors from comparing, versioning or writing
// the object are returned immediately.
//
// 'cachedExistingObject' is accepted for interface compatibility and is not
// used; the current state is always read from disk.
func (s *Storage) GuaranteedUpdate(
	ctx context.Context,
	key string,
	destination runtime.Object,
	ignoreNotFound bool,
	preconditions *storage.Preconditions,
	tryUpdate storage.UpdateFunc,
	cachedExistingObject runtime.Object,
) error {
	// setDestination copies the final object into the caller's destination.
	setDestination := func(final runtime.Object) error {
		u, err := conversion.EnforcePtr(final)
		if err != nil {
			return fmt.Errorf("unable to enforce updated object pointer: %w", err)
		}
		d, err := conversion.EnforcePtr(destination)
		if err != nil {
			return fmt.Errorf("unable to enforce destination pointer: %w", err)
		}
		d.Set(u)
		return nil
	}

	var res storage.ResponseMeta
	filename := s.filePath(key)

	for attempt := 1; attempt <= MaxUpdateAttempts; attempt++ {
		var (
			obj     runtime.Object
			err     error
			created bool
		)

		if !exists(filename) && !ignoreNotFound {
			return apierrors.NewNotFound(s.gr, s.nameFromKey(key))
		}

		obj, err = readFile(s.codec, filename, s.newFunc)
		if err != nil {
			// fallback to new object if the file is not found
			obj = s.newFunc()
			created = true
		}

		if err := preconditions.Check(key, obj); err != nil {
			if attempt >= MaxUpdateAttempts {
				return fmt.Errorf("precondition failed: %w", err)
			}
			continue
		}

		updatedObj, _, err := tryUpdate(obj, res)
		if err != nil {
			if attempt >= MaxUpdateAttempts {
				return err
			}
			continue
		}

		unchanged, err := isUnchanged(s.codec, obj, updatedObj)
		if err != nil {
			return err
		}
		if unchanged {
			return setDestination(updatedObj)
		}

		generatedRV, err := getResourceVersion()
		if err != nil {
			return err
		}
		if err := s.Versioner().UpdateObject(updatedObj, *generatedRV); err != nil {
			return err
		}
		if err := writeFile(s.codec, filename, updatedObj); err != nil {
			return err
		}

		// The write succeeded: hand the result back and stop. Looping again
		// would re-run tryUpdate against the object just written, which can
		// rewrite the file and re-notify watchers, or fail with a spurious
		// conflict after the update has already been persisted.
		if err := setDestination(updatedObj); err != nil {
			return err
		}

		eventType := watch.Modified
		if created {
			eventType = watch.Added
		}
		s.watchSet.notifyWatchers(watch.Event{
			Object: updatedObj.DeepCopyObject(),
			Type:   eventType,
		})
		return nil
	}

	// Every path through the final attempt returns above; this only guards
	// against a non-positive MaxUpdateAttempts.
	return fmt.Errorf("update of %q did not complete within %d attempts", key, MaxUpdateAttempts)
}
// Count is meant to return the number of different entries under the key
// (generally being path prefix). This implementation does not inspect the
// filesystem: key is ignored and the result is always 0 with a nil error.
func (s *Storage) Count(key string) (int64, error) {
return 0, nil
}
// RequestWatchProgress requests that a watch stream progress status be sent in the
// watch response stream as soon as possible.
// Used to monitor watch progress even if watching resources with no changes.
//
// If watch is lagging, progress status might:
// * be pointing to stale resource version. Use etcd KV request to get linearizable resource version.
// * not be delivered at all. It's recommended to poll request progress periodically.
//
// Note: Only watches with matching context grpc metadata will be notified.
// https://github.com/kubernetes/kubernetes/blob/9325a57125e8502941d1b0c7379c4bb80a678d5c/vendor/go.etcd.io/etcd/client/v3/watch.go#L1037-L1042
//
// The description above is the storage.Interface contract; this file-backed
// implementation is a no-op that ignores ctx and always returns nil.
//
// TODO: Remove when storage.Interface will be separate from etcd3.store.
// Deprecated: Added temporarily to simplify exposing RequestProgress for watch cache.
func (s *Storage) RequestWatchProgress(ctx context.Context) error {
return nil
}
// validateMinimumResourceVersion checks that actualRevision is at least the
// version requested in minimumResourceVersion.
//
// An empty minimumResourceVersion means "no requirement" and always passes.
// A value that cannot be parsed yields a BadRequest API error; a parsed value
// greater than actualRevision yields a "too large resource version" storage
// error, upholding the storage.Interface guarantee that returned data is at
// least as new as the requested resourceVersion.
func (s *Storage) validateMinimumResourceVersion(minimumResourceVersion string, actualRevision uint64) error {
	if minimumResourceVersion == "" {
		return nil
	}

	requested, parseErr := s.Versioner().ParseResourceVersion(minimumResourceVersion)
	switch {
	case parseErr != nil:
		return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", parseErr))
	case requested > actualRevision:
		return storage.NewTooLargeResourceVersionError(requested, actualRevision, 0)
	default:
		return nil
	}
}
// nameFromKey derives an object name from a storage key by removing the first
// occurrence of the storage root followed by "/".
func (s *Storage) nameFromKey(key string) string {
	rootPrefix := s.root + "/"
	const firstOnly = 1
	return strings.Replace(key, rootPrefix, "", firstOnly)
}