mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
K8s: file storage - add sync around resource version (RV) management (#84694)
This commit is contained in:
parent
d084595211
commit
3c42a2efd2
5
Makefile
5
Makefile
@ -174,11 +174,12 @@ run-frontend: deps-js ## Fetch js dependencies and watch frontend for rebuild
|
|||||||
.PHONY: test-go
|
.PHONY: test-go
|
||||||
test-go: test-go-unit test-go-integration
|
test-go: test-go-unit test-go-integration
|
||||||
|
|
||||||
|
### TODO: temporarily run only the failing test (fails in PR only)
|
||||||
.PHONY: test-go-unit
|
.PHONY: test-go-unit
|
||||||
test-go-unit: ## Run unit tests for backend with flags.
|
test-go-unit: ## Run unit tests for backend with flags.
|
||||||
@echo "test backend unit tests"
|
@echo "test backend unit tests"
|
||||||
go list -f '{{.Dir}}/...' -m | xargs \
|
go list -f '{{.Dir}}/...' ./pkg/apiserver/storage/file | xargs \
|
||||||
$(GO) test -short -covermode=atomic -timeout=30m
|
$(GO) test -short -covermode=atomic -timeout=30m
|
||||||
|
|
||||||
.PHONY: test-go-integration
|
.PHONY: test-go-integration
|
||||||
test-go-integration: ## Run integration tests for backend with flags.
|
test-go-integration: ## Run integration tests for backend with flags.
|
||||||
|
@ -26,6 +26,7 @@ import (
|
|||||||
"k8s.io/apiserver/pkg/storage/storagebackend"
|
"k8s.io/apiserver/pkg/storage/storagebackend"
|
||||||
"k8s.io/apiserver/pkg/storage/storagebackend/factory"
|
"k8s.io/apiserver/pkg/storage/storagebackend/factory"
|
||||||
"k8s.io/client-go/tools/cache"
|
"k8s.io/client-go/tools/cache"
|
||||||
|
"k8s.io/klog/v2"
|
||||||
)
|
)
|
||||||
|
|
||||||
const MaxUpdateAttempts = 30
|
const MaxUpdateAttempts = 30
|
||||||
@ -36,6 +37,13 @@ var _ storage.Interface = (*Storage)(nil)
|
|||||||
// When we upgrade to 1.29
|
// When we upgrade to 1.29
|
||||||
var errResourceVersionSetOnCreate = errors.New("resourceVersion should not be set on objects to be created")
|
var errResourceVersionSetOnCreate = errors.New("resourceVersion should not be set on objects to be created")
|
||||||
|
|
||||||
|
type parsedKey struct {
|
||||||
|
group string
|
||||||
|
resource string
|
||||||
|
namespace string
|
||||||
|
name string
|
||||||
|
}
|
||||||
|
|
||||||
// Storage implements storage.Interface and storage resources as JSON files on disk.
|
// Storage implements storage.Interface and storage resources as JSON files on disk.
|
||||||
type Storage struct {
|
type Storage struct {
|
||||||
root string
|
root string
|
||||||
@ -49,7 +57,14 @@ type Storage struct {
|
|||||||
trigger storage.IndexerFuncs
|
trigger storage.IndexerFuncs
|
||||||
indexers *cache.Indexers
|
indexers *cache.Indexers
|
||||||
|
|
||||||
watchSet *WatchSet
|
rvGenerationNode *snowflake.Node
|
||||||
|
// rvMutex provides synchronization between Get and GetList+Watch+CUD methods
|
||||||
|
// with access to resource version generation for the latter group
|
||||||
|
rvMutex sync.RWMutex
|
||||||
|
currentRV uint64
|
||||||
|
|
||||||
|
watchSet *WatchSet
|
||||||
|
versioner storage.Versioner
|
||||||
}
|
}
|
||||||
|
|
||||||
// ErrFileNotExists means the file doesn't actually exist.
|
// ErrFileNotExists means the file doesn't actually exist.
|
||||||
@ -58,25 +73,6 @@ var ErrFileNotExists = fmt.Errorf("file doesn't exist")
|
|||||||
// ErrNamespaceNotExists means the directory for the namespace doesn't actually exist.
|
// ErrNamespaceNotExists means the directory for the namespace doesn't actually exist.
|
||||||
var ErrNamespaceNotExists = errors.New("namespace does not exist")
|
var ErrNamespaceNotExists = errors.New("namespace does not exist")
|
||||||
|
|
||||||
var (
|
|
||||||
node *snowflake.Node
|
|
||||||
once sync.Once
|
|
||||||
)
|
|
||||||
|
|
||||||
func getResourceVersion() (*uint64, error) {
|
|
||||||
var err error
|
|
||||||
once.Do(func() {
|
|
||||||
node, err = snowflake.NewNode(1)
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
snowflakeNumber := node.Generate().Int64()
|
|
||||||
resourceVersion := uint64(snowflakeNumber)
|
|
||||||
return &resourceVersion, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewStorage instantiates a new Storage.
|
// NewStorage instantiates a new Storage.
|
||||||
func NewStorage(
|
func NewStorage(
|
||||||
config *storagebackend.ConfigForResource,
|
config *storagebackend.ConfigForResource,
|
||||||
@ -92,34 +88,59 @@ func NewStorage(
|
|||||||
if err := ensureDir(root); err != nil {
|
if err := ensureDir(root); err != nil {
|
||||||
return nil, func() {}, fmt.Errorf("could not establish a writable directory at path=%s", root)
|
return nil, func() {}, fmt.Errorf("could not establish a writable directory at path=%s", root)
|
||||||
}
|
}
|
||||||
ws := NewWatchSet()
|
|
||||||
return &Storage{
|
|
||||||
root: root,
|
|
||||||
resourcePrefix: resourcePrefix,
|
|
||||||
gr: config.GroupResource,
|
|
||||||
codec: config.Codec,
|
|
||||||
keyFunc: keyFunc,
|
|
||||||
newFunc: newFunc,
|
|
||||||
newListFunc: newListFunc,
|
|
||||||
getAttrsFunc: getAttrsFunc,
|
|
||||||
trigger: trigger,
|
|
||||||
indexers: indexers,
|
|
||||||
|
|
||||||
watchSet: ws,
|
rvGenerationNode, err := snowflake.NewNode(1)
|
||||||
}, func() {
|
if err != nil {
|
||||||
ws.cleanupWatchers()
|
return nil, nil, err
|
||||||
}, nil
|
}
|
||||||
|
|
||||||
|
s := &Storage{
|
||||||
|
root: root,
|
||||||
|
resourcePrefix: resourcePrefix,
|
||||||
|
gr: config.GroupResource,
|
||||||
|
codec: config.Codec,
|
||||||
|
keyFunc: keyFunc,
|
||||||
|
newFunc: newFunc,
|
||||||
|
newListFunc: newListFunc,
|
||||||
|
getAttrsFunc: getAttrsFunc,
|
||||||
|
trigger: trigger,
|
||||||
|
indexers: indexers,
|
||||||
|
|
||||||
|
rvGenerationNode: rvGenerationNode,
|
||||||
|
watchSet: NewWatchSet(),
|
||||||
|
|
||||||
|
versioner: &storage.APIObjectVersioner{},
|
||||||
|
}
|
||||||
|
|
||||||
|
// Initialize the RV stored in storage
|
||||||
|
s.getNewResourceVersion()
|
||||||
|
|
||||||
|
return s, func() {
|
||||||
|
s.watchSet.cleanupWatchers()
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Storage) getNewResourceVersion() uint64 {
|
||||||
|
snowflakeNumber := s.rvGenerationNode.Generate().Int64()
|
||||||
|
s.currentRV = uint64(snowflakeNumber)
|
||||||
|
return s.currentRV
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Storage) getCurrentResourceVersion() uint64 {
|
||||||
|
return s.currentRV
|
||||||
}
|
}
|
||||||
|
|
||||||
// Returns Versioner associated with this storage.
|
|
||||||
func (s *Storage) Versioner() storage.Versioner {
|
func (s *Storage) Versioner() storage.Versioner {
|
||||||
return &storage.APIObjectVersioner{}
|
return s.versioner
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create adds a new object at a key unless it already exists. 'ttl' is time-to-live
|
// Create adds a new object at a key unless it already exists. 'ttl' is time-to-live
|
||||||
// in seconds (0 means forever). If no error is returned and out is not nil, out will be
|
// in seconds (0 means forever). If no error is returned and out is not nil, out will be
|
||||||
// set to the read value from database.
|
// set to the read value from database.
|
||||||
func (s *Storage) Create(ctx context.Context, key string, obj runtime.Object, out runtime.Object, ttl uint64) error {
|
func (s *Storage) Create(ctx context.Context, key string, obj runtime.Object, out runtime.Object, ttl uint64) error {
|
||||||
|
s.rvMutex.Lock()
|
||||||
|
defer s.rvMutex.Unlock()
|
||||||
|
|
||||||
fpath := s.filePath(key)
|
fpath := s.filePath(key)
|
||||||
if exists(fpath) {
|
if exists(fpath) {
|
||||||
return storage.NewKeyExistsError(key, 0)
|
return storage.NewKeyExistsError(key, 0)
|
||||||
@ -130,10 +151,7 @@ func (s *Storage) Create(ctx context.Context, key string, obj runtime.Object, ou
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
generatedRV, err := getResourceVersion()
|
generatedRV := s.getNewResourceVersion()
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
metaObj, err := meta.Accessor(obj)
|
metaObj, err := meta.Accessor(obj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -144,7 +162,7 @@ func (s *Storage) Create(ctx context.Context, key string, obj runtime.Object, ou
|
|||||||
return errResourceVersionSetOnCreate
|
return errResourceVersionSetOnCreate
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := s.Versioner().UpdateObject(obj, *generatedRV); err != nil {
|
if err := s.versioner.UpdateObject(obj, generatedRV); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -168,7 +186,7 @@ func (s *Storage) Create(ctx context.Context, key string, obj runtime.Object, ou
|
|||||||
s.watchSet.notifyWatchers(watch.Event{
|
s.watchSet.notifyWatchers(watch.Event{
|
||||||
Object: out.DeepCopyObject(),
|
Object: out.DeepCopyObject(),
|
||||||
Type: watch.Added,
|
Type: watch.Added,
|
||||||
})
|
}, nil)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -186,6 +204,11 @@ func (s *Storage) Delete(
|
|||||||
validateDeletion storage.ValidateObjectFunc,
|
validateDeletion storage.ValidateObjectFunc,
|
||||||
cachedExistingObject runtime.Object,
|
cachedExistingObject runtime.Object,
|
||||||
) error {
|
) error {
|
||||||
|
// TODO: is it gonna be contentious
|
||||||
|
// Either way, this should have max attempts logic
|
||||||
|
s.rvMutex.Lock()
|
||||||
|
defer s.rvMutex.Unlock()
|
||||||
|
|
||||||
fpath := s.filePath(key)
|
fpath := s.filePath(key)
|
||||||
var currentState runtime.Object
|
var currentState runtime.Object
|
||||||
var stateIsCurrent bool
|
var stateIsCurrent bool
|
||||||
@ -233,11 +256,8 @@ func (s *Storage) Delete(
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
generatedRV, err := getResourceVersion()
|
generatedRV := s.getNewResourceVersion()
|
||||||
if err != nil {
|
if err := s.versioner.UpdateObject(out, generatedRV); err != nil {
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err := s.Versioner().UpdateObject(out, *generatedRV); err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -248,14 +268,14 @@ func (s *Storage) Delete(
|
|||||||
s.watchSet.notifyWatchers(watch.Event{
|
s.watchSet.notifyWatchers(watch.Event{
|
||||||
Object: out.DeepCopyObject(),
|
Object: out.DeepCopyObject(),
|
||||||
Type: watch.Deleted,
|
Type: watch.Deleted,
|
||||||
})
|
}, nil)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Watch begins watching the specified key. Events are decoded into API objects,
|
// Watch begins watching the specified key. Events are decoded into API objects,
|
||||||
// and any items selected by 'p' are sent down to returned watch.Interface.
|
// and any items selected by the predicate are sent down to returned watch.Interface.
|
||||||
// resourceVersion may be used to specify what version to begin watching,
|
// resourceVersion may be used to specify what version to begin watching,
|
||||||
// which should be the current resourceVersion, and no longer rv+1
|
// which should be the current resourceVersion, and no longer rv+1
|
||||||
// (e.g. reconnecting without missing any updates).
|
// (e.g. reconnecting without missing any updates).
|
||||||
@ -263,39 +283,102 @@ func (s *Storage) Delete(
|
|||||||
// and send it in an "ADDED" event, before watch starts.
|
// and send it in an "ADDED" event, before watch starts.
|
||||||
func (s *Storage) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
|
func (s *Storage) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
|
||||||
p := opts.Predicate
|
p := opts.Predicate
|
||||||
jw := s.watchSet.newWatch()
|
|
||||||
|
|
||||||
listObj := s.newListFunc()
|
listObj := s.newListFunc()
|
||||||
|
|
||||||
if opts.ResourceVersion == "0" {
|
// Parses to 0 for opts.ResourceVersion == 0
|
||||||
err := s.GetList(ctx, key, opts, listObj)
|
requestedRV, err := s.versioner.ParseResourceVersion(opts.ResourceVersion)
|
||||||
|
if err != nil {
|
||||||
|
return nil, apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err))
|
||||||
|
}
|
||||||
|
|
||||||
|
parsedkey, err := s.convertToParsedKey(key, p)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var namespace *string
|
||||||
|
if parsedkey.namespace != "" {
|
||||||
|
namespace = &parsedkey.namespace
|
||||||
|
}
|
||||||
|
|
||||||
|
if (opts.SendInitialEvents == nil && requestedRV == 0) || (opts.SendInitialEvents != nil && *opts.SendInitialEvents) {
|
||||||
|
if err := s.GetList(ctx, key, opts, listObj); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
listAccessor, err := meta.ListAccessor(listObj)
|
||||||
|
if err != nil {
|
||||||
|
klog.Errorf("could not determine new list accessor in watch")
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
// Updated if requesting RV was either "0" or ""
|
||||||
|
maybeUpdatedRV, err := s.versioner.ParseResourceVersion(listAccessor.GetResourceVersion())
|
||||||
|
if err != nil {
|
||||||
|
klog.Errorf("could not determine new list RV in watch")
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
jw := s.watchSet.newWatch(ctx, maybeUpdatedRV, p, s.versioner, namespace)
|
||||||
|
|
||||||
|
initEvents := make([]watch.Event, 0)
|
||||||
|
listPtr, err := meta.GetItemsPtr(listObj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
v, err := conversion.EnforcePtr(listPtr)
|
||||||
|
if err != nil || v.Kind() != reflect.Slice {
|
||||||
|
return nil, fmt.Errorf("need pointer to slice: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
initEvents := make([]watch.Event, 0)
|
for i := 0; i < v.Len(); i++ {
|
||||||
listPtr, err := meta.GetItemsPtr(listObj)
|
obj, ok := v.Index(i).Addr().Interface().(runtime.Object)
|
||||||
if err != nil {
|
if !ok {
|
||||||
return nil, err
|
return nil, fmt.Errorf("need item to be a runtime.Object: %v", err)
|
||||||
}
|
}
|
||||||
v, err := conversion.EnforcePtr(listPtr)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if v.IsNil() {
|
initEvents = append(initEvents, watch.Event{
|
||||||
jw.Start(p, initEvents)
|
Type: watch.Added,
|
||||||
|
Object: obj.DeepCopyObject(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
if p.AllowWatchBookmarks && len(initEvents) > 0 {
|
||||||
|
lastInitEvent := initEvents[len(initEvents)-1]
|
||||||
|
lastItemRV, err := s.versioner.ObjectResourceVersion(lastInitEvent.Object)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("could not get last init event's revision for bookmark: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
bookmarkEvent := watch.Event{
|
||||||
|
Type: watch.Bookmark,
|
||||||
|
Object: s.newFunc(),
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := s.versioner.UpdateObject(bookmarkEvent.Object, lastItemRV); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
bookmarkObject, err := meta.Accessor(bookmarkEvent.Object)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("could not get bookmark object's acccesor: %v", err)
|
||||||
|
}
|
||||||
|
bookmarkObject.SetAnnotations(map[string]string{"k8s.io/initial-events-end": "true"})
|
||||||
|
initEvents = append(initEvents, bookmarkEvent)
|
||||||
|
}
|
||||||
|
|
||||||
|
jw.Start(initEvents...)
|
||||||
return jw, nil
|
return jw, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, obj := range v.Elem().Interface().([]runtime.Object) {
|
maybeUpdatedRV := requestedRV
|
||||||
initEvents = append(initEvents, watch.Event{
|
if maybeUpdatedRV == 0 {
|
||||||
Type: watch.Added,
|
s.rvMutex.RLock()
|
||||||
Object: obj.DeepCopyObject(),
|
maybeUpdatedRV = s.getCurrentResourceVersion()
|
||||||
})
|
s.rvMutex.RUnlock()
|
||||||
}
|
}
|
||||||
jw.Start(p, initEvents)
|
jw := s.watchSet.newWatch(ctx, maybeUpdatedRV, p, s.versioner, namespace)
|
||||||
|
|
||||||
|
jw.Start()
|
||||||
return jw, nil
|
return jw, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -305,6 +388,7 @@ func (s *Storage) Watch(ctx context.Context, key string, opts storage.ListOption
|
|||||||
// The returned contents may be delayed, but it is guaranteed that they will
|
// The returned contents may be delayed, but it is guaranteed that they will
|
||||||
// match 'opts.ResourceVersion' according 'opts.ResourceVersionMatch'.
|
// match 'opts.ResourceVersion' according 'opts.ResourceVersionMatch'.
|
||||||
func (s *Storage) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error {
|
func (s *Storage) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error {
|
||||||
|
// No RV generation locking in single item get since its read from the disk
|
||||||
fpath := s.filePath(key)
|
fpath := s.filePath(key)
|
||||||
|
|
||||||
// Since it's a get, check if the dir exists and return early as needed
|
// Since it's a get, check if the dir exists and return early as needed
|
||||||
@ -320,14 +404,14 @@ func (s *Storage) Get(ctx context.Context, key string, opts storage.GetOptions,
|
|||||||
if opts.IgnoreNotFound {
|
if opts.IgnoreNotFound {
|
||||||
return runtime.SetZeroValue(objPtr)
|
return runtime.SetZeroValue(objPtr)
|
||||||
}
|
}
|
||||||
rv, err := s.Versioner().ParseResourceVersion(opts.ResourceVersion)
|
rv, err := s.versioner.ParseResourceVersion(opts.ResourceVersion)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return storage.NewKeyNotFoundError(key, int64(rv))
|
return storage.NewKeyNotFoundError(key, int64(rv))
|
||||||
}
|
}
|
||||||
|
|
||||||
currentVersion, err := s.Versioner().ObjectResourceVersion(obj)
|
currentVersion, err := s.versioner.ObjectResourceVersion(obj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -346,38 +430,53 @@ func (s *Storage) Get(ctx context.Context, key string, opts storage.GetOptions,
|
|||||||
// The returned contents may be delayed, but it is guaranteed that they will
|
// The returned contents may be delayed, but it is guaranteed that they will
|
||||||
// match 'opts.ResourceVersion' according 'opts.ResourceVersionMatch'.
|
// match 'opts.ResourceVersion' according 'opts.ResourceVersionMatch'.
|
||||||
func (s *Storage) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
|
func (s *Storage) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
|
||||||
generatedRV, err := getResourceVersion()
|
remainingItems := int64(0)
|
||||||
|
|
||||||
|
resourceVersionInt, err := s.versioner.ParseResourceVersion(opts.ResourceVersion)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
remainingItems := int64(0)
|
|
||||||
if err := s.Versioner().UpdateList(listObj, *generatedRV, "", &remainingItems); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Watch is failing when set the list resourceVersion to 0, even though informers provide that in the opts
|
// read state protected by mutex
|
||||||
if opts.ResourceVersion == "0" {
|
objs, err := func() ([]runtime.Object, error) {
|
||||||
opts.ResourceVersion = ""
|
s.rvMutex.Lock()
|
||||||
}
|
defer s.rvMutex.Unlock()
|
||||||
|
|
||||||
if opts.ResourceVersion != "" {
|
if resourceVersionInt == 0 {
|
||||||
resourceVersionInt, err := s.Versioner().ParseResourceVersion(opts.ResourceVersion)
|
resourceVersionInt = s.getNewResourceVersion()
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
if err := s.Versioner().UpdateList(listObj, resourceVersionInt, "", &remainingItems); err != nil {
|
|
||||||
return err
|
var fpath string
|
||||||
|
dirpath := s.dirPath(key)
|
||||||
|
// Since it's a get, check if the dir exists and return early as needed
|
||||||
|
if !exists(dirpath) {
|
||||||
|
// We may have gotten the key targeting an individual item
|
||||||
|
dirpath = filepath.Dir(dirpath)
|
||||||
|
if !exists(dirpath) {
|
||||||
|
// ensure we return empty list in listObj instead of a not found error
|
||||||
|
return []runtime.Object{}, nil
|
||||||
|
}
|
||||||
|
fpath = s.filePath(key)
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
dirpath := s.dirPath(key)
|
var objs []runtime.Object
|
||||||
// Since it's a get, check if the dir exists and return early as needed
|
if fpath != "" {
|
||||||
if !exists(dirpath) {
|
obj, err := readFile(s.codec, fpath, func() runtime.Object {
|
||||||
// ensure we return empty list in listObj insted of a not found error
|
return s.newFunc()
|
||||||
return nil
|
})
|
||||||
}
|
if err == nil {
|
||||||
|
objs = append(objs, obj)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
var err error
|
||||||
|
objs, err = readDirRecursive(s.codec, dirpath, s.newFunc)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return objs, nil
|
||||||
|
}()
|
||||||
|
|
||||||
objs, err := readDirRecursive(s.codec, dirpath, s.newFunc)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -392,21 +491,27 @@ func (s *Storage) GetList(ctx context.Context, key string, opts storage.ListOpti
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, obj := range objs {
|
for _, obj := range objs {
|
||||||
currentVersion, err := s.Versioner().ObjectResourceVersion(obj)
|
currentVersion, err := s.versioner.ObjectResourceVersion(obj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = s.validateMinimumResourceVersion(opts.ResourceVersion, currentVersion); err != nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
ok, err := opts.Predicate.Matches(obj)
|
ok, err := opts.Predicate.Matches(obj)
|
||||||
if err == nil && ok {
|
if err == nil && ok {
|
||||||
v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
|
v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
|
||||||
|
|
||||||
|
if err := s.validateMinimumResourceVersion(opts.ResourceVersion, currentVersion); err != nil {
|
||||||
|
// Below log left for debug. It's usually not an error condition
|
||||||
|
// klog.Infof("failed to assert minimum resource version constraint against list version")
|
||||||
|
continue
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := s.versioner.UpdateList(listObj, resourceVersionInt, "", &remainingItems); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -432,16 +537,17 @@ func (s *Storage) GuaranteedUpdate(
|
|||||||
tryUpdate storage.UpdateFunc,
|
tryUpdate storage.UpdateFunc,
|
||||||
cachedExistingObject runtime.Object,
|
cachedExistingObject runtime.Object,
|
||||||
) error {
|
) error {
|
||||||
var res storage.ResponseMeta
|
var (
|
||||||
for attempt := 1; attempt <= MaxUpdateAttempts; attempt = attempt + 1 {
|
res storage.ResponseMeta
|
||||||
var (
|
updatedObj runtime.Object
|
||||||
fpath = s.filePath(key)
|
objFromDisk runtime.Object
|
||||||
dirpath = filepath.Dir(fpath)
|
created bool
|
||||||
|
fpath = s.filePath(key)
|
||||||
|
dirpath = filepath.Dir(fpath)
|
||||||
|
)
|
||||||
|
|
||||||
obj runtime.Object
|
for attempt := 1; attempt <= MaxUpdateAttempts; attempt = attempt + 1 {
|
||||||
err error
|
var err error
|
||||||
created bool
|
|
||||||
)
|
|
||||||
|
|
||||||
if !exists(dirpath) {
|
if !exists(dirpath) {
|
||||||
if err := ensureDir(dirpath); err != nil {
|
if err := ensureDir(dirpath); err != nil {
|
||||||
@ -453,65 +559,90 @@ func (s *Storage) GuaranteedUpdate(
|
|||||||
return apierrors.NewNotFound(s.gr, s.nameFromKey(key))
|
return apierrors.NewNotFound(s.gr, s.nameFromKey(key))
|
||||||
}
|
}
|
||||||
|
|
||||||
obj, err = readFile(s.codec, fpath, s.newFunc)
|
objFromDisk, err = readFile(s.codec, fpath, s.newFunc)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// fallback to new object if the file is not found
|
// fallback to new object if the file is not found
|
||||||
obj = s.newFunc()
|
objFromDisk = s.newFunc()
|
||||||
created = true
|
created = true
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := preconditions.Check(key, obj); err != nil {
|
if err := preconditions.Check(key, objFromDisk); err != nil {
|
||||||
if attempt >= MaxUpdateAttempts {
|
if attempt >= MaxUpdateAttempts {
|
||||||
return fmt.Errorf("precondition failed: %w", err)
|
return fmt.Errorf("precondition failed: %w", err)
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
updatedObj, _, err := tryUpdate(obj, res)
|
updatedObj, _, err = tryUpdate(objFromDisk, res)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if attempt >= MaxUpdateAttempts {
|
if attempt >= MaxUpdateAttempts {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
break
|
||||||
unchanged, err := isUnchanged(s.codec, obj, updatedObj)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if unchanged {
|
|
||||||
u, err := conversion.EnforcePtr(updatedObj)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("unable to enforce updated object pointer: %w", err)
|
|
||||||
}
|
|
||||||
d, err := conversion.EnforcePtr(destination)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("unable to enforce destination pointer: %w", err)
|
|
||||||
}
|
|
||||||
d.Set(u)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
generatedRV, err := getResourceVersion()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err := s.Versioner().UpdateObject(updatedObj, *generatedRV); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err := writeFile(s.codec, fpath, updatedObj); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
eventType := watch.Modified
|
|
||||||
if created {
|
|
||||||
eventType = watch.Added
|
|
||||||
}
|
|
||||||
s.watchSet.notifyWatchers(watch.Event{
|
|
||||||
Object: updatedObj.DeepCopyObject(),
|
|
||||||
Type: eventType,
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
unchanged, err := isUnchanged(s.codec, objFromDisk, updatedObj)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if unchanged {
|
||||||
|
u, err := conversion.EnforcePtr(updatedObj)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("unable to enforce updated object pointer: %w", err)
|
||||||
|
}
|
||||||
|
d, err := conversion.EnforcePtr(destination)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("unable to enforce destination pointer: %w", err)
|
||||||
|
}
|
||||||
|
d.Set(u)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
s.rvMutex.Lock()
|
||||||
|
generatedRV := s.getNewResourceVersion()
|
||||||
|
if err != nil {
|
||||||
|
s.rvMutex.Unlock()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
s.rvMutex.Unlock()
|
||||||
|
|
||||||
|
if err := s.versioner.UpdateObject(updatedObj, generatedRV); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := writeFile(s.codec, fpath, updatedObj); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: make a helper for this and re-use
|
||||||
|
u, err := conversion.EnforcePtr(updatedObj)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("unable to enforce updated object pointer: %w", err)
|
||||||
|
}
|
||||||
|
d, err := conversion.EnforcePtr(destination)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("unable to enforce destination pointer: %w", err)
|
||||||
|
}
|
||||||
|
d.Set(u)
|
||||||
|
|
||||||
|
eventType := watch.Modified
|
||||||
|
if created {
|
||||||
|
eventType = watch.Added
|
||||||
|
s.watchSet.notifyWatchers(watch.Event{
|
||||||
|
Object: destination.DeepCopyObject(),
|
||||||
|
Type: eventType,
|
||||||
|
}, nil)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
s.watchSet.notifyWatchers(watch.Event{
|
||||||
|
Object: destination.DeepCopyObject(),
|
||||||
|
Type: eventType,
|
||||||
|
}, objFromDisk.DeepCopyObject())
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -533,7 +664,7 @@ func (s *Storage) Count(key string) (int64, error) {
|
|||||||
//
|
//
|
||||||
// TODO: Remove when storage.Interface will be separate from etc3.store.
|
// TODO: Remove when storage.Interface will be separate from etc3.store.
|
||||||
// Deprecated: Added temporarily to simplify exposing RequestProgress for watch cache.
|
// Deprecated: Added temporarily to simplify exposing RequestProgress for watch cache.
|
||||||
func (s *Storage) RequestWatchProgress(ctx context.Context) error {
|
func (s *Storage) RequestWatchProgress(_ context.Context) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -543,7 +674,7 @@ func (s *Storage) validateMinimumResourceVersion(minimumResourceVersion string,
|
|||||||
if minimumResourceVersion == "" {
|
if minimumResourceVersion == "" {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
minimumRV, err := s.Versioner().ParseResourceVersion(minimumResourceVersion)
|
minimumRV, err := s.versioner.ParseResourceVersion(minimumResourceVersion)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err))
|
return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err))
|
||||||
}
|
}
|
||||||
@ -558,3 +689,63 @@ func (s *Storage) validateMinimumResourceVersion(minimumResourceVersion string,
|
|||||||
func (s *Storage) nameFromKey(key string) string {
|
func (s *Storage) nameFromKey(key string) string {
|
||||||
return strings.Replace(key, s.resourcePrefix+"/", "", 1)
|
return strings.Replace(key, s.resourcePrefix+"/", "", 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// While this is an inefficient way to differentiate the ambiguous keys,
|
||||||
|
// we only need it for initial namespace calculation in watch
|
||||||
|
// This helps us with watcher tests that don't always set up requestcontext correctly
|
||||||
|
func (s *Storage) convertToParsedKey(key string, p storage.SelectionPredicate) (*parsedKey, error) {
|
||||||
|
// NOTE: the following supports the watcher tests that run against v1/pods
|
||||||
|
// Other than that, there are ambiguities in the key format that only field selector
|
||||||
|
// when set to use metadata.name can be used to bring clarity in the 3-segment case
|
||||||
|
|
||||||
|
// Cases handled below:
|
||||||
|
// namespace scoped:
|
||||||
|
// /<group>/<resource>/[<namespace>]/[<name>]
|
||||||
|
// /<group>/<resource>/[<namespace>]
|
||||||
|
//
|
||||||
|
// cluster scoped:
|
||||||
|
// /<group>/<resource>/[<name>]
|
||||||
|
// /<group>/<resource>
|
||||||
|
parts := strings.SplitN(key, "/", 5)
|
||||||
|
if len(parts) < 3 && (len(parts) == 2 && parts[1] != "pods") {
|
||||||
|
return nil, fmt.Errorf("invalid key (expecting at least 2 parts): %s", key)
|
||||||
|
}
|
||||||
|
|
||||||
|
// beware this empty "" as the first separated part for the rest of the parsing below
|
||||||
|
if parts[0] != "" {
|
||||||
|
return nil, fmt.Errorf("invalid key (expecting leading slash): %s", key)
|
||||||
|
}
|
||||||
|
|
||||||
|
k := &parsedKey{}
|
||||||
|
|
||||||
|
// for v1/pods that tests use, Group is empty
|
||||||
|
if len(parts) > 1 && s.gr.Group == "" {
|
||||||
|
k.resource = parts[1]
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(parts) > 2 {
|
||||||
|
// for v1/pods that tests use, Group is empty
|
||||||
|
if parts[1] == s.gr.Resource {
|
||||||
|
k.resource = parts[1]
|
||||||
|
if _, found := p.Field.RequiresExactMatch("metadata.name"); !found {
|
||||||
|
k.namespace = parts[2]
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
k.group = parts[1]
|
||||||
|
k.resource = parts[2]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(parts) > 3 {
|
||||||
|
// for v1/pods that tests use, Group is empty
|
||||||
|
if parts[1] == s.gr.Resource {
|
||||||
|
k.name = parts[3]
|
||||||
|
} else {
|
||||||
|
if _, found := p.Field.RequiresExactMatch("metadata.name"); !found {
|
||||||
|
k.namespace = parts[3]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return k, nil
|
||||||
|
}
|
||||||
|
230
pkg/apiserver/storage/file/watcher_test.go
Normal file
230
pkg/apiserver/storage/file/watcher_test.go
Normal file
@ -0,0 +1,230 @@
|
|||||||
|
// SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
// Provenance-includes-location: https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go
|
||||||
|
// Provenance-includes-license: Apache-2.0
|
||||||
|
// Provenance-includes-copyright: The Kubernetes Authors.
|
||||||
|
|
||||||
|
package file
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"io/fs"
|
||||||
|
"os"
|
||||||
|
"path"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"k8s.io/apimachinery/pkg/api/apitesting"
|
||||||
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime/serializer"
|
||||||
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||||
|
"k8s.io/apiserver/pkg/apis/example"
|
||||||
|
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
|
||||||
|
"k8s.io/apiserver/pkg/storage"
|
||||||
|
"k8s.io/apiserver/pkg/storage/storagebackend"
|
||||||
|
"k8s.io/apiserver/pkg/storage/storagebackend/factory"
|
||||||
|
storagetesting "k8s.io/apiserver/pkg/storage/testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
var scheme = runtime.NewScheme()
|
||||||
|
var codecs = serializer.NewCodecFactory(scheme)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
metav1.AddToGroupVersion(scheme, metav1.SchemeGroupVersion)
|
||||||
|
utilruntime.Must(example.AddToScheme(scheme))
|
||||||
|
utilruntime.Must(examplev1.AddToScheme(scheme))
|
||||||
|
}
|
||||||
|
|
||||||
|
type setupOptions struct {
|
||||||
|
codec runtime.Codec
|
||||||
|
newFunc func() runtime.Object
|
||||||
|
newListFunc func() runtime.Object
|
||||||
|
prefix string
|
||||||
|
resourcePrefix string
|
||||||
|
groupResource schema.GroupResource
|
||||||
|
}
|
||||||
|
|
||||||
|
type setupOption func(*setupOptions, testing.TB)
|
||||||
|
|
||||||
|
func withDefaults(options *setupOptions, t testing.TB) {
|
||||||
|
options.codec = apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||||
|
options.newFunc = newPod
|
||||||
|
options.newListFunc = newPodList
|
||||||
|
options.prefix = t.TempDir()
|
||||||
|
options.resourcePrefix = "/pods"
|
||||||
|
options.groupResource = schema.GroupResource{Resource: "pods"}
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ setupOption = withDefaults
|
||||||
|
|
||||||
|
func testSetup(t testing.TB, opts ...setupOption) (context.Context, storage.Interface, factory.DestroyFunc, error) {
|
||||||
|
setupOpts := setupOptions{}
|
||||||
|
opts = append([]setupOption{withDefaults}, opts...)
|
||||||
|
for _, opt := range opts {
|
||||||
|
opt(&setupOpts, t)
|
||||||
|
}
|
||||||
|
|
||||||
|
config := storagebackend.NewDefaultConfig(setupOpts.prefix, setupOpts.codec)
|
||||||
|
store, destroyFunc, err := NewStorage(
|
||||||
|
config.ForResource(setupOpts.groupResource),
|
||||||
|
setupOpts.resourcePrefix,
|
||||||
|
func(obj runtime.Object) (string, error) {
|
||||||
|
return storage.NamespaceKeyFunc(setupOpts.resourcePrefix, obj)
|
||||||
|
},
|
||||||
|
setupOpts.newFunc,
|
||||||
|
setupOpts.newListFunc,
|
||||||
|
storage.DefaultNamespaceScopedAttr,
|
||||||
|
make(map[string]storage.IndexerFunc, 0),
|
||||||
|
nil,
|
||||||
|
)
|
||||||
|
|
||||||
|
// Some tests may start reading before writing
|
||||||
|
if err := os.MkdirAll(path.Join(setupOpts.prefix, "pods", "test-ns"), fs.ModePerm); err != nil {
|
||||||
|
return nil, nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, nil, err
|
||||||
|
}
|
||||||
|
ctx := context.Background()
|
||||||
|
return ctx, store, destroyFunc, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWatch(t *testing.T) {
|
||||||
|
ctx, store, destroyFunc, err := testSetup(t)
|
||||||
|
defer destroyFunc()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
storagetesting.RunTestWatch(ctx, t, store)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestClusterScopedWatch(t *testing.T) {
|
||||||
|
ctx, store, destroyFunc, err := testSetup(t)
|
||||||
|
defer destroyFunc()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
storagetesting.RunTestClusterScopedWatch(ctx, t, store)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNamespaceScopedWatch(t *testing.T) {
|
||||||
|
ctx, store, destroyFunc, err := testSetup(t)
|
||||||
|
defer destroyFunc()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
storagetesting.RunTestNamespaceScopedWatch(ctx, t, store)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDeleteTriggerWatch(t *testing.T) {
|
||||||
|
ctx, store, destroyFunc, err := testSetup(t)
|
||||||
|
defer destroyFunc()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
storagetesting.RunTestDeleteTriggerWatch(ctx, t, store)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWatchFromZero(t *testing.T) {
|
||||||
|
ctx, store, destroyFunc, err := testSetup(t)
|
||||||
|
defer destroyFunc()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
storagetesting.RunTestWatchFromZero(ctx, t, store, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestWatchFromNonZero tests that
|
||||||
|
// - watch from non-0 should just watch changes after given version
|
||||||
|
func TestWatchFromNoneZero(t *testing.T) {
|
||||||
|
ctx, store, destroyFunc, err := testSetup(t)
|
||||||
|
defer destroyFunc()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
storagetesting.RunTestWatchFromNonZero(ctx, t, store)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDelayedWatchDelivery(t *testing.T) {
|
||||||
|
ctx, store, destroyFunc, err := testSetup(t)
|
||||||
|
defer destroyFunc()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
storagetesting.RunTestDelayedWatchDelivery(ctx, t, store)
|
||||||
|
}
|
||||||
|
|
||||||
|
/* func TestWatchError(t *testing.T) {
|
||||||
|
ctx, store, _ := testSetup(t)
|
||||||
|
storagetesting.RunTestWatchError(ctx, t, &storeWithPrefixTransformer{store})
|
||||||
|
} */
|
||||||
|
|
||||||
|
func TestWatchContextCancel(t *testing.T) {
|
||||||
|
ctx, store, destroyFunc, err := testSetup(t)
|
||||||
|
defer destroyFunc()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
storagetesting.RunTestWatchContextCancel(ctx, t, store)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWatcherTimeout(t *testing.T) {
|
||||||
|
ctx, store, destroyFunc, err := testSetup(t)
|
||||||
|
defer destroyFunc()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
storagetesting.RunTestWatcherTimeout(ctx, t, store)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) {
|
||||||
|
ctx, store, destroyFunc, err := testSetup(t)
|
||||||
|
defer destroyFunc()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
storagetesting.RunTestWatchDeleteEventObjectHaveLatestRV(ctx, t, store)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: enable when we support flow control and priority fairness
|
||||||
|
/* func TestWatchInitializationSignal(t *testing.T) {
|
||||||
|
ctx, store, destroyFunc, err := testSetup(t)
|
||||||
|
defer destroyFunc()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
storagetesting.RunTestWatchInitializationSignal(ctx, t, store)
|
||||||
|
} */
|
||||||
|
|
||||||
|
/* func TestProgressNotify(t *testing.T) {
|
||||||
|
ctx, store, destroyFunc, err := testSetup(t)
|
||||||
|
defer destroyFunc()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
storagetesting.RunOptionalTestProgressNotify(ctx, t, store)
|
||||||
|
} */
|
||||||
|
|
||||||
|
// TestWatchDispatchBookmarkEvents makes sure that
|
||||||
|
// setting allowWatchBookmarks query param against
|
||||||
|
// etcd implementation doesn't have any effect.
|
||||||
|
func TestWatchDispatchBookmarkEvents(t *testing.T) {
|
||||||
|
ctx, store, destroyFunc, err := testSetup(t)
|
||||||
|
defer destroyFunc()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
storagetesting.RunTestWatchDispatchBookmarkEvents(ctx, t, store, false)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSendInitialEventsBackwardCompatibility(t *testing.T) {
|
||||||
|
ctx, store, destroyFunc, err := testSetup(t)
|
||||||
|
defer destroyFunc()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
storagetesting.RunSendInitialEventsBackwardCompatibility(ctx, t, store)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestEtcdWatchSemantics(t *testing.T) {
|
||||||
|
ctx, store, destroyFunc, err := testSetup(t)
|
||||||
|
defer destroyFunc()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
storagetesting.RunWatchSemantics(ctx, t, store)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: determine if this test case is useful to pass
|
||||||
|
// If we simply generate Snowflakes for List RVs (when none is passed in) as opposed to maxRV calculation, it makes
|
||||||
|
// our watch implementation and comparing items against the requested RV much more reliable.
|
||||||
|
// There is no guarantee that maxRV+1 won't end up being a future item's RV.
|
||||||
|
/*
|
||||||
|
func TestEtcdWatchSemanticInitialEventsExtended(t *testing.T) {
|
||||||
|
ctx, store, destroyFunc, err := testSetup(t)
|
||||||
|
defer destroyFunc()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
storagetesting.RunWatchSemanticInitialEventsExtended(ctx, t, store)
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
|
func newPod() runtime.Object {
|
||||||
|
return &example.Pod{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newPodList() runtime.Object {
|
||||||
|
return &example.PodList{}
|
||||||
|
}
|
@ -6,98 +6,371 @@
|
|||||||
package file
|
package file
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
|
||||||
|
"k8s.io/apimachinery/pkg/api/meta"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
"k8s.io/apimachinery/pkg/watch"
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
"k8s.io/apiserver/pkg/storage"
|
"k8s.io/apiserver/pkg/storage"
|
||||||
|
"k8s.io/klog/v2"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
UpdateChannelSize = 25
|
||||||
|
InitialWatchNodesSize = 20
|
||||||
|
InitialBufferedEventsSize = 25
|
||||||
|
)
|
||||||
|
|
||||||
|
type eventWrapper struct {
|
||||||
|
ev watch.Event
|
||||||
|
// optional: oldObject is only set for modifications for determining their type as necessary (when using predicate filtering)
|
||||||
|
oldObject runtime.Object
|
||||||
|
}
|
||||||
|
|
||||||
|
type watchNode struct {
|
||||||
|
ctx context.Context
|
||||||
|
s *WatchSet
|
||||||
|
id uint64
|
||||||
|
updateCh chan eventWrapper
|
||||||
|
outCh chan watch.Event
|
||||||
|
requestedRV uint64
|
||||||
|
// the watch may or may not be namespaced for a namespaced resource. This is always nil for cluster-scoped kinds
|
||||||
|
watchNamespace *string
|
||||||
|
predicate storage.SelectionPredicate
|
||||||
|
versioner storage.Versioner
|
||||||
|
}
|
||||||
|
|
||||||
// Keeps track of which watches need to be notified
|
// Keeps track of which watches need to be notified
|
||||||
type WatchSet struct {
|
type WatchSet struct {
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
nodes map[int]*watchNode
|
// mu protects both nodes and counter
|
||||||
counter int
|
nodes map[uint64]*watchNode
|
||||||
|
counter atomic.Uint64
|
||||||
|
buffered []eventWrapper
|
||||||
|
bufferedMutex sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewWatchSet() *WatchSet {
|
func NewWatchSet() *WatchSet {
|
||||||
return &WatchSet{
|
return &WatchSet{
|
||||||
nodes: make(map[int]*watchNode, 20),
|
buffered: make([]eventWrapper, 0, InitialBufferedEventsSize),
|
||||||
counter: 0,
|
nodes: make(map[uint64]*watchNode, InitialWatchNodesSize),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Creates a new watch with a unique id, but
|
// Creates a new watch with a unique id, but
|
||||||
// does not start sending events to it until start() is called.
|
// does not start sending events to it until start() is called.
|
||||||
func (s *WatchSet) newWatch() *watchNode {
|
func (s *WatchSet) newWatch(ctx context.Context, requestedRV uint64, p storage.SelectionPredicate, versioner storage.Versioner, namespace *string) *watchNode {
|
||||||
s.mu.Lock()
|
s.counter.Add(1)
|
||||||
defer s.mu.Unlock()
|
|
||||||
|
|
||||||
s.counter++
|
node := &watchNode{
|
||||||
|
ctx: ctx,
|
||||||
return &watchNode{
|
requestedRV: requestedRV,
|
||||||
id: s.counter,
|
id: s.counter.Load(),
|
||||||
s: s,
|
s: s,
|
||||||
updateCh: make(chan watch.Event),
|
// updateCh size needs to be > 1 to allow slower clients to not block passing new events
|
||||||
outCh: make(chan watch.Event),
|
updateCh: make(chan eventWrapper, UpdateChannelSize),
|
||||||
|
// outCh size needs to be > 1 for single process use-cases such as tests where watch and event seeding from CUD
|
||||||
|
// events is happening on the same thread
|
||||||
|
outCh: make(chan watch.Event, UpdateChannelSize),
|
||||||
|
predicate: p,
|
||||||
|
watchNamespace: namespace,
|
||||||
|
versioner: versioner,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return node
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *WatchSet) cleanupWatchers() {
|
func (s *WatchSet) cleanupWatchers() {
|
||||||
// Doesn't protect the below access on nodes slice since it won't ever be modified during cleanup
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
for _, w := range s.nodes {
|
for _, w := range s.nodes {
|
||||||
w.Stop()
|
w.stop()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *WatchSet) notifyWatchers(ev watch.Event) {
|
// oldObject is only passed in the event of a modification
|
||||||
|
// in case a predicate filtered watch is impacted as a result of modification
|
||||||
|
// NOTE: this function gives one the misperception that a newly added node will never
|
||||||
|
// get a double event, one from buffered and one from the update channel
|
||||||
|
// That perception is not true. Even though this function maintains the lock throughout the function body
|
||||||
|
// it is not true of the Start function. So basically, the Start function running after this function
|
||||||
|
// fully stands the chance of another future notifyWatchers double sending it the event through the two means mentioned
|
||||||
|
func (s *WatchSet) notifyWatchers(ev watch.Event, oldObject runtime.Object) {
|
||||||
s.mu.RLock()
|
s.mu.RLock()
|
||||||
for _, w := range s.nodes {
|
defer s.mu.RUnlock()
|
||||||
w.updateCh <- ev
|
|
||||||
|
updateEv := eventWrapper{
|
||||||
|
ev: ev,
|
||||||
|
}
|
||||||
|
if oldObject != nil {
|
||||||
|
updateEv.oldObject = oldObject
|
||||||
|
}
|
||||||
|
|
||||||
|
// Events are always buffered.
|
||||||
|
// this is because of an inadvertent delay which is built into the watch process
|
||||||
|
// Watch() from storage returns Watch.Interface with a async start func.
|
||||||
|
// The only way to guarantee that we can interpret the passed RV correctly is to play it against missed events
|
||||||
|
// (notice the loop below over s.nodes isn't exactly going to work on a new node
|
||||||
|
// unless start is called on it)
|
||||||
|
s.bufferedMutex.Lock()
|
||||||
|
s.buffered = append(s.buffered, updateEv)
|
||||||
|
s.bufferedMutex.Unlock()
|
||||||
|
|
||||||
|
for _, w := range s.nodes {
|
||||||
|
w.updateCh <- updateEv
|
||||||
}
|
}
|
||||||
s.mu.RUnlock()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type watchNode struct {
|
// isValid is not necessary to be called on oldObject in UpdateEvents - assuming the Watch pushes correctly setup eventWrapper our way
|
||||||
s *WatchSet
|
// first bool is whether the event is valid for current watcher
|
||||||
id int
|
// second bool is whether checking the old value against the predicate may be valuable to the caller
|
||||||
updateCh chan watch.Event
|
// second bool may be a helpful aid to establish context around MODIFIED events
|
||||||
outCh chan watch.Event
|
// (note that this second bool is only marked true if we pass other checks first, namely RV and namespace)
|
||||||
|
func (w *watchNode) isValid(e eventWrapper) (bool, bool, error) {
|
||||||
|
obj, err := meta.Accessor(e.ev.Object)
|
||||||
|
if err != nil {
|
||||||
|
klog.Error("Could not get accessor to object in event")
|
||||||
|
return false, false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
eventRV, err := w.getResourceVersionAsInt(e.ev.Object)
|
||||||
|
if err != nil {
|
||||||
|
return false, false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if eventRV < w.requestedRV {
|
||||||
|
return false, false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if w.watchNamespace != nil && *w.watchNamespace != obj.GetNamespace() {
|
||||||
|
return false, false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
valid, err := w.predicate.Matches(e.ev.Object)
|
||||||
|
if err != nil {
|
||||||
|
return false, false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return valid, e.ev.Type == watch.Modified, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Only call this method if current object matches the predicate
|
||||||
|
func (w *watchNode) handleAddedForFilteredList(e eventWrapper) (*watch.Event, error) {
|
||||||
|
if e.oldObject == nil {
|
||||||
|
return nil, fmt.Errorf("oldObject should be set for modified events")
|
||||||
|
}
|
||||||
|
|
||||||
|
ok, err := w.predicate.Matches(e.oldObject)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
e.ev.Type = watch.Added
|
||||||
|
return &e.ev, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *watchNode) handleDeletedForFilteredList(e eventWrapper) (*watch.Event, error) {
|
||||||
|
if e.oldObject == nil {
|
||||||
|
return nil, fmt.Errorf("oldObject should be set for modified events")
|
||||||
|
}
|
||||||
|
|
||||||
|
ok, err := w.predicate.Matches(e.oldObject)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// isn't a match but used to be
|
||||||
|
e.ev.Type = watch.Deleted
|
||||||
|
|
||||||
|
oldObjectAccessor, err := meta.Accessor(e.oldObject)
|
||||||
|
if err != nil {
|
||||||
|
klog.Errorf("Could not get accessor to correct the old RV of filtered out object")
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
currentRV, err := getResourceVersion(e.ev.Object)
|
||||||
|
if err != nil {
|
||||||
|
klog.Errorf("Could not get accessor to object in event")
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
oldObjectAccessor.SetResourceVersion(currentRV)
|
||||||
|
e.ev.Object = e.oldObject
|
||||||
|
|
||||||
|
return &e.ev, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *watchNode) processEvent(e eventWrapper, isInitEvent bool) error {
|
||||||
|
if isInitEvent {
|
||||||
|
// Init events have already been vetted against the predicate and other RV behavior
|
||||||
|
// Let them pass through
|
||||||
|
w.outCh <- e.ev
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
valid, runDeleteFromFilteredListHandler, err := w.isValid(e)
|
||||||
|
if err != nil {
|
||||||
|
klog.Errorf("Could not determine validity of the event: %v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if valid {
|
||||||
|
if e.ev.Type == watch.Modified {
|
||||||
|
ev, err := w.handleAddedForFilteredList(e)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if ev != nil {
|
||||||
|
w.outCh <- *ev
|
||||||
|
} else {
|
||||||
|
// forward the original event if add handling didn't signal any impact
|
||||||
|
w.outCh <- e.ev
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
w.outCh <- e.ev
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if runDeleteFromFilteredListHandler {
|
||||||
|
if e.ev.Type == watch.Modified {
|
||||||
|
ev, err := w.handleDeletedForFilteredList(e)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if ev != nil {
|
||||||
|
w.outCh <- *ev
|
||||||
|
}
|
||||||
|
} // explicitly doesn't have an event forward for the else case here
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start sending events to this watch.
|
// Start sending events to this watch.
|
||||||
func (w *watchNode) Start(p storage.SelectionPredicate, initEvents []watch.Event) {
|
func (w *watchNode) Start(initEvents ...watch.Event) {
|
||||||
w.s.mu.Lock()
|
w.s.mu.Lock()
|
||||||
w.s.nodes[w.id] = w
|
w.s.nodes[w.id] = w
|
||||||
w.s.mu.Unlock()
|
w.s.mu.Unlock()
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
for _, e := range initEvents {
|
maxRV := uint64(0)
|
||||||
w.outCh <- e
|
for _, ev := range initEvents {
|
||||||
}
|
currentRV, err := w.getResourceVersionAsInt(ev.Object)
|
||||||
|
|
||||||
for e := range w.updateCh {
|
|
||||||
ok, err := p.Matches(e.Object)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
klog.Errorf("Could not determine init event RV for deduplication of buffered events: %v", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if !ok {
|
if maxRV < currentRV {
|
||||||
|
maxRV = currentRV
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := w.processEvent(eventWrapper{ev: ev}, true); err != nil {
|
||||||
|
klog.Errorf("Could not process event: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we had no init events, simply rely on the passed RV
|
||||||
|
if maxRV == 0 {
|
||||||
|
maxRV = w.requestedRV
|
||||||
|
}
|
||||||
|
|
||||||
|
w.s.bufferedMutex.RLock()
|
||||||
|
for _, e := range w.s.buffered {
|
||||||
|
eventRV, err := w.getResourceVersionAsInt(e.ev.Object)
|
||||||
|
if err != nil {
|
||||||
|
klog.Errorf("Could not determine RV for deduplication of buffered events: %v", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
w.outCh <- e
|
|
||||||
|
if maxRV >= eventRV {
|
||||||
|
continue
|
||||||
|
} else {
|
||||||
|
maxRV = eventRV
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := w.processEvent(e, false); err != nil {
|
||||||
|
klog.Errorf("Could not process event: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
w.s.bufferedMutex.RUnlock()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case e, ok := <-w.updateCh:
|
||||||
|
if !ok {
|
||||||
|
close(w.outCh)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
eventRV, err := w.getResourceVersionAsInt(e.ev.Object)
|
||||||
|
if err != nil {
|
||||||
|
klog.Errorf("Could not determine RV for deduplication of channel events: %v", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if maxRV >= eventRV {
|
||||||
|
continue
|
||||||
|
} else {
|
||||||
|
maxRV = eventRV
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := w.processEvent(e, false); err != nil {
|
||||||
|
klog.Errorf("Could not process event: %v", err)
|
||||||
|
}
|
||||||
|
case <-w.ctx.Done():
|
||||||
|
close(w.outCh)
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
close(w.outCh)
|
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *watchNode) Stop() {
|
func (w *watchNode) Stop() {
|
||||||
w.s.mu.Lock()
|
w.s.mu.Lock()
|
||||||
delete(w.s.nodes, w.id)
|
defer w.s.mu.Unlock()
|
||||||
w.s.mu.Unlock()
|
w.stop()
|
||||||
|
}
|
||||||
|
|
||||||
close(w.updateCh)
|
// Unprotected func: ensure mutex on the parent watch set is locked before calling
|
||||||
|
func (w *watchNode) stop() {
|
||||||
|
if _, ok := w.s.nodes[w.id]; ok {
|
||||||
|
delete(w.s.nodes, w.id)
|
||||||
|
close(w.updateCh)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *watchNode) ResultChan() <-chan watch.Event {
|
func (w *watchNode) ResultChan() <-chan watch.Event {
|
||||||
return w.outCh
|
return w.outCh
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getResourceVersion(obj runtime.Object) (string, error) {
|
||||||
|
accessor, err := meta.Accessor(obj)
|
||||||
|
if err != nil {
|
||||||
|
klog.Error("Could not get accessor to object in event")
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
return accessor.GetResourceVersion(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *watchNode) getResourceVersionAsInt(obj runtime.Object) (uint64, error) {
|
||||||
|
accessor, err := meta.Accessor(obj)
|
||||||
|
if err != nil {
|
||||||
|
klog.Error("Could not get accessor to object in event")
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return w.versioner.ParseResourceVersion(accessor.GetResourceVersion())
|
||||||
|
}
|
||||||
|
@ -42,6 +42,7 @@ import (
|
|||||||
apiregistrationclientset "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset"
|
apiregistrationclientset "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset"
|
||||||
apiregistrationclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/typed/apiregistration/v1"
|
apiregistrationclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/typed/apiregistration/v1"
|
||||||
apiregistrationInformers "k8s.io/kube-aggregator/pkg/client/informers/externalversions/apiregistration/v1"
|
apiregistrationInformers "k8s.io/kube-aggregator/pkg/client/informers/externalversions/apiregistration/v1"
|
||||||
|
"k8s.io/kube-aggregator/pkg/controllers"
|
||||||
"k8s.io/kube-aggregator/pkg/controllers/autoregister"
|
"k8s.io/kube-aggregator/pkg/controllers/autoregister"
|
||||||
|
|
||||||
"github.com/grafana/grafana/pkg/apiserver/builder"
|
"github.com/grafana/grafana/pkg/apiserver/builder"
|
||||||
@ -173,7 +174,7 @@ func CreateAggregatorServer(config *Config, delegateAPIServer genericapiserver.D
|
|||||||
aggregatorConfig := config.KubeAggregatorConfig
|
aggregatorConfig := config.KubeAggregatorConfig
|
||||||
sharedInformerFactory := config.Informers
|
sharedInformerFactory := config.Informers
|
||||||
remoteServicesConfig := config.RemoteServicesConfig
|
remoteServicesConfig := config.RemoteServicesConfig
|
||||||
|
externalNamesInformer := sharedInformerFactory.Service().V0alpha1().ExternalNames()
|
||||||
completedConfig := aggregatorConfig.Complete()
|
completedConfig := aggregatorConfig.Complete()
|
||||||
|
|
||||||
aggregatorServer, err := completedConfig.NewWithDelegate(delegateAPIServer)
|
aggregatorServer, err := completedConfig.NewWithDelegate(delegateAPIServer)
|
||||||
@ -209,7 +210,8 @@ func CreateAggregatorServer(config *Config, delegateAPIServer genericapiserver.D
|
|||||||
if remoteServicesConfig != nil {
|
if remoteServicesConfig != nil {
|
||||||
addRemoteAPIServicesToRegister(remoteServicesConfig, autoRegistrationController)
|
addRemoteAPIServicesToRegister(remoteServicesConfig, autoRegistrationController)
|
||||||
externalNames := getRemoteExternalNamesToRegister(remoteServicesConfig)
|
externalNames := getRemoteExternalNamesToRegister(remoteServicesConfig)
|
||||||
err = aggregatorServer.GenericAPIServer.AddPostStartHook("grafana-apiserver-remote-autoregistration", func(_ genericapiserver.PostStartHookContext) error {
|
err = aggregatorServer.GenericAPIServer.AddPostStartHook("grafana-apiserver-remote-autoregistration", func(ctx genericapiserver.PostStartHookContext) error {
|
||||||
|
controllers.WaitForCacheSync("grafana-apiserver-remote-autoregistration", ctx.StopCh, externalNamesInformer.Informer().HasSynced)
|
||||||
namespacedClient := remoteServicesConfig.serviceClientSet.ServiceV0alpha1().ExternalNames(remoteServicesConfig.ExternalNamesNamespace)
|
namespacedClient := remoteServicesConfig.serviceClientSet.ServiceV0alpha1().ExternalNames(remoteServicesConfig.ExternalNamesNamespace)
|
||||||
for _, externalName := range externalNames {
|
for _, externalName := range externalNames {
|
||||||
_, err := namespacedClient.Apply(context.Background(), externalName, metav1.ApplyOptions{
|
_, err := namespacedClient.Apply(context.Background(), externalName, metav1.ApplyOptions{
|
||||||
@ -245,7 +247,7 @@ func CreateAggregatorServer(config *Config, delegateAPIServer genericapiserver.D
|
|||||||
|
|
||||||
availableController, err := NewAvailableConditionController(
|
availableController, err := NewAvailableConditionController(
|
||||||
aggregatorServer.APIRegistrationInformers.Apiregistration().V1().APIServices(),
|
aggregatorServer.APIRegistrationInformers.Apiregistration().V1().APIServices(),
|
||||||
sharedInformerFactory.Service().V0alpha1().ExternalNames(),
|
externalNamesInformer,
|
||||||
apiregistrationClient.ApiregistrationV1(),
|
apiregistrationClient.ApiregistrationV1(),
|
||||||
nil,
|
nil,
|
||||||
(func() ([]byte, []byte))(nil),
|
(func() ([]byte, []byte))(nil),
|
||||||
|
Loading…
Reference in New Issue
Block a user