2024-07-05 14:17:20 -04:00
// SPDX-License-Identifier: AGPL-3.0-only
// Provenance-includes-location: https://github.com/kubernetes/apiserver/blob/master/pkg/storage/testing/watcher_tests.go
// Provenance-includes-license: Apache-2.0
// Provenance-includes-copyright: The Kubernetes Authors.
package testing
import (
"context"
"fmt"
"net/http"
"sync"
"testing"
"time"
"github.com/stretchr/testify/require"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/apis/example"
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/value"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/utils/ptr"
)
func RunTestWatch ( ctx context . Context , t * testing . T , store storage . Interface ) {
testWatch ( ctx , t , store , false )
testWatch ( ctx , t , store , true )
}
// It tests that
// - first occurrence of objects should notify Add event
// - update should trigger Modified event
// - update that gets filtered should trigger Deleted event
func testWatch ( ctx context . Context , t * testing . T , store storage . Interface , recursive bool ) {
basePod := & example . Pod {
ObjectMeta : metav1 . ObjectMeta { Name : "foo" } ,
Spec : example . PodSpec { NodeName : "" } ,
}
basePodAssigned := & example . Pod {
ObjectMeta : metav1 . ObjectMeta { Name : "foo" } ,
Spec : example . PodSpec { NodeName : "bar" } ,
}
selectedPod := func ( pod * example . Pod ) * example . Pod {
result := pod . DeepCopy ( )
result . Labels = map [ string ] string { "select" : "true" }
return result
}
tests := [ ] struct {
name string
namespace string
key string
pred storage . SelectionPredicate
watchTests [ ] * testWatchStruct
} { {
name : "create a key" ,
namespace : fmt . Sprintf ( "test-ns-1-%t" , recursive ) ,
watchTests : [ ] * testWatchStruct { { basePod , true , watch . Added } } ,
pred : storage . Everything ,
} , {
name : "key updated to match predicate" ,
namespace : fmt . Sprintf ( "test-ns-2-%t" , recursive ) ,
watchTests : [ ] * testWatchStruct { { basePod , false , "" } , { basePodAssigned , true , watch . Added } } ,
pred : storage . SelectionPredicate {
Label : labels . Everything ( ) ,
Field : fields . ParseSelectorOrDie ( "spec.nodeName=bar" ) ,
GetAttrs : func ( obj runtime . Object ) ( labels . Set , fields . Set , error ) {
pod := obj . ( * example . Pod )
return nil , fields . Set { "spec.nodeName" : pod . Spec . NodeName } , nil
} ,
} ,
} , {
name : "update" ,
namespace : fmt . Sprintf ( "test-ns-3-%t" , recursive ) ,
watchTests : [ ] * testWatchStruct { { basePod , true , watch . Added } , { basePodAssigned , true , watch . Modified } } ,
pred : storage . Everything ,
} , {
name : "delete because of being filtered" ,
namespace : fmt . Sprintf ( "test-ns-4-%t" , recursive ) ,
watchTests : [ ] * testWatchStruct { { basePod , true , watch . Added } , { basePodAssigned , true , watch . Deleted } } ,
pred : storage . SelectionPredicate {
Label : labels . Everything ( ) ,
Field : fields . ParseSelectorOrDie ( "spec.nodeName!=bar" ) ,
GetAttrs : func ( obj runtime . Object ) ( labels . Set , fields . Set , error ) {
pod := obj . ( * example . Pod )
return nil , fields . Set { "spec.nodeName" : pod . Spec . NodeName } , nil
} ,
} ,
} , {
name : "filtering" ,
namespace : fmt . Sprintf ( "test-ns-5-%t" , recursive ) ,
watchTests : [ ] * testWatchStruct {
{ selectedPod ( basePod ) , true , watch . Added } ,
{ basePod , true , watch . Deleted } ,
{ selectedPod ( basePod ) , true , watch . Added } ,
{ selectedPod ( basePodAssigned ) , true , watch . Modified } ,
{ nil , true , watch . Deleted } ,
} ,
pred : storage . SelectionPredicate {
Label : labels . SelectorFromSet ( labels . Set { "select" : "true" } ) ,
Field : fields . Everything ( ) ,
GetAttrs : func ( obj runtime . Object ) ( labels . Set , fields . Set , error ) {
pod := obj . ( * example . Pod )
return labels . Set ( pod . Labels ) , nil , nil
} ,
} ,
} }
for _ , tt := range tests {
t . Run ( tt . name , func ( t * testing . T ) {
watchKey := KeyFunc ( tt . namespace , "" )
key := KeyFunc ( tt . namespace , "foo" )
if ! recursive {
watchKey = key
}
// Get the current RV from which we can start watching.
out := & example . PodList { }
if err := store . GetList ( ctx , watchKey , storage . ListOptions { ResourceVersion : "" , Predicate : tt . pred , Recursive : recursive } , out ) ; err != nil {
t . Fatalf ( "List failed: %v" , err )
}
w , err := store . Watch ( ctx , watchKey , storage . ListOptions { ResourceVersion : out . ResourceVersion , Predicate : tt . pred , Recursive : recursive } )
if err != nil {
t . Fatalf ( "Watch failed: %v" , err )
}
// Create a pod in a different namespace first to ensure
// that its corresponding event will not be propagated.
badKey := KeyFunc ( fmt . Sprintf ( "%s-bad" , tt . namespace ) , "foo" )
badOut := & example . Pod { }
err = store . GuaranteedUpdate ( ctx , badKey , badOut , true , nil , storage . SimpleUpdate (
func ( runtime . Object ) ( runtime . Object , error ) {
obj := basePod . DeepCopy ( )
obj . Namespace = fmt . Sprintf ( "%s-bad" , tt . namespace )
return obj , nil
} ) , nil )
if err != nil {
t . Fatalf ( "GuaranteedUpdate of bad pod failed: %v" , err )
}
var prevObj * example . Pod
for _ , watchTest := range tt . watchTests {
out := & example . Pod { }
if watchTest . obj != nil {
err := store . GuaranteedUpdate ( ctx , key , out , true , nil , storage . SimpleUpdate (
func ( runtime . Object ) ( runtime . Object , error ) {
obj := watchTest . obj . DeepCopy ( )
obj . Namespace = tt . namespace
return obj , nil
} ) , nil )
if err != nil {
t . Fatalf ( "GuaranteedUpdate failed: %v" , err )
}
} else {
2025-01-06 18:20:35 +03:00
err := store . Delete ( ctx , key , out , nil , storage . ValidateAllObjectFunc , nil , storage . DeleteOptions { } )
2024-07-05 14:17:20 -04:00
if err != nil {
t . Fatalf ( "Delete failed: %v" , err )
}
}
if watchTest . expectEvent {
expectObj := out
if watchTest . watchType == watch . Deleted {
expectObj = prevObj
expectObj . ResourceVersion = out . ResourceVersion
}
testCheckResult ( t , w , watch . Event { Type : watchTest . watchType , Object : expectObj } )
}
prevObj = out
}
w . Stop ( )
testCheckStop ( t , w )
} )
}
}
// RunTestWatchFromZero tests that
// - watch from 0 should sync up and grab the object added before
// - For testing with etcd, watch from 0 is able to return events for objects
// whose previous version has been compacted. If testing with cacher, we
// expect compaction to be nil.
func RunTestWatchFromZero ( ctx context . Context , t * testing . T , store storage . Interface , compaction Compaction ) {
key , storedObj := testPropagateStore ( ctx , t , store , & example . Pod { ObjectMeta : metav1 . ObjectMeta { Name : "foo" , Namespace : "test-ns" } } )
w , err := store . Watch ( ctx , key , storage . ListOptions { ResourceVersion : "0" , Predicate : storage . Everything } )
if err != nil {
t . Fatalf ( "Watch failed: %v" , err )
}
testCheckResult ( t , w , watch . Event { Type : watch . Added , Object : storedObj } )
// Update
out := & example . Pod { }
err = store . GuaranteedUpdate ( ctx , key , out , true , nil , storage . SimpleUpdate (
func ( runtime . Object ) ( runtime . Object , error ) {
return & example . Pod { ObjectMeta : metav1 . ObjectMeta { Name : "foo" , Namespace : "test-ns" , Annotations : map [ string ] string { "a" : "1" } } } , nil
} ) , nil )
if err != nil {
t . Fatalf ( "GuaranteedUpdate failed: %v" , err )
}
// Check that we receive a modified watch event. This check also
// indirectly ensures that the cache is synced. This is important
// when testing with the Cacher since we may have to allow for slow
// processing by allowing updates to propagate to the watch cache.
// This allows for that.
testCheckResult ( t , w , watch . Event { Type : watch . Modified , Object : out } )
w . Stop ( )
// Make sure when we watch from 0 we receive an ADDED event
w , err = store . Watch ( ctx , key , storage . ListOptions { ResourceVersion : "0" , Predicate : storage . Everything } )
if err != nil {
t . Fatalf ( "Watch failed: %v" , err )
}
testCheckResult ( t , w , watch . Event { Type : watch . Added , Object : out } )
w . Stop ( )
// Compact previous versions
if compaction == nil {
t . Skip ( "compaction callback not provided" )
}
// Update again
newOut := & example . Pod { }
err = store . GuaranteedUpdate ( ctx , key , newOut , true , nil , storage . SimpleUpdate (
func ( runtime . Object ) ( runtime . Object , error ) {
return & example . Pod { ObjectMeta : metav1 . ObjectMeta { Name : "foo" , Namespace : "test-ns" } } , nil
} ) , nil )
if err != nil {
t . Fatalf ( "GuaranteedUpdate failed: %v" , err )
}
// Compact previous versions
compaction ( ctx , t , newOut . ResourceVersion )
// Make sure we can still watch from 0 and receive an ADDED event
w , err = store . Watch ( ctx , key , storage . ListOptions { ResourceVersion : "0" , Predicate : storage . Everything } )
defer w . Stop ( )
if err != nil {
t . Fatalf ( "Watch failed: %v" , err )
}
testCheckResult ( t , w , watch . Event { Type : watch . Added , Object : newOut } )
// Make sure we can't watch from older resource versions anymoer and get a "Gone" error.
tooOldWatcher , err := store . Watch ( ctx , key , storage . ListOptions { ResourceVersion : out . ResourceVersion , Predicate : storage . Everything } )
if err != nil {
t . Fatalf ( "Watch failed: %v" , err )
}
defer tooOldWatcher . Stop ( )
expiredError := errors . NewResourceExpired ( "" ) . ErrStatus
// TODO(wojtek-t): It seems that etcd is currently returning a different error,
// being an Internal error of "etcd event received with PrevKv=nil".
// We temporary allow both but we should unify here.
internalError := metav1 . Status {
Status : metav1 . StatusFailure ,
Code : http . StatusInternalServerError ,
Reason : metav1 . StatusReasonInternalError ,
}
testCheckResultFunc ( t , tooOldWatcher , func ( actualEvent watch . Event ) {
expectNoDiff ( t , "incorrect event type" , watch . Error , actualEvent . Type )
if ! apiequality . Semantic . DeepDerivative ( & expiredError , actualEvent . Object ) && ! apiequality . Semantic . DeepDerivative ( & internalError , actualEvent . Object ) {
t . Errorf ( "expected: %#v; got %#v" , & expiredError , actualEvent . Object )
}
} )
}
func RunTestDeleteTriggerWatch ( ctx context . Context , t * testing . T , store storage . Interface ) {
key , storedObj := testPropagateStore ( ctx , t , store , & example . Pod { ObjectMeta : metav1 . ObjectMeta { Name : "foo" , Namespace : "test-ns" } } )
w , err := store . Watch ( ctx , key , storage . ListOptions { ResourceVersion : storedObj . ResourceVersion , Predicate : storage . Everything } )
if err != nil {
t . Fatalf ( "Watch failed: %v" , err )
}
2025-01-06 18:20:35 +03:00
if err := store . Delete ( ctx , key , & example . Pod { } , nil , storage . ValidateAllObjectFunc , nil , storage . DeleteOptions { } ) ; err != nil {
2024-07-05 14:17:20 -04:00
t . Fatalf ( "Delete failed: %v" , err )
}
testCheckEventType ( t , w , watch . Deleted )
}
func RunTestWatchFromNonZero ( ctx context . Context , t * testing . T , store storage . Interface ) {
key , storedObj := testPropagateStore ( ctx , t , store , & example . Pod { ObjectMeta : metav1 . ObjectMeta { Name : "foo" , Namespace : "test-ns" } } )
w , err := store . Watch ( ctx , key , storage . ListOptions { ResourceVersion : storedObj . ResourceVersion , Predicate : storage . Everything } )
if err != nil {
t . Fatalf ( "Watch failed: %v" , err )
}
out := & example . Pod { }
_ = store . GuaranteedUpdate ( ctx , key , out , true , nil , storage . SimpleUpdate (
func ( runtime . Object ) ( runtime . Object , error ) {
newObj := storedObj . DeepCopy ( )
newObj . Annotations = map [ string ] string { "version" : "2" }
return newObj , nil
} ) , nil )
testCheckResult ( t , w , watch . Event { Type : watch . Modified , Object : out } )
}
func RunTestDelayedWatchDelivery ( ctx context . Context , t * testing . T , store storage . Interface ) {
_ , storedObj := testPropagateStore ( ctx , t , store , & example . Pod { ObjectMeta : metav1 . ObjectMeta { Name : "foo" , Namespace : "test-ns" } } )
startRV := storedObj . ResourceVersion
watcher , err := store . Watch ( ctx , KeyFunc ( "test-ns" , "" ) , storage . ListOptions { ResourceVersion : startRV , Predicate : storage . Everything , Recursive : true } )
if err != nil {
t . Fatalf ( "Unexpected error: %v" , err )
}
// Depending on the implementation, different number of events that
// should be delivered to the watcher can be created before it will
// block the implementation and as a result force the watcher to be
// closed (as otherwise events would have to be dropped).
// For now, this number is smallest for Cacher and it equals 21 for it.
totalPods := 21
for i := 0 ; i < totalPods ; i ++ {
out := & example . Pod { }
pod := & example . Pod {
ObjectMeta : metav1 . ObjectMeta { Name : fmt . Sprintf ( "foo-%d" , i ) , Namespace : "test-ns" } ,
}
err := store . GuaranteedUpdate ( ctx , computePodKey ( pod ) , out , true , nil , storage . SimpleUpdate (
func ( runtime . Object ) ( runtime . Object , error ) {
return pod , nil
} ) , nil )
if err != nil {
t . Errorf ( "GuaranteedUpdate failed: %v" , err )
}
}
// Now stop the watcher and check if the consecutive events are being delivered.
watcher . Stop ( )
watched := 0
for {
event , ok := <- watcher . ResultChan ( )
if ! ok {
break
}
object := event . Object
if co , ok := object . ( runtime . CacheableObject ) ; ok {
object = co . GetObject ( )
}
if a , e := object . ( * example . Pod ) . Name , fmt . Sprintf ( "foo-%d" , watched ) ; e != a {
t . Errorf ( "Unexpected object watched: %s, expected %s" , a , e )
}
watched ++
}
// We expect at least N events to be delivered, depending on the implementation.
// For now, this number is smallest for Cacher and it equals 10 (size of the out buffer).
if watched < 10 {
t . Errorf ( "Unexpected number of events: %v, expected: %v" , watched , totalPods )
}
}
func RunTestWatchError ( ctx context . Context , t * testing . T , store InterfaceWithPrefixTransformer ) {
obj := & example . Pod { ObjectMeta : metav1 . ObjectMeta { Name : "foo" , Namespace : "test-ns" } }
key := computePodKey ( obj )
// Compute the initial resource version from which we can start watching later.
list := & example . PodList { }
storageOpts := storage . ListOptions {
ResourceVersion : "0" ,
Predicate : storage . Everything ,
Recursive : true ,
}
if err := store . GetList ( ctx , KeyFunc ( "" , "" ) , storageOpts , list ) ; err != nil {
t . Errorf ( "Unexpected error: %v" , err )
}
if err := store . GuaranteedUpdate ( ctx , key , & example . Pod { } , true , nil , storage . SimpleUpdate (
func ( runtime . Object ) ( runtime . Object , error ) {
return obj , nil
} ) , nil ) ; err != nil {
t . Fatalf ( "GuaranteedUpdate failed: %v" , err )
}
// Now trigger watch error by injecting failing transformer.
revertTransformer := store . UpdatePrefixTransformer (
func ( previousTransformer * PrefixTransformer ) value . Transformer {
return & failingTransformer { }
} )
defer revertTransformer ( )
w , err := store . Watch ( ctx , key , storage . ListOptions { ResourceVersion : list . ResourceVersion , Predicate : storage . Everything } )
if err != nil {
t . Fatalf ( "Watch failed: %v" , err )
}
testCheckEventType ( t , w , watch . Error )
}
func RunTestWatchContextCancel ( ctx context . Context , t * testing . T , store storage . Interface ) {
canceledCtx , cancel := context . WithCancel ( ctx )
cancel ( )
// When we watch with a canceled context, we should detect that it's context canceled.
// We won't take it as error and also close the watcher.
w , err := store . Watch ( canceledCtx , KeyFunc ( "not-existing" , "" ) , storage . ListOptions {
ResourceVersion : "0" ,
Predicate : storage . Everything ,
} )
if err != nil {
t . Fatal ( err )
}
select {
case _ , ok := <- w . ResultChan ( ) :
if ok {
t . Error ( "ResultChan() should be closed" )
}
case <- time . After ( wait . ForeverTestTimeout ) :
t . Errorf ( "timeout after %v" , wait . ForeverTestTimeout )
}
}
func RunTestWatcherTimeout ( ctx context . Context , t * testing . T , store storage . Interface ) {
// initialRV is used to initate the watcher at the beginning of the world.
podList := example . PodList { }
options := storage . ListOptions {
Predicate : storage . Everything ,
Recursive : true ,
}
if err := store . GetList ( ctx , KeyFunc ( "" , "" ) , options , & podList ) ; err != nil {
t . Fatalf ( "Failed to list pods: %v" , err )
}
initialRV := podList . ResourceVersion
options = storage . ListOptions {
ResourceVersion : initialRV ,
Predicate : storage . Everything ,
Recursive : true ,
}
// Create a number of watchers that will not be reading any result.
nonReadingWatchers := 50
for i := 0 ; i < nonReadingWatchers ; i ++ {
watcher , err := store . Watch ( ctx , KeyFunc ( "test-ns" , "" ) , options )
if err != nil {
t . Fatalf ( "Unexpected error: %v" , err )
}
defer watcher . Stop ( )
}
// Create a second watcher that will be reading result.
readingWatcher , err := store . Watch ( ctx , KeyFunc ( "test-ns" , "" ) , options )
if err != nil {
t . Fatalf ( "Unexpected error: %v" , err )
}
defer readingWatcher . Stop ( )
// Depending on the implementation, different number of events that
// should be delivered to the watcher can be created before it will
// block the implementation and as a result force the watcher to be
// closed (as otherwise events would have to be dropped).
// For now, this number is smallest for Cacher and it equals 21 for it.
//
// Create more events to ensure that we're not blocking other watchers
// forever.
startTime := time . Now ( )
for i := 0 ; i < 22 ; i ++ {
out := & example . Pod { }
pod := & example . Pod { ObjectMeta : metav1 . ObjectMeta { Name : fmt . Sprintf ( "foo-%d" , i ) , Namespace : "test-ns" } }
if err := store . Create ( ctx , computePodKey ( pod ) , pod , out , 0 ) ; err != nil {
t . Fatalf ( "Create failed: %v" , err )
}
testCheckResult ( t , readingWatcher , watch . Event { Type : watch . Added , Object : out } )
}
if time . Since ( startTime ) > time . Duration ( 250 * nonReadingWatchers ) * time . Millisecond {
t . Errorf ( "waiting for events took too long: %v" , time . Since ( startTime ) )
}
}
func RunTestWatchDeleteEventObjectHaveLatestRV ( ctx context . Context , t * testing . T , store storage . Interface ) {
key , storedObj := testPropagateStore ( ctx , t , store , & example . Pod { ObjectMeta : metav1 . ObjectMeta { Name : "foo" , Namespace : "test-ns" } } )
watchCtx , cancel := context . WithTimeout ( ctx , wait . ForeverTestTimeout )
t . Cleanup ( cancel )
w , err := store . Watch ( watchCtx , key , storage . ListOptions { ResourceVersion : storedObj . ResourceVersion , Predicate : storage . Everything } )
if err != nil {
t . Fatalf ( "Watch failed: %v" , err )
}
deletedObj := & example . Pod { }
2025-01-06 18:20:35 +03:00
if err := store . Delete ( ctx , key , deletedObj , & storage . Preconditions { } , storage . ValidateAllObjectFunc , nil , storage . DeleteOptions { } ) ; err != nil {
2024-07-05 14:17:20 -04:00
t . Fatalf ( "Delete failed: %v" , err )
}
// Verify that ResourceVersion has changed on deletion.
if storedObj . ResourceVersion == deletedObj . ResourceVersion {
t . Fatalf ( "ResourceVersion didn't changed on deletion: %s" , deletedObj . ResourceVersion )
}
testCheckResult ( t , w , watch . Event { Type : watch . Deleted , Object : deletedObj } )
}
func RunTestWatchInitializationSignal ( ctx context . Context , t * testing . T , store storage . Interface ) {
ctx , cancel := context . WithTimeout ( ctx , 5 * time . Second )
t . Cleanup ( cancel )
initSignal := utilflowcontrol . NewInitializationSignal ( )
ctx = utilflowcontrol . WithInitializationSignal ( ctx , initSignal )
key , storedObj := testPropagateStore ( ctx , t , store , & example . Pod { ObjectMeta : metav1 . ObjectMeta { Name : "foo" , Namespace : "test-ns" } } )
_ , err := store . Watch ( ctx , key , storage . ListOptions { ResourceVersion : storedObj . ResourceVersion , Predicate : storage . Everything } )
if err != nil {
t . Fatalf ( "Watch failed: %v" , err )
}
initSignal . Wait ( )
}
// RunOptionalTestProgressNotify tests ProgressNotify feature of ListOptions.
// Given this feature is currently not explicitly used by higher layers of Kubernetes
// (it rather is used by wrappers of storage.Interface to implement its functionalities)
// this test is currently considered optional.
func RunOptionalTestProgressNotify ( ctx context . Context , t * testing . T , store storage . Interface ) {
input := & example . Pod { ObjectMeta : metav1 . ObjectMeta { Name : "name" , Namespace : "test-ns" } }
key := computePodKey ( input )
out := & example . Pod { }
if err := store . Create ( ctx , key , input , out , 0 ) ; err != nil {
t . Fatalf ( "Create failed: %v" , err )
}
validateResourceVersion := resourceVersionNotOlderThan ( out . ResourceVersion )
opts := storage . ListOptions {
ResourceVersion : out . ResourceVersion ,
Predicate : storage . Everything ,
ProgressNotify : true ,
}
w , err := store . Watch ( ctx , key , opts )
if err != nil {
t . Fatalf ( "Watch failed: %v" , err )
}
// when we send a bookmark event, the client expects the event to contain an
// object of the correct type, but with no fields set other than the resourceVersion
testCheckResultFunc ( t , w , func ( actualEvent watch . Event ) {
expectNoDiff ( t , "incorrect event type" , watch . Bookmark , actualEvent . Type )
// first, check that we have the correct resource version
obj , ok := actualEvent . Object . ( metav1 . Object )
if ! ok {
t . Fatalf ( "got %T, not metav1.Object" , actualEvent . Object )
}
if err := validateResourceVersion ( obj . GetResourceVersion ( ) ) ; err != nil {
t . Fatal ( err )
}
// then, check that we have the right type and content
pod , ok := actualEvent . Object . ( * example . Pod )
if ! ok {
t . Fatalf ( "got %T, not *example.Pod" , actualEvent . Object )
}
pod . ResourceVersion = ""
expectNoDiff ( t , "bookmark event should contain an object with no fields set other than resourceVersion" , & example . Pod { } , pod )
} )
}
// It tests watches of cluster-scoped resources.
func RunTestClusterScopedWatch ( ctx context . Context , t * testing . T , store storage . Interface ) {
tests := [ ] struct {
name string
// For watch request, the name of object is specified with field selector
// "metadata.name=objectName". So in this watch tests, we should set the
// requestedName and field selector "metadata.name=requestedName" at the
// same time or set neighter of them.
requestedName string
recursive bool
fieldSelector fields . Selector
indexFields [ ] string
watchTests [ ] * testWatchStruct
} {
{
name : "cluster-wide watch, request without name, without field selector" ,
recursive : true ,
fieldSelector : fields . Everything ( ) ,
watchTests : [ ] * testWatchStruct {
{ basePod ( "t1-foo1" ) , true , watch . Added } ,
{ basePodUpdated ( "t1-foo1" ) , true , watch . Modified } ,
{ basePodAssigned ( "t1-foo2" , "t1-bar1" ) , true , watch . Added } ,
} ,
} ,
{
name : "cluster-wide watch, request without name, field selector with spec.nodeName" ,
recursive : true ,
fieldSelector : fields . ParseSelectorOrDie ( "spec.nodeName=t2-bar1" ) ,
indexFields : [ ] string { "spec.nodeName" } ,
watchTests : [ ] * testWatchStruct {
{ basePod ( "t2-foo1" ) , false , "" } ,
{ basePodAssigned ( "t2-foo1" , "t2-bar1" ) , true , watch . Added } ,
} ,
} ,
{
name : "cluster-wide watch, request without name, field selector with spec.nodeName to filter out watch" ,
recursive : true ,
fieldSelector : fields . ParseSelectorOrDie ( "spec.nodeName!=t3-bar1" ) ,
indexFields : [ ] string { "spec.nodeName" } ,
watchTests : [ ] * testWatchStruct {
{ basePod ( "t3-foo1" ) , true , watch . Added } ,
{ basePod ( "t3-foo2" ) , true , watch . Added } ,
{ basePodUpdated ( "t3-foo1" ) , true , watch . Modified } ,
{ basePodAssigned ( "t3-foo1" , "t3-bar1" ) , true , watch . Deleted } ,
} ,
} ,
{
name : "cluster-wide watch, request with name, field selector with metadata.name" ,
requestedName : "t4-foo1" ,
fieldSelector : fields . ParseSelectorOrDie ( "metadata.name=t4-foo1" ) ,
watchTests : [ ] * testWatchStruct {
{ basePod ( "t4-foo1" ) , true , watch . Added } ,
{ basePod ( "t4-foo2" ) , false , "" } ,
{ basePodUpdated ( "t4-foo1" ) , true , watch . Modified } ,
{ basePodUpdated ( "t4-foo2" ) , false , "" } ,
} ,
} ,
{
name : "cluster-wide watch, request with name, field selector with metadata.name and spec.nodeName" ,
requestedName : "t5-foo1" ,
fieldSelector : fields . SelectorFromSet ( fields . Set {
"metadata.name" : "t5-foo1" ,
"spec.nodeName" : "t5-bar1" ,
} ) ,
indexFields : [ ] string { "spec.nodeName" } ,
watchTests : [ ] * testWatchStruct {
{ basePod ( "t5-foo1" ) , false , "" } ,
{ basePod ( "t5-foo2" ) , false , "" } ,
{ basePodUpdated ( "t5-foo1" ) , false , "" } ,
{ basePodUpdated ( "t5-foo2" ) , false , "" } ,
{ basePodAssigned ( "t5-foo1" , "t5-bar1" ) , true , watch . Added } ,
} ,
} ,
{
name : "cluster-wide watch, request with name, field selector with metadata.name, and with spec.nodeName to filter out watch" ,
requestedName : "t6-foo1" ,
fieldSelector : fields . AndSelectors (
fields . ParseSelectorOrDie ( "spec.nodeName!=t6-bar1" ) ,
fields . SelectorFromSet ( fields . Set { "metadata.name" : "t6-foo1" } ) ,
) ,
indexFields : [ ] string { "spec.nodeName" } ,
watchTests : [ ] * testWatchStruct {
{ basePod ( "t6-foo1" ) , true , watch . Added } ,
{ basePod ( "t6-foo2" ) , false , "" } ,
{ basePodUpdated ( "t6-foo1" ) , true , watch . Modified } ,
{ basePodAssigned ( "t6-foo1" , "t6-bar1" ) , true , watch . Deleted } ,
{ basePodAssigned ( "t6-foo2" , "t6-bar1" ) , false , "" } ,
} ,
} ,
}
for _ , tt := range tests {
t . Run ( tt . name , func ( t * testing . T ) {
requestInfo := & genericapirequest . RequestInfo { }
requestInfo . Name = tt . requestedName
requestInfo . Namespace = ""
ctx = genericapirequest . WithRequestInfo ( ctx , requestInfo )
ctx = genericapirequest . WithNamespace ( ctx , "" )
watchKey := KeyFunc ( "" , tt . requestedName )
predicate := createPodPredicate ( tt . fieldSelector , false , tt . indexFields )
list := & example . PodList { }
opts := storage . ListOptions {
ResourceVersion : "" ,
Predicate : predicate ,
Recursive : true ,
}
if err := store . GetList ( ctx , KeyFunc ( "" , "" ) , opts , list ) ; err != nil {
t . Errorf ( "Unexpected error: %v" , err )
}
opts . ResourceVersion = list . ResourceVersion
opts . Recursive = tt . recursive
w , err := store . Watch ( ctx , watchKey , opts )
if err != nil {
t . Fatalf ( "Watch failed: %v" , err )
}
currentObjs := map [ string ] * example . Pod { }
for _ , watchTest := range tt . watchTests {
out := & example . Pod { }
key := "pods/" + watchTest . obj . Name
err := store . GuaranteedUpdate ( ctx , key , out , true , nil , storage . SimpleUpdate (
func ( runtime . Object ) ( runtime . Object , error ) {
obj := watchTest . obj . DeepCopy ( )
return obj , nil
} ) , nil )
if err != nil {
t . Fatalf ( "GuaranteedUpdate failed: %v" , err )
}
expectObj := out
if watchTest . watchType == watch . Deleted {
expectObj = currentObjs [ watchTest . obj . Name ]
expectObj . ResourceVersion = out . ResourceVersion
delete ( currentObjs , watchTest . obj . Name )
} else {
currentObjs [ watchTest . obj . Name ] = out
}
if watchTest . expectEvent {
testCheckResult ( t , w , watch . Event { Type : watchTest . watchType , Object : expectObj } )
}
}
w . Stop ( )
testCheckStop ( t , w )
} )
}
}
// It tests watch of namespace-scoped resources.
func RunTestNamespaceScopedWatch ( ctx context . Context , t * testing . T , store storage . Interface ) {
tests := [ ] struct {
name string
// For watch request, the name of object is specified with field selector
// "metadata.name=objectName". So in this watch tests, we should set the
// requestedName and field selector "metadata.name=requestedName" at the
// same time or set neighter of them.
requestedName string
requestedNamespace string
recursive bool
fieldSelector fields . Selector
indexFields [ ] string
watchTests [ ] * testWatchStruct
} {
{
name : "namespaced watch, request without name, request without namespace, without field selector" ,
recursive : true ,
fieldSelector : fields . Everything ( ) ,
watchTests : [ ] * testWatchStruct {
{ baseNamespacedPod ( "t1-foo1" , "t1-ns1" ) , true , watch . Added } ,
{ baseNamespacedPod ( "t1-foo2" , "t1-ns2" ) , true , watch . Added } ,
{ baseNamespacedPodUpdated ( "t1-foo1" , "t1-ns1" ) , true , watch . Modified } ,
{ baseNamespacedPodUpdated ( "t1-foo2" , "t1-ns2" ) , true , watch . Modified } ,
} ,
} ,
{
name : "namespaced watch, request without name, request without namespace, field selector with metadata.namespace" ,
recursive : true ,
fieldSelector : fields . ParseSelectorOrDie ( "metadata.namespace=t2-ns1" ) ,
watchTests : [ ] * testWatchStruct {
{ baseNamespacedPod ( "t2-foo1" , "t2-ns1" ) , true , watch . Added } ,
{ baseNamespacedPod ( "t2-foo1" , "t2-ns2" ) , false , "" } ,
{ baseNamespacedPodUpdated ( "t2-foo1" , "t2-ns1" ) , true , watch . Modified } ,
{ baseNamespacedPodUpdated ( "t2-foo1" , "t2-ns2" ) , false , "" } ,
} ,
} ,
{
name : "namespaced watch, request without name, request without namespace, field selector with spec.nodename" ,
recursive : true ,
fieldSelector : fields . ParseSelectorOrDie ( "spec.nodeName=t3-bar1" ) ,
indexFields : [ ] string { "spec.nodeName" } ,
watchTests : [ ] * testWatchStruct {
{ baseNamespacedPod ( "t3-foo1" , "t3-ns1" ) , false , "" } ,
{ baseNamespacedPod ( "t3-foo2" , "t3-ns2" ) , false , "" } ,
{ baseNamespacedPodAssigned ( "t3-foo1" , "t3-ns1" , "t3-bar1" ) , true , watch . Added } ,
{ baseNamespacedPodAssigned ( "t3-foo2" , "t3-ns2" , "t3-bar1" ) , true , watch . Added } ,
} ,
} ,
{
name : "namespaced watch, request without name, request without namespace, field selector with spec.nodename to filter out watch" ,
recursive : true ,
fieldSelector : fields . ParseSelectorOrDie ( "spec.nodeName!=t4-bar1" ) ,
indexFields : [ ] string { "spec.nodeName" } ,
watchTests : [ ] * testWatchStruct {
{ baseNamespacedPod ( "t4-foo1" , "t4-ns1" ) , true , watch . Added } ,
{ baseNamespacedPod ( "t4-foo2" , "t4-ns1" ) , true , watch . Added } ,
{ baseNamespacedPodUpdated ( "t4-foo1" , "t4-ns1" ) , true , watch . Modified } ,
{ baseNamespacedPodAssigned ( "t4-foo1" , "t4-ns1" , "t4-bar1" ) , true , watch . Deleted } ,
} ,
} ,
{
name : "namespaced watch, request without name, request with namespace, without field selector" ,
requestedNamespace : "t5-ns1" ,
recursive : true ,
fieldSelector : fields . Everything ( ) ,
watchTests : [ ] * testWatchStruct {
{ baseNamespacedPod ( "t5-foo1" , "t5-ns1" ) , true , watch . Added } ,
{ baseNamespacedPod ( "t5-foo1" , "t5-ns2" ) , false , "" } ,
{ baseNamespacedPod ( "t5-foo2" , "t5-ns1" ) , true , watch . Added } ,
{ baseNamespacedPodUpdated ( "t5-foo1" , "t5-ns1" ) , true , watch . Modified } ,
{ baseNamespacedPodUpdated ( "t5-foo1" , "t5-ns2" ) , false , "" } ,
} ,
} ,
{
name : "namespaced watch, request without name, request with namespace, field selector with matched metadata.namespace" ,
requestedNamespace : "t6-ns1" ,
recursive : true ,
fieldSelector : fields . ParseSelectorOrDie ( "metadata.namespace=t6-ns1" ) ,
watchTests : [ ] * testWatchStruct {
{ baseNamespacedPod ( "t6-foo1" , "t6-ns1" ) , true , watch . Added } ,
{ baseNamespacedPod ( "t6-foo1" , "t6-ns2" ) , false , "" } ,
{ baseNamespacedPodUpdated ( "t6-foo1" , "t6-ns1" ) , true , watch . Modified } ,
} ,
} ,
{
name : "namespaced watch, request without name, request with namespace, field selector with non-matched metadata.namespace" ,
requestedNamespace : "t7-ns1" ,
recursive : true ,
fieldSelector : fields . ParseSelectorOrDie ( "metadata.namespace=t7-ns2" ) ,
watchTests : [ ] * testWatchStruct {
{ baseNamespacedPod ( "t7-foo1" , "t7-ns1" ) , false , "" } ,
{ baseNamespacedPod ( "t7-foo1" , "t7-ns2" ) , false , "" } ,
{ baseNamespacedPodUpdated ( "t7-foo1" , "t7-ns1" ) , false , "" } ,
{ baseNamespacedPodUpdated ( "t7-foo1" , "t7-ns2" ) , false , "" } ,
} ,
} ,
{
name : "namespaced watch, request without name, request with namespace, field selector with spec.nodename" ,
requestedNamespace : "t8-ns1" ,
recursive : true ,
fieldSelector : fields . ParseSelectorOrDie ( "spec.nodeName=t8-bar2" ) ,
indexFields : [ ] string { "spec.nodeName" } ,
watchTests : [ ] * testWatchStruct {
{ baseNamespacedPod ( "t8-foo1" , "t8-ns1" ) , false , "" } ,
{ baseNamespacedPodAssigned ( "t8-foo1" , "t8-ns1" , "t8-bar1" ) , false , "" } ,
{ baseNamespacedPodAssigned ( "t8-foo1" , "t8-ns2" , "t8-bar2" ) , false , "" } ,
{ baseNamespacedPodAssigned ( "t8-foo1" , "t8-ns1" , "t8-bar2" ) , true , watch . Added } ,
} ,
} ,
{
name : "namespaced watch, request without name, request with namespace, field selector with spec.nodename to filter out watch" ,
requestedNamespace : "t9-ns2" ,
recursive : true ,
fieldSelector : fields . ParseSelectorOrDie ( "spec.nodeName!=t9-bar1" ) ,
indexFields : [ ] string { "spec.nodeName" } ,
watchTests : [ ] * testWatchStruct {
{ baseNamespacedPod ( "t9-foo1" , "t9-ns1" ) , false , "" } ,
{ baseNamespacedPod ( "t9-foo1" , "t9-ns2" ) , true , watch . Added } ,
{ baseNamespacedPodAssigned ( "t9-foo1" , "t9-ns2" , "t9-bar1" ) , true , watch . Deleted } ,
{ baseNamespacedPodAssigned ( "t9-foo1" , "t9-ns2" , "t9-bar2" ) , true , watch . Added } ,
} ,
} ,
{
name : "namespaced watch, request with name, request without namespace, field selector with metadata.name" ,
requestedName : "t10-foo1" ,
recursive : true ,
fieldSelector : fields . ParseSelectorOrDie ( "metadata.name=t10-foo1" ) ,
watchTests : [ ] * testWatchStruct {
{ baseNamespacedPod ( "t10-foo1" , "t10-ns1" ) , true , watch . Added } ,
{ baseNamespacedPod ( "t10-foo1" , "t10-ns2" ) , true , watch . Added } ,
{ baseNamespacedPod ( "t10-foo2" , "t10-ns1" ) , false , "" } ,
{ baseNamespacedPodUpdated ( "t10-foo1" , "t10-ns1" ) , true , watch . Modified } ,
{ baseNamespacedPodAssigned ( "t10-foo1" , "t10-ns1" , "t10-bar1" ) , true , watch . Modified } ,
} ,
} ,
{
name : "namespaced watch, request with name, request without namespace, field selector with metadata.name and metadata.namespace" ,
requestedName : "t11-foo1" ,
recursive : true ,
fieldSelector : fields . SelectorFromSet ( fields . Set {
"metadata.name" : "t11-foo1" ,
"metadata.namespace" : "t11-ns1" ,
} ) ,
watchTests : [ ] * testWatchStruct {
{ baseNamespacedPod ( "t11-foo1" , "t11-ns1" ) , true , watch . Added } ,
{ baseNamespacedPod ( "t11-foo2" , "t11-ns1" ) , false , "" } ,
{ baseNamespacedPod ( "t11-foo1" , "t11-ns2" ) , false , "" } ,
{ baseNamespacedPodUpdated ( "t11-foo1" , "t11-ns1" ) , true , watch . Modified } ,
{ baseNamespacedPodAssigned ( "t11-foo1" , "t11-ns1" , "t11-bar1" ) , true , watch . Modified } ,
} ,
} ,
{
name : "namespaced watch, request with name, request without namespace, field selector with metadata.name and spec.nodeName" ,
requestedName : "t12-foo1" ,
recursive : true ,
fieldSelector : fields . SelectorFromSet ( fields . Set {
"metadata.name" : "t12-foo1" ,
"spec.nodeName" : "t12-bar1" ,
} ) ,
indexFields : [ ] string { "spec.nodeName" } ,
watchTests : [ ] * testWatchStruct {
{ baseNamespacedPod ( "t12-foo1" , "t12-ns1" ) , false , "" } ,
{ baseNamespacedPodUpdated ( "t12-foo1" , "t12-ns1" ) , false , "" } ,
{ baseNamespacedPodAssigned ( "t12-foo1" , "t12-ns1" , "t12-bar1" ) , true , watch . Added } ,
} ,
} ,
{
name : "namespaced watch, request with name, request without namespace, field selector with metadata.name, and with spec.nodeName to filter out watch" ,
requestedName : "t15-foo1" ,
recursive : true ,
fieldSelector : fields . AndSelectors (
fields . ParseSelectorOrDie ( "spec.nodeName!=t15-bar1" ) ,
fields . SelectorFromSet ( fields . Set { "metadata.name" : "t15-foo1" } ) ,
) ,
indexFields : [ ] string { "spec.nodeName" } ,
watchTests : [ ] * testWatchStruct {
{ baseNamespacedPod ( "t15-foo1" , "t15-ns1" ) , true , watch . Added } ,
{ baseNamespacedPod ( "t15-foo2" , "t15-ns1" ) , false , "" } ,
{ baseNamespacedPodUpdated ( "t15-foo1" , "t15-ns1" ) , true , watch . Modified } ,
{ baseNamespacedPodAssigned ( "t15-foo1" , "t15-ns1" , "t15-bar1" ) , true , watch . Deleted } ,
{ baseNamespacedPodAssigned ( "t15-foo1" , "t15-ns1" , "t15-bar2" ) , true , watch . Added } ,
} ,
} ,
{
name : "namespaced watch, request with name, request with namespace, with field selector metadata.name" ,
requestedName : "t16-foo1" ,
requestedNamespace : "t16-ns1" ,
fieldSelector : fields . ParseSelectorOrDie ( "metadata.name=t16-foo1" ) ,
watchTests : [ ] * testWatchStruct {
{ baseNamespacedPod ( "t16-foo1" , "t16-ns1" ) , true , watch . Added } ,
{ baseNamespacedPod ( "t16-foo2" , "t16-ns1" ) , false , "" } ,
{ baseNamespacedPodUpdated ( "t16-foo1" , "t16-ns1" ) , true , watch . Modified } ,
{ baseNamespacedPodAssigned ( "t16-foo1" , "t16-ns1" , "t16-bar1" ) , true , watch . Modified } ,
} ,
} ,
{
name : "namespaced watch, request with name, request with namespace, with field selector metadata.name and metadata.namespace" ,
requestedName : "t17-foo2" ,
requestedNamespace : "t17-ns1" ,
fieldSelector : fields . SelectorFromSet ( fields . Set {
"metadata.name" : "t17-foo2" ,
"metadata.namespace" : "t17-ns1" ,
} ) ,
watchTests : [ ] * testWatchStruct {
{ baseNamespacedPod ( "t17-foo1" , "t17-ns1" ) , false , "" } ,
{ baseNamespacedPod ( "t17-foo2" , "t17-ns1" ) , true , watch . Added } ,
{ baseNamespacedPodUpdated ( "t17-foo1" , "t17-ns1" ) , false , "" } ,
{ baseNamespacedPodAssigned ( "t17-foo2" , "t17-ns1" , "t17-bar1" ) , true , watch . Modified } ,
} ,
} ,
{
name : "namespaced watch, request with name, request with namespace, with field selector metadata.name, metadata.namespace and spec.nodename" ,
requestedName : "t18-foo1" ,
requestedNamespace : "t18-ns1" ,
fieldSelector : fields . SelectorFromSet ( fields . Set {
"metadata.name" : "t18-foo1" ,
"metadata.namespace" : "t18-ns1" ,
"spec.nodeName" : "t18-bar1" ,
} ) ,
indexFields : [ ] string { "spec.nodeName" } ,
watchTests : [ ] * testWatchStruct {
{ baseNamespacedPod ( "t18-foo1" , "t18-ns1" ) , false , "" } ,
{ baseNamespacedPod ( "t18-foo2" , "t18-ns1" ) , false , "" } ,
{ baseNamespacedPod ( "t18-foo1" , "t18-ns2" ) , false , "" } ,
{ baseNamespacedPodUpdated ( "t18-foo1" , "t18-ns1" ) , false , "" } ,
{ baseNamespacedPodAssigned ( "t18-foo1" , "t18-ns1" , "t18-bar1" ) , true , watch . Added } ,
} ,
} ,
{
name : "namespaced watch, request with name, request with namespace, with field selector metadata.name, metadata.namespace, and with spec.nodename to filter out watch" ,
requestedName : "t19-foo2" ,
requestedNamespace : "t19-ns1" ,
fieldSelector : fields . AndSelectors (
fields . ParseSelectorOrDie ( "spec.nodeName!=t19-bar1" ) ,
fields . SelectorFromSet ( fields . Set { "metadata.name" : "t19-foo2" , "metadata.namespace" : "t19-ns1" } ) ,
) ,
indexFields : [ ] string { "spec.nodeName" } ,
watchTests : [ ] * testWatchStruct {
{ baseNamespacedPod ( "t19-foo1" , "t19-ns1" ) , false , "" } ,
{ baseNamespacedPod ( "t19-foo2" , "t19-ns2" ) , false , "" } ,
{ baseNamespacedPod ( "t19-foo2" , "t19-ns1" ) , true , watch . Added } ,
{ baseNamespacedPodUpdated ( "t19-foo2" , "t19-ns1" ) , true , watch . Modified } ,
{ baseNamespacedPodAssigned ( "t19-foo2" , "t19-ns1" , "t19-bar1" ) , true , watch . Deleted } ,
} ,
} ,
}
for _ , tt := range tests {
t . Run ( tt . name , func ( t * testing . T ) {
requestInfo := & genericapirequest . RequestInfo { }
requestInfo . Name = tt . requestedName
requestInfo . Namespace = tt . requestedNamespace
ctx = genericapirequest . WithRequestInfo ( ctx , requestInfo )
ctx = genericapirequest . WithNamespace ( ctx , tt . requestedNamespace )
watchKey := KeyFunc ( tt . requestedNamespace , tt . requestedName )
predicate := createPodPredicate ( tt . fieldSelector , true , tt . indexFields )
list := & example . PodList { }
opts := storage . ListOptions {
ResourceVersion : "" ,
Predicate : predicate ,
Recursive : true ,
}
if err := store . GetList ( ctx , KeyFunc ( "" , "" ) , opts , list ) ; err != nil {
t . Errorf ( "Unexpected error: %v" , err )
}
opts . ResourceVersion = list . ResourceVersion
opts . Recursive = tt . recursive
w , err := store . Watch ( ctx , watchKey , opts )
if err != nil {
t . Fatalf ( "Watch failed: %v" , err )
}
currentObjs := map [ string ] * example . Pod { }
for _ , watchTest := range tt . watchTests {
out := & example . Pod { }
key := "pods/" + watchTest . obj . Namespace + "/" + watchTest . obj . Name
err := store . GuaranteedUpdate ( ctx , key , out , true , nil , storage . SimpleUpdate (
func ( runtime . Object ) ( runtime . Object , error ) {
obj := watchTest . obj . DeepCopy ( )
return obj , nil
} ) , nil )
if err != nil {
t . Fatalf ( "GuaranteedUpdate failed: %v" , err )
}
expectObj := out
podIdentifier := watchTest . obj . Namespace + "/" + watchTest . obj . Name
if watchTest . watchType == watch . Deleted {
expectObj = currentObjs [ podIdentifier ]
expectObj . ResourceVersion = out . ResourceVersion
delete ( currentObjs , podIdentifier )
} else {
currentObjs [ podIdentifier ] = out
}
if watchTest . expectEvent {
testCheckResult ( t , w , watch . Event { Type : watchTest . watchType , Object : expectObj } )
}
}
w . Stop ( )
testCheckStop ( t , w )
} )
}
}
// RunOptionalTestWatchDispatchBookmarkEvents tests whether bookmark events are sent.
// This feature is currently implemented in watch cache layer, so this is optional.
//
// TODO(#109831): ProgressNotify feature is effectively implementing the same
//
// functionality, so we should refactor this functionality to share the same input.
func RunTestWatchDispatchBookmarkEvents ( ctx context . Context , t * testing . T , store storage . Interface , expectedWatchBookmarks bool ) {
key , storedObj := testPropagateStore ( ctx , t , store , & example . Pod { ObjectMeta : metav1 . ObjectMeta { Name : "foo" , Namespace : "test-ns" } } )
startRV := storedObj . ResourceVersion
tests := [ ] struct {
name string
timeout time . Duration
expected bool
allowWatchBookmarks bool
} {
{ // test old client won't get Bookmark event
name : "allowWatchBookmarks=false" ,
timeout : 3 * time . Second ,
expected : false ,
allowWatchBookmarks : false ,
} ,
{
name : "allowWatchBookmarks=true" ,
timeout : 3 * time . Second ,
expected : expectedWatchBookmarks ,
allowWatchBookmarks : true ,
} ,
}
for i , tt := range tests {
t . Run ( tt . name , func ( t * testing . T ) {
pred := storage . Everything
pred . AllowWatchBookmarks = tt . allowWatchBookmarks
2024-07-18 07:47:47 -07:00
ctx , cancel := context . WithTimeout ( NewContext ( ) , tt . timeout )
2024-07-05 14:17:20 -04:00
defer cancel ( )
watcher , err := store . Watch ( ctx , key , storage . ListOptions { ResourceVersion : startRV , Predicate : pred } )
if err != nil {
t . Fatalf ( "Unexpected error: %v" , err )
}
defer watcher . Stop ( )
// Create events of pods in a different namespace
out := & example . Pod { }
obj := & example . Pod { ObjectMeta : metav1 . ObjectMeta { Name : "foo" , Namespace : fmt . Sprintf ( "other-ns-%d" , i ) } }
objKey := computePodKey ( obj )
if err := store . Create ( ctx , objKey , obj , out , 0 ) ; err != nil {
t . Fatalf ( "Create failed: %v" , err )
}
// Now wait for Bookmark event
select {
case event , ok := <- watcher . ResultChan ( ) :
if ! ok && tt . expected {
t . Errorf ( "Unexpected object watched (no objects)" )
}
if tt . expected && event . Type != watch . Bookmark {
t . Errorf ( "Unexpected object watched %#v" , event )
}
case <- time . After ( time . Second * 3 ) :
if tt . expected {
t . Errorf ( "Unexpected object watched (timeout)" )
}
}
} )
}
}
// RunOptionalTestWatchBookmarksWithCorrectResourceVersion tests whether bookmark events are
// sent with correct resource versions.
// This feature is currently implemented in watch cache layer, so this is optional.
//
// TODO(#109831): ProgressNotify feature is effectively implementing the same
//
// functionality, so we should refactor this functionality to share the same input.
func RunTestOptionalWatchBookmarksWithCorrectResourceVersion ( ctx context . Context , t * testing . T , store storage . Interface ) {
// Compute the initial resource version.
list := & example . PodList { }
storageOpts := storage . ListOptions {
Predicate : storage . Everything ,
Recursive : true ,
}
if err := store . GetList ( ctx , KeyFunc ( "" , "" ) , storageOpts , list ) ; err != nil {
t . Errorf ( "Unexpected error: %v" , err )
}
startRV := list . ResourceVersion
key := KeyFunc ( "test-ns" , "" )
pred := storage . Everything
pred . AllowWatchBookmarks = true
2024-07-18 07:47:47 -07:00
ctx , cancel := context . WithTimeout ( NewContext ( ) , 3 * time . Second )
2024-07-05 14:17:20 -04:00
defer cancel ( )
watcher , err := store . Watch ( ctx , key , storage . ListOptions { ResourceVersion : startRV , Predicate : pred , Recursive : true } )
if err != nil {
t . Fatalf ( "Unexpected error: %v" , err )
}
defer watcher . Stop ( )
done := make ( chan struct { } )
errc := make ( chan error , 1 )
var wg sync . WaitGroup
wg . Add ( 1 )
// We must wait for the waitgroup to exit before we terminate the cache or the server in prior defers.
defer wg . Wait ( )
// Call close first, so the goroutine knows to exit.
defer close ( done )
go func ( ) {
defer wg . Done ( )
for i := 0 ; i < 100 ; i ++ {
select {
case <- done :
return
default :
out := & example . Pod { }
pod := & example . Pod {
ObjectMeta : metav1 . ObjectMeta {
Name : fmt . Sprintf ( "foo-%d" , i ) ,
Namespace : "test-ns" ,
} ,
}
podKey := computePodKey ( pod )
if err := store . Create ( ctx , podKey , pod , out , 0 ) ; err != nil {
errc <- fmt . Errorf ( "failed to create pod %v: %v" , pod , err )
return
}
time . Sleep ( 10 * time . Millisecond )
}
}
} ( )
bookmarkReceived := false
lastObservedResourceVersion := uint64 ( 0 )
for {
select {
case err := <- errc :
t . Fatal ( err )
case event , ok := <- watcher . ResultChan ( ) :
if ! ok {
// Make sure we have received a bookmark event
if ! bookmarkReceived {
t . Fatalf ( "Unpexected error, we did not received a bookmark event" )
}
return
}
rv , err := storage . APIObjectVersioner { } . ObjectResourceVersion ( event . Object )
if err != nil {
t . Fatalf ( "failed to parse resourceVersion from %#v" , event )
}
if event . Type == watch . Bookmark {
bookmarkReceived = true
// bookmark event has a RV greater than or equal to the before one
if rv < lastObservedResourceVersion {
t . Fatalf ( "Unexpected bookmark resourceVersion %v less than observed %v)" , rv , lastObservedResourceVersion )
}
} else {
// non-bookmark event has a RV greater than anything before
if rv <= lastObservedResourceVersion {
t . Fatalf ( "Unexpected event resourceVersion %v less than or equal to bookmark %v)" , rv , lastObservedResourceVersion )
}
}
lastObservedResourceVersion = rv
}
}
}
// RunSendInitialEventsBackwardCompatibility test backward compatibility
// when SendInitialEvents option is set against various implementations.
// Backward compatibility is defined as RV = "" || RV = "O" and AllowWatchBookmark is set to false.
// In that case we expect a watch request to be established.
func RunSendInitialEventsBackwardCompatibility ( ctx context . Context , t * testing . T , store storage . Interface ) {
opts := storage . ListOptions { Predicate : storage . Everything }
opts . SendInitialEvents = ptr . To ( true )
w , err := store . Watch ( ctx , KeyFunc ( "" , "" ) , opts )
require . NoError ( t , err )
w . Stop ( )
}
// RunWatchSemantics test the following cases:
//
// +-----------------+---------------------+-------------------+
// | ResourceVersion | AllowWatchBookmarks | SendInitialEvents |
// +=================+=====================+===================+
// | Unset | true/false | true/false |
// | 0 | true/false | true/false |
// | 1 | true/false | true/false |
// | Current | true/false | true/false |
// +-----------------+---------------------+-------------------+
// where:
// - false indicates the value of the param was set to "false" by a test case
// - true indicates the value of the param was set to "true" by a test case
func RunWatchSemantics ( ctx context . Context , t * testing . T , store storage . Interface ) {
trueVal , falseVal := true , false
addEventsFromCreatedPods := func ( createdInitialPods [ ] * example . Pod ) [ ] watch . Event {
var ret [ ] watch . Event
for _ , createdPod := range createdInitialPods {
ret = append ( ret , watch . Event { Type : watch . Added , Object : createdPod } )
}
return ret
}
initialEventsEndFromLastCreatedPod := func ( createdInitialPods [ ] * example . Pod ) watch . Event {
return watch . Event {
Type : watch . Bookmark ,
Object : & example . Pod {
ObjectMeta : metav1 . ObjectMeta {
ResourceVersion : createdInitialPods [ len ( createdInitialPods ) - 1 ] . ResourceVersion ,
Annotations : map [ string ] string { "k8s.io/initial-events-end" : "true" } ,
} ,
} ,
}
}
scenarios := [ ] struct {
name string
allowWatchBookmarks bool
sendInitialEvents * bool
resourceVersion string
// useCurrentRV if set gets the current RV from the storage
// after adding the initial pods which is then used to establish a new watch request
useCurrentRV bool
initialPods [ ] * example . Pod
podsAfterEstablishingWatch [ ] * example . Pod
expectedInitialEventsInRandomOrder func ( createdInitialPods [ ] * example . Pod ) [ ] watch . Event
expectedInitialEventsInStrictOrder func ( createdInitialPods [ ] * example . Pod ) [ ] watch . Event
expectedEventsAfterEstablishingWatch func ( createdPodsAfterWatch [ ] * example . Pod ) [ ] watch . Event
} {
{
name : "allowWatchBookmarks=true, sendInitialEvents=true, RV=unset" ,
allowWatchBookmarks : true ,
sendInitialEvents : & trueVal ,
initialPods : [ ] * example . Pod { makePod ( "1" ) , makePod ( "2" ) , makePod ( "3" ) } ,
expectedInitialEventsInRandomOrder : addEventsFromCreatedPods ,
expectedInitialEventsInStrictOrder : func ( createdInitialPods [ ] * example . Pod ) [ ] watch . Event {
return [ ] watch . Event { initialEventsEndFromLastCreatedPod ( createdInitialPods ) }
} ,
podsAfterEstablishingWatch : [ ] * example . Pod { makePod ( "4" ) , makePod ( "5" ) } ,
expectedEventsAfterEstablishingWatch : addEventsFromCreatedPods ,
} ,
{
name : "allowWatchBookmarks=true, sendInitialEvents=false, RV=unset" ,
allowWatchBookmarks : true ,
sendInitialEvents : & falseVal ,
initialPods : [ ] * example . Pod { makePod ( "1" ) , makePod ( "2" ) , makePod ( "3" ) } ,
podsAfterEstablishingWatch : [ ] * example . Pod { makePod ( "4" ) , makePod ( "5" ) } ,
expectedEventsAfterEstablishingWatch : addEventsFromCreatedPods ,
} ,
{
name : "allowWatchBookmarks=false, sendInitialEvents=false, RV=unset" ,
sendInitialEvents : & falseVal ,
initialPods : [ ] * example . Pod { makePod ( "1" ) , makePod ( "2" ) , makePod ( "3" ) } ,
podsAfterEstablishingWatch : [ ] * example . Pod { makePod ( "4" ) , makePod ( "5" ) } ,
expectedEventsAfterEstablishingWatch : addEventsFromCreatedPods ,
} ,
{
name : "allowWatchBookmarks=false, sendInitialEvents=true, RV=unset" ,
sendInitialEvents : & trueVal ,
initialPods : [ ] * example . Pod { makePod ( "1" ) , makePod ( "2" ) , makePod ( "3" ) } ,
expectedInitialEventsInRandomOrder : addEventsFromCreatedPods ,
podsAfterEstablishingWatch : [ ] * example . Pod { makePod ( "4" ) , makePod ( "5" ) } ,
expectedEventsAfterEstablishingWatch : addEventsFromCreatedPods ,
} ,
{
name : "allowWatchBookmarks=true, sendInitialEvents=true, RV=0" ,
allowWatchBookmarks : true ,
sendInitialEvents : & trueVal ,
resourceVersion : "0" ,
initialPods : [ ] * example . Pod { makePod ( "1" ) , makePod ( "2" ) , makePod ( "3" ) } ,
expectedInitialEventsInRandomOrder : addEventsFromCreatedPods ,
expectedInitialEventsInStrictOrder : func ( createdInitialPods [ ] * example . Pod ) [ ] watch . Event {
return [ ] watch . Event { initialEventsEndFromLastCreatedPod ( createdInitialPods ) }
} ,
podsAfterEstablishingWatch : [ ] * example . Pod { makePod ( "4" ) , makePod ( "5" ) } ,
expectedEventsAfterEstablishingWatch : addEventsFromCreatedPods ,
} ,
{
name : "allowWatchBookmarks=true, sendInitialEvents=false, RV=0" ,
allowWatchBookmarks : true ,
sendInitialEvents : & falseVal ,
resourceVersion : "0" ,
initialPods : [ ] * example . Pod { makePod ( "1" ) , makePod ( "2" ) , makePod ( "3" ) } ,
podsAfterEstablishingWatch : [ ] * example . Pod { makePod ( "4" ) , makePod ( "5" ) } ,
expectedEventsAfterEstablishingWatch : addEventsFromCreatedPods ,
} ,
{
name : "allowWatchBookmarks=false, sendInitialEvents=false, RV=0" ,
sendInitialEvents : & falseVal ,
resourceVersion : "0" ,
initialPods : [ ] * example . Pod { makePod ( "1" ) , makePod ( "2" ) , makePod ( "3" ) } ,
podsAfterEstablishingWatch : [ ] * example . Pod { makePod ( "4" ) , makePod ( "5" ) } ,
expectedEventsAfterEstablishingWatch : addEventsFromCreatedPods ,
} ,
{
name : "allowWatchBookmarks=false, sendInitialEvents=true, RV=0" ,
sendInitialEvents : & trueVal ,
resourceVersion : "0" ,
initialPods : [ ] * example . Pod { makePod ( "1" ) , makePod ( "2" ) , makePod ( "3" ) } ,
expectedInitialEventsInRandomOrder : addEventsFromCreatedPods ,
podsAfterEstablishingWatch : [ ] * example . Pod { makePod ( "4" ) , makePod ( "5" ) } ,
expectedEventsAfterEstablishingWatch : addEventsFromCreatedPods ,
} ,
{
name : "allowWatchBookmarks=true, sendInitialEvents=true, RV=1" ,
allowWatchBookmarks : true ,
sendInitialEvents : & trueVal ,
resourceVersion : "1" ,
initialPods : [ ] * example . Pod { makePod ( "1" ) , makePod ( "2" ) , makePod ( "3" ) } ,
expectedInitialEventsInRandomOrder : addEventsFromCreatedPods ,
expectedInitialEventsInStrictOrder : func ( createdInitialPods [ ] * example . Pod ) [ ] watch . Event {
return [ ] watch . Event { initialEventsEndFromLastCreatedPod ( createdInitialPods ) }
} ,
podsAfterEstablishingWatch : [ ] * example . Pod { makePod ( "4" ) , makePod ( "5" ) } ,
expectedEventsAfterEstablishingWatch : addEventsFromCreatedPods ,
} ,
{
name : "allowWatchBookmarks=true, sendInitialEvents=false, RV=1" ,
allowWatchBookmarks : true ,
sendInitialEvents : & falseVal ,
resourceVersion : "1" ,
initialPods : [ ] * example . Pod { makePod ( "1" ) , makePod ( "2" ) , makePod ( "3" ) } ,
expectedInitialEventsInStrictOrder : addEventsFromCreatedPods ,
podsAfterEstablishingWatch : [ ] * example . Pod { makePod ( "4" ) , makePod ( "5" ) } ,
expectedEventsAfterEstablishingWatch : addEventsFromCreatedPods ,
} ,
{
name : "allowWatchBookmarks=false, sendInitialEvents=false, RV=1" ,
sendInitialEvents : & falseVal ,
resourceVersion : "1" ,
initialPods : [ ] * example . Pod { makePod ( "1" ) , makePod ( "2" ) , makePod ( "3" ) } ,
expectedInitialEventsInStrictOrder : addEventsFromCreatedPods ,
podsAfterEstablishingWatch : [ ] * example . Pod { makePod ( "4" ) , makePod ( "5" ) } ,
expectedEventsAfterEstablishingWatch : addEventsFromCreatedPods ,
} ,
{
name : "allowWatchBookmarks=false, sendInitialEvents=true, RV=1" ,
sendInitialEvents : & trueVal ,
resourceVersion : "1" ,
initialPods : [ ] * example . Pod { makePod ( "1" ) , makePod ( "2" ) , makePod ( "3" ) } ,
expectedInitialEventsInRandomOrder : addEventsFromCreatedPods ,
podsAfterEstablishingWatch : [ ] * example . Pod { makePod ( "4" ) , makePod ( "5" ) } ,
expectedEventsAfterEstablishingWatch : addEventsFromCreatedPods ,
} ,
{
name : "allowWatchBookmarks=true, sendInitialEvents=true, RV=useCurrentRV" ,
allowWatchBookmarks : true ,
sendInitialEvents : & trueVal ,
useCurrentRV : true ,
initialPods : [ ] * example . Pod { makePod ( "1" ) , makePod ( "2" ) , makePod ( "3" ) } ,
expectedInitialEventsInRandomOrder : addEventsFromCreatedPods ,
expectedInitialEventsInStrictOrder : func ( createdInitialPods [ ] * example . Pod ) [ ] watch . Event {
return [ ] watch . Event { initialEventsEndFromLastCreatedPod ( createdInitialPods ) }
} ,
podsAfterEstablishingWatch : [ ] * example . Pod { makePod ( "4" ) , makePod ( "5" ) } ,
expectedEventsAfterEstablishingWatch : addEventsFromCreatedPods ,
} ,
{
name : "allowWatchBookmarks=true, sendInitialEvents=false, RV=useCurrentRV" ,
allowWatchBookmarks : true ,
sendInitialEvents : & falseVal ,
useCurrentRV : true ,
initialPods : [ ] * example . Pod { makePod ( "1" ) , makePod ( "2" ) , makePod ( "3" ) } ,
podsAfterEstablishingWatch : [ ] * example . Pod { makePod ( "4" ) , makePod ( "5" ) } ,
expectedEventsAfterEstablishingWatch : addEventsFromCreatedPods ,
} ,
{
name : "allowWatchBookmarks=false, sendInitialEvents=false, RV=useCurrentRV" ,
sendInitialEvents : & falseVal ,
useCurrentRV : true ,
initialPods : [ ] * example . Pod { makePod ( "1" ) , makePod ( "2" ) , makePod ( "3" ) } ,
podsAfterEstablishingWatch : [ ] * example . Pod { makePod ( "4" ) , makePod ( "5" ) } ,
expectedEventsAfterEstablishingWatch : addEventsFromCreatedPods ,
} ,
{
name : "allowWatchBookmarks=false, sendInitialEvents=true, RV=useCurrentRV" ,
sendInitialEvents : & trueVal ,
useCurrentRV : true ,
initialPods : [ ] * example . Pod { makePod ( "1" ) , makePod ( "2" ) , makePod ( "3" ) } ,
expectedInitialEventsInRandomOrder : addEventsFromCreatedPods ,
podsAfterEstablishingWatch : [ ] * example . Pod { makePod ( "4" ) , makePod ( "5" ) } ,
expectedEventsAfterEstablishingWatch : addEventsFromCreatedPods ,
} ,
2024-10-07 04:01:53 -04:00
// Not Supported by unistore because there is no way to differentiate between:
// - SendInitialEvents=nil && resourceVersion=0
// - sendInitialEvents=false && resourceVersion=0
// This is a Legacy feature in k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go#196
// {
// name: "legacy, RV=0",
// resourceVersion: "0",
// initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")},
// expectedInitialEventsInRandomOrder: addEventsFromCreatedPods,
// podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")},
// expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods,
// },
// {
// name: "legacy, RV=unset",
// initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")},
// expectedInitialEventsInRandomOrder: addEventsFromCreatedPods,
// podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")},
// expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods,
// },
2024-07-05 14:17:20 -04:00
}
for idx , scenario := range scenarios {
t . Run ( scenario . name , func ( t * testing . T ) {
// set up env
2024-08-08 11:39:44 -04:00
defer featuregatetesting . SetFeatureGateDuringTest ( t , utilfeature . DefaultFeatureGate , features . WatchList , true )
2024-07-05 14:17:20 -04:00
if scenario . expectedInitialEventsInStrictOrder == nil {
scenario . expectedInitialEventsInStrictOrder = func ( _ [ ] * example . Pod ) [ ] watch . Event { return nil }
}
if scenario . expectedInitialEventsInRandomOrder == nil {
scenario . expectedInitialEventsInRandomOrder = func ( _ [ ] * example . Pod ) [ ] watch . Event { return nil }
}
if scenario . expectedEventsAfterEstablishingWatch == nil {
scenario . expectedEventsAfterEstablishingWatch = func ( _ [ ] * example . Pod ) [ ] watch . Event { return nil }
}
var createdPods [ ] * example . Pod
ns := fmt . Sprintf ( "ns-%v" , idx )
for _ , obj := range scenario . initialPods {
obj . Namespace = ns
out := & example . Pod { }
err := store . Create ( ctx , computePodKey ( obj ) , obj , out , 0 )
require . NoError ( t , err , "failed to add a pod: %v" , obj )
createdPods = append ( createdPods , out )
}
if scenario . useCurrentRV {
currentStorageRV , err := storage . GetCurrentResourceVersionFromStorage ( ctx , store , func ( ) runtime . Object { return & example . PodList { } } , KeyFunc ( "" , "" ) , "" )
require . NoError ( t , err )
scenario . resourceVersion = fmt . Sprintf ( "%d" , currentStorageRV )
}
opts := storage . ListOptions { Predicate : storage . Everything , Recursive : true }
opts . SendInitialEvents = scenario . sendInitialEvents
opts . Predicate . AllowWatchBookmarks = scenario . allowWatchBookmarks
if len ( scenario . resourceVersion ) > 0 {
opts . ResourceVersion = scenario . resourceVersion
}
2024-07-18 07:47:47 -07:00
w , err := store . Watch ( NewContext ( ) , KeyFunc ( ns , "" ) , opts )
2024-07-05 14:17:20 -04:00
require . NoError ( t , err , "failed to create watch: %v" )
defer w . Stop ( )
// make sure we only get initial events
testCheckResultsInRandomOrder ( t , w , scenario . expectedInitialEventsInRandomOrder ( createdPods ) )
testCheckResultsInStrictOrder ( t , w , scenario . expectedInitialEventsInStrictOrder ( createdPods ) )
testCheckNoMoreResults ( t , w )
createdPods = [ ] * example . Pod { }
// add a pod that is greater than the storage's RV when the watch was started
for _ , obj := range scenario . podsAfterEstablishingWatch {
obj . Namespace = ns
out := & example . Pod { }
err = store . Create ( ctx , computePodKey ( obj ) , obj , out , 0 )
require . NoError ( t , err , "failed to add a pod: %v" )
createdPods = append ( createdPods , out )
}
testCheckResultsInStrictOrder ( t , w , scenario . expectedEventsAfterEstablishingWatch ( createdPods ) )
2024-07-18 07:47:47 -07:00
2024-07-05 14:17:20 -04:00
testCheckNoMoreResults ( t , w )
} )
}
}
// RunWatchSemanticInitialEventsExtended checks if the bookmark event
// marking the end of the list stream contains the global RV.
//
// note that this scenario differs from the one in RunWatchSemantics
// by adding the pod to a different ns to advance the global RV
func RunWatchSemanticInitialEventsExtended ( ctx context . Context , t * testing . T , store storage . Interface ) {
trueVal := true
expectedInitialEventsInStrictOrder := func ( firstPod , secondPod * example . Pod ) [ ] watch . Event {
return [ ] watch . Event {
{ Type : watch . Added , Object : firstPod } ,
{ Type : watch . Bookmark , Object : & example . Pod {
ObjectMeta : metav1 . ObjectMeta {
ResourceVersion : secondPod . ResourceVersion ,
Annotations : map [ string ] string { "k8s.io/initial-events-end" : "true" } ,
} ,
} } ,
}
}
2024-08-08 11:39:44 -04:00
defer featuregatetesting . SetFeatureGateDuringTest ( t , utilfeature . DefaultFeatureGate , features . WatchList , true )
2024-07-05 14:17:20 -04:00
ns := "ns-foo"
pod := makePod ( "1" )
pod . Namespace = ns
firstPod := & example . Pod { }
err := store . Create ( ctx , computePodKey ( pod ) , pod , firstPod , 0 )
require . NoError ( t , err , "failed to add a pod: %v" )
// add the pod to a different ns to advance the global RV
pod = makePod ( "2" )
pod . Namespace = "other-ns-foo"
secondPod := & example . Pod { }
err = store . Create ( ctx , computePodKey ( pod ) , pod , secondPod , 0 )
require . NoError ( t , err , "failed to add a pod: %v" )
opts := storage . ListOptions { Predicate : storage . Everything , Recursive : true }
opts . SendInitialEvents = & trueVal
opts . Predicate . AllowWatchBookmarks = true
2024-07-18 07:47:47 -07:00
w , err := store . Watch ( NewContext ( ) , KeyFunc ( ns , "" ) , opts )
2024-07-05 14:17:20 -04:00
require . NoError ( t , err , "failed to create watch: %v" )
defer w . Stop ( )
// make sure we only get initial events from the first ns
// followed by the bookmark with the global RV
testCheckResultsInStrictOrder ( t , w , expectedInitialEventsInStrictOrder ( firstPod , secondPod ) )
testCheckNoMoreResults ( t , w )
}
func makePod ( namePrefix string ) * example . Pod {
return & example . Pod {
ObjectMeta : metav1 . ObjectMeta {
Name : fmt . Sprintf ( "pod-%s" , namePrefix ) ,
} ,
}
}
type testWatchStruct struct {
obj * example . Pod
expectEvent bool
watchType watch . EventType
}
func createPodPredicate ( field fields . Selector , namespaceScoped bool , indexField [ ] string ) storage . SelectionPredicate {
return storage . SelectionPredicate {
Label : labels . Everything ( ) ,
Field : field ,
GetAttrs : determinePodGetAttrFunc ( namespaceScoped , indexField ) ,
IndexFields : indexField ,
}
}
func determinePodGetAttrFunc ( namespaceScoped bool , indexField [ ] string ) storage . AttrFunc {
if indexField != nil {
if namespaceScoped {
return namespacedScopedNodeNameAttrFunc
}
return clusterScopedNodeNameAttrFunc
}
if namespaceScoped {
return storage . DefaultNamespaceScopedAttr
}
return storage . DefaultClusterScopedAttr
}
func namespacedScopedNodeNameAttrFunc ( obj runtime . Object ) ( labels . Set , fields . Set , error ) {
pod := obj . ( * example . Pod )
return nil , fields . Set {
"spec.nodeName" : pod . Spec . NodeName ,
"metadata.name" : pod . ObjectMeta . Name ,
"metadata.namespace" : pod . ObjectMeta . Namespace ,
} , nil
}
func clusterScopedNodeNameAttrFunc ( obj runtime . Object ) ( labels . Set , fields . Set , error ) {
pod := obj . ( * example . Pod )
return nil , fields . Set {
"spec.nodeName" : pod . Spec . NodeName ,
"metadata.name" : pod . ObjectMeta . Name ,
} , nil
}
func basePod ( podName string ) * example . Pod {
return baseNamespacedPod ( podName , "" )
}
func basePodUpdated ( podName string ) * example . Pod {
return baseNamespacedPodUpdated ( podName , "" )
}
func basePodAssigned ( podName , nodeName string ) * example . Pod {
return baseNamespacedPodAssigned ( podName , "" , nodeName )
}
func baseNamespacedPod ( podName , namespace string ) * example . Pod {
return & example . Pod {
ObjectMeta : metav1 . ObjectMeta { Name : podName , Namespace : namespace } ,
}
}
func baseNamespacedPodUpdated ( podName , namespace string ) * example . Pod {
return & example . Pod {
ObjectMeta : metav1 . ObjectMeta { Name : podName , Namespace : namespace } ,
Status : example . PodStatus { Phase : "Running" } ,
}
}
func baseNamespacedPodAssigned ( podName , namespace , nodeName string ) * example . Pod {
return & example . Pod {
ObjectMeta : metav1 . ObjectMeta { Name : podName , Namespace : namespace } ,
Spec : example . PodSpec { NodeName : nodeName } ,
}
}