grafana/pkg/apiserver/storage/file/watcher_test.go

231 lines
6.8 KiB
Go

// SPDX-License-Identifier: AGPL-3.0-only
// Provenance-includes-location: https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go
// Provenance-includes-license: Apache-2.0
// Provenance-includes-copyright: The Kubernetes Authors.
package file
import (
"context"
"io/fs"
"os"
"path"
"testing"
"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/api/apitesting"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apiserver/pkg/apis/example"
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/storagebackend"
"k8s.io/apiserver/pkg/storage/storagebackend/factory"
storagetesting "k8s.io/apiserver/pkg/storage/testing"
)
// Serialization machinery used by the test setup in this file.
var (
	// scheme starts out empty; init registers the API types on it.
	scheme = runtime.NewScheme()
	// codecs is the codec factory built on top of scheme.
	codecs = serializer.NewCodecFactory(scheme)
)
// init populates the package-level scheme: it adds the meta/v1 group version
// and then registers the example and examplev1 packages. Any registration
// error is handed to utilruntime.Must.
func init() {
	metav1.AddToGroupVersion(scheme, metav1.SchemeGroupVersion)

	// example is registered before examplev1.
	registrations := []func(*runtime.Scheme) error{
		example.AddToScheme,
		examplev1.AddToScheme,
	}
	for _, register := range registrations {
		utilruntime.Must(register(scheme))
	}
}
// setupOptions collects the knobs testSetup uses to build a storage instance.
type setupOptions struct {
	// codec is handed to storagebackend.NewDefaultConfig.
	codec runtime.Codec

	// newFunc and newListFunc are passed through to NewStorage.
	newFunc     func() runtime.Object
	newListFunc func() runtime.Object

	// prefix is handed to storagebackend.NewDefaultConfig and is the root
	// under which testSetup pre-creates directories.
	prefix string

	// resourcePrefix is passed to NewStorage and to storage.NamespaceKeyFunc.
	resourcePrefix string

	// groupResource selects the per-resource config via config.ForResource.
	groupResource schema.GroupResource
}
// setupOption mutates the setupOptions that testSetup is assembling. It also
// receives the running test so that an option can use test-scoped helpers.
type setupOption func(options *setupOptions, t testing.TB)
// withDefaults overwrites every field of options with the baseline used by
// the tests in this file: the examplev1 test codec, pod / pod-list object
// constructors, a fresh per-test temporary directory as the prefix, and the
// "pods" resource.
func withDefaults(options *setupOptions, t testing.TB) {
	*options = setupOptions{
		codec:          apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion),
		newFunc:        newPod,
		newListFunc:    newPodList,
		prefix:         t.TempDir(),
		resourcePrefix: "/pods",
		groupResource:  schema.GroupResource{Resource: "pods"},
	}
}
// Compile-time check that withDefaults has the setupOption signature.
var _ = setupOption(withDefaults)
// testSetup builds a storage.Interface via NewStorage for use in a test.
//
// withDefaults is always applied first; any opts supplied by the caller run
// afterwards, in order, and may override the defaults.
//
// On success it returns a background context, the store, and the DestroyFunc
// the caller must invoke to release the store. On failure the context and
// store are nil, the error is non-nil, and the returned DestroyFunc is a
// no-op, so a caller that defers it before inspecting the error does not
// panic on a nil function value.
func testSetup(t testing.TB, opts ...setupOption) (context.Context, storage.Interface, factory.DestroyFunc, error) {
	setupOpts := setupOptions{}
	opts = append([]setupOption{withDefaults}, opts...)
	for _, opt := range opts {
		opt(&setupOpts, t)
	}

	noopDestroy := func() {}

	config := storagebackend.NewDefaultConfig(setupOpts.prefix, setupOpts.codec)
	store, destroyFunc, err := NewStorage(
		config.ForResource(setupOpts.groupResource),
		setupOpts.resourcePrefix,
		func(obj runtime.Object) (string, error) {
			return storage.NamespaceKeyFunc(setupOpts.resourcePrefix, obj)
		},
		setupOpts.newFunc,
		setupOpts.newListFunc,
		storage.DefaultNamespaceScopedAttr,
		make(map[string]storage.IndexerFunc, 0),
		nil,
	)
	// Check the construction error before doing anything else, so it is
	// neither masked by nor confused with the directory-creation error below.
	if err != nil {
		return nil, nil, noopDestroy, err
	}

	// Some tests may start reading before writing, so make sure the
	// directory they read from already exists.
	if err := os.MkdirAll(path.Join(setupOpts.prefix, "pods", "test-ns"), fs.ModePerm); err != nil {
		// The store was created successfully; release it rather than leak it.
		destroyFunc()
		return nil, nil, noopDestroy, err
	}

	return context.Background(), store, destroyFunc, nil
}
// TestWatch runs storagetesting.RunTestWatch against the store built by
// testSetup.
func TestWatch(t *testing.T) {
	ctx, store, destroyFunc, err := testSetup(t)
	if !assert.NoError(t, err) {
		// Setup failed and store is nil; there is nothing to run.
		return
	}
	// Deferred only after the error check, so destroyFunc is known to be usable.
	defer destroyFunc()

	storagetesting.RunTestWatch(ctx, t, store)
}
// TestClusterScopedWatch runs storagetesting.RunTestClusterScopedWatch
// against the store built by testSetup.
func TestClusterScopedWatch(t *testing.T) {
	ctx, store, destroyFunc, err := testSetup(t)
	if !assert.NoError(t, err) {
		// Setup failed and store is nil; there is nothing to run.
		return
	}
	// Deferred only after the error check, so destroyFunc is known to be usable.
	defer destroyFunc()

	storagetesting.RunTestClusterScopedWatch(ctx, t, store)
}
// TestNamespaceScopedWatch runs storagetesting.RunTestNamespaceScopedWatch
// against the store built by testSetup.
func TestNamespaceScopedWatch(t *testing.T) {
	ctx, store, destroyFunc, err := testSetup(t)
	if !assert.NoError(t, err) {
		// Setup failed and store is nil; there is nothing to run.
		return
	}
	// Deferred only after the error check, so destroyFunc is known to be usable.
	defer destroyFunc()

	storagetesting.RunTestNamespaceScopedWatch(ctx, t, store)
}
// TestDeleteTriggerWatch runs storagetesting.RunTestDeleteTriggerWatch
// against the store built by testSetup.
func TestDeleteTriggerWatch(t *testing.T) {
	ctx, store, destroyFunc, err := testSetup(t)
	if !assert.NoError(t, err) {
		// Setup failed and store is nil; there is nothing to run.
		return
	}
	// Deferred only after the error check, so destroyFunc is known to be usable.
	defer destroyFunc()

	storagetesting.RunTestDeleteTriggerWatch(ctx, t, store)
}
// TestWatchFromZero runs storagetesting.RunTestWatchFromZero against the
// store built by testSetup.
func TestWatchFromZero(t *testing.T) {
	ctx, store, destroyFunc, err := testSetup(t)
	if !assert.NoError(t, err) {
		// Setup failed and store is nil; there is nothing to run.
		return
	}
	// Deferred only after the error check, so destroyFunc is known to be usable.
	defer destroyFunc()

	// NOTE(review): the trailing nil is presumably the optional compaction
	// hook accepted by the upstream helper — confirm against storagetesting.
	storagetesting.RunTestWatchFromZero(ctx, t, store, nil)
}
// TestWatchFromNoneZero tests that
//   - watch from non-0 should just watch changes after given version
//
// It delegates to storagetesting.RunTestWatchFromNonZero using the store
// built by testSetup.
func TestWatchFromNoneZero(t *testing.T) {
	ctx, store, destroyFunc, err := testSetup(t)
	if !assert.NoError(t, err) {
		// Setup failed and store is nil; there is nothing to run.
		return
	}
	// Deferred only after the error check, so destroyFunc is known to be usable.
	defer destroyFunc()

	storagetesting.RunTestWatchFromNonZero(ctx, t, store)
}
// TestDelayedWatchDelivery runs storagetesting.RunTestDelayedWatchDelivery
// against the store built by testSetup.
func TestDelayedWatchDelivery(t *testing.T) {
	ctx, store, destroyFunc, err := testSetup(t)
	if !assert.NoError(t, err) {
		// Setup failed and store is nil; there is nothing to run.
		return
	}
	// Deferred only after the error check, so destroyFunc is known to be usable.
	defer destroyFunc()

	storagetesting.RunTestDelayedWatchDelivery(ctx, t, store)
}
/* func TestWatchError(t *testing.T) {
ctx, store, _ := testSetup(t)
storagetesting.RunTestWatchError(ctx, t, &storeWithPrefixTransformer{store})
} */
// TestWatchContextCancel runs storagetesting.RunTestWatchContextCancel
// against the store built by testSetup.
func TestWatchContextCancel(t *testing.T) {
	ctx, store, destroyFunc, err := testSetup(t)
	if !assert.NoError(t, err) {
		// Setup failed and store is nil; there is nothing to run.
		return
	}
	// Deferred only after the error check, so destroyFunc is known to be usable.
	defer destroyFunc()

	storagetesting.RunTestWatchContextCancel(ctx, t, store)
}
// TestWatcherTimeout runs storagetesting.RunTestWatcherTimeout against the
// store built by testSetup.
func TestWatcherTimeout(t *testing.T) {
	ctx, store, destroyFunc, err := testSetup(t)
	if !assert.NoError(t, err) {
		// Setup failed and store is nil; there is nothing to run.
		return
	}
	// Deferred only after the error check, so destroyFunc is known to be usable.
	defer destroyFunc()

	storagetesting.RunTestWatcherTimeout(ctx, t, store)
}
// TestWatchDeleteEventObjectHaveLatestRV runs
// storagetesting.RunTestWatchDeleteEventObjectHaveLatestRV against the store
// built by testSetup.
func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) {
	ctx, store, destroyFunc, err := testSetup(t)
	if !assert.NoError(t, err) {
		// Setup failed and store is nil; there is nothing to run.
		return
	}
	// Deferred only after the error check, so destroyFunc is known to be usable.
	defer destroyFunc()

	storagetesting.RunTestWatchDeleteEventObjectHaveLatestRV(ctx, t, store)
}
// TODO: enable when we support flow control and priority fairness
/* func TestWatchInitializationSignal(t *testing.T) {
ctx, store, destroyFunc, err := testSetup(t)
defer destroyFunc()
assert.NoError(t, err)
storagetesting.RunTestWatchInitializationSignal(ctx, t, store)
} */
/* func TestProgressNotify(t *testing.T) {
ctx, store, destroyFunc, err := testSetup(t)
defer destroyFunc()
assert.NoError(t, err)
storagetesting.RunOptionalTestProgressNotify(ctx, t, store)
} */
// TestWatchDispatchBookmarkEvents makes sure that
// setting allowWatchBookmarks query param against
// this storage implementation doesn't have any effect.
//
// It delegates to storagetesting.RunTestWatchDispatchBookmarkEvents using the
// store built by testSetup.
func TestWatchDispatchBookmarkEvents(t *testing.T) {
	ctx, store, destroyFunc, err := testSetup(t)
	if !assert.NoError(t, err) {
		// Setup failed and store is nil; there is nothing to run.
		return
	}
	// Deferred only after the error check, so destroyFunc is known to be usable.
	defer destroyFunc()

	// NOTE(review): the trailing false presumably tells the helper that no
	// bookmark events are expected — confirm against storagetesting.
	storagetesting.RunTestWatchDispatchBookmarkEvents(ctx, t, store, false)
}
// TestSendInitialEventsBackwardCompatibility runs
// storagetesting.RunSendInitialEventsBackwardCompatibility against the store
// built by testSetup.
func TestSendInitialEventsBackwardCompatibility(t *testing.T) {
	ctx, store, destroyFunc, err := testSetup(t)
	if !assert.NoError(t, err) {
		// Setup failed and store is nil; there is nothing to run.
		return
	}
	// Deferred only after the error check, so destroyFunc is known to be usable.
	defer destroyFunc()

	storagetesting.RunSendInitialEventsBackwardCompatibility(ctx, t, store)
}
// TestEtcdWatchSemantics runs storagetesting.RunWatchSemantics against the
// store built by testSetup.
func TestEtcdWatchSemantics(t *testing.T) {
	ctx, store, destroyFunc, err := testSetup(t)
	if !assert.NoError(t, err) {
		// Setup failed and store is nil; there is nothing to run.
		return
	}
	// Deferred only after the error check, so destroyFunc is known to be usable.
	defer destroyFunc()

	storagetesting.RunWatchSemantics(ctx, t, store)
}
// TODO: determine if this test case is useful to pass
// If we simply generate Snowflakes for List RVs (when none is passed in) as opposed to maxRV calculation, it makes
// our watch implementation and comparing items against the requested RV much more reliable.
// There is no guarantee that maxRV+1 won't end up being a future item's RV.
/*
func TestEtcdWatchSemanticInitialEventsExtended(t *testing.T) {
ctx, store, destroyFunc, err := testSetup(t)
defer destroyFunc()
assert.NoError(t, err)
storagetesting.RunWatchSemanticInitialEventsExtended(ctx, t, store)
}
*/
// newPod returns a pointer to a zero-valued example.Pod as a runtime.Object.
func newPod() runtime.Object {
	return new(example.Pod)
}
// newPodList returns a pointer to a zero-valued example.PodList as a
// runtime.Object.
func newPodList() runtime.Object {
	return new(example.PodList)
}