mirror of
https://github.com/grafana/grafana.git
synced 2025-01-09 15:43:23 -06:00
K8s: Generic watch tests (#90023)
This commit is contained in:
parent
e9fd191065
commit
5f9ce12542
@ -27,6 +27,8 @@ import (
|
||||
"k8s.io/apiserver/pkg/storage/storagebackend/factory"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/klog/v2"
|
||||
|
||||
grafanaregistry "github.com/grafana/grafana/pkg/apiserver/registry/generic"
|
||||
)
|
||||
|
||||
const MaxUpdateAttempts = 30
|
||||
@ -37,13 +39,6 @@ var _ storage.Interface = (*Storage)(nil)
|
||||
// When we upgrade to 1.29
|
||||
var errResourceVersionSetOnCreate = errors.New("resourceVersion should not be set on objects to be created")
|
||||
|
||||
type parsedKey struct {
|
||||
group string
|
||||
resource string
|
||||
namespace string
|
||||
name string
|
||||
}
|
||||
|
||||
// Storage implements storage.Interface and storage resources as JSON files on disk.
|
||||
type Storage struct {
|
||||
root string
|
||||
@ -291,14 +286,14 @@ func (s *Storage) Watch(ctx context.Context, key string, opts storage.ListOption
|
||||
return nil, apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err))
|
||||
}
|
||||
|
||||
parsedkey, err := s.convertToParsedKey(key, p)
|
||||
parsedkey, err := grafanaregistry.ParseKey(key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var namespace *string
|
||||
if parsedkey.namespace != "" {
|
||||
namespace = &parsedkey.namespace
|
||||
if parsedkey.Namespace != "" {
|
||||
namespace = &parsedkey.Namespace
|
||||
}
|
||||
|
||||
if (opts.SendInitialEvents == nil && requestedRV == 0) || (opts.SendInitialEvents != nil && *opts.SendInitialEvents) {
|
||||
@ -391,10 +386,15 @@ func (s *Storage) Get(ctx context.Context, key string, opts storage.GetOptions,
|
||||
// No RV generation locking in single item get since its read from the disk
|
||||
fpath := s.filePath(key)
|
||||
|
||||
rv, err := s.versioner.ParseResourceVersion(opts.ResourceVersion)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Since it's a get, check if the dir exists and return early as needed
|
||||
dirname := filepath.Dir(fpath)
|
||||
if !exists(dirname) {
|
||||
return apierrors.NewNotFound(s.gr, s.nameFromKey(key))
|
||||
return storage.NewKeyNotFoundError(key, int64(rv))
|
||||
}
|
||||
|
||||
obj, err := readFile(s.codec, fpath, func() runtime.Object {
|
||||
@ -404,10 +404,6 @@ func (s *Storage) Get(ctx context.Context, key string, opts storage.GetOptions,
|
||||
if opts.IgnoreNotFound {
|
||||
return runtime.SetZeroValue(objPtr)
|
||||
}
|
||||
rv, err := s.versioner.ParseResourceVersion(opts.ResourceVersion)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return storage.NewKeyNotFoundError(key, int64(rv))
|
||||
}
|
||||
|
||||
@ -603,10 +599,6 @@ func (s *Storage) GuaranteedUpdate(
|
||||
|
||||
s.rvMutex.Lock()
|
||||
generatedRV := s.getNewResourceVersion()
|
||||
if err != nil {
|
||||
s.rvMutex.Unlock()
|
||||
return err
|
||||
}
|
||||
s.rvMutex.Unlock()
|
||||
|
||||
if err := s.versioner.UpdateObject(updatedObj, generatedRV); err != nil {
|
||||
@ -683,70 +675,6 @@ func (s *Storage) nameFromKey(key string) string {
|
||||
return strings.Replace(key, s.resourcePrefix+"/", "", 1)
|
||||
}
|
||||
|
||||
// While this is an inefficient way to differentiate the ambiguous keys,
|
||||
// we only need it for initial namespace calculation in watch
|
||||
// This helps us with watcher tests that don't always set up requestcontext correctly
|
||||
func (s *Storage) convertToParsedKey(key string, p storage.SelectionPredicate) (*parsedKey, error) {
|
||||
// NOTE: the following supports the watcher tests that run against v1/pods
|
||||
// Other than that, there are ambiguities in the key format that only field selector
|
||||
// when set to use metadata.name can be used to bring clarity in the 3-segment case
|
||||
|
||||
// Cases handled below:
|
||||
// namespace scoped:
|
||||
// /<group>/<resource>/[<namespace>]/[<name>]
|
||||
// /<group>/<resource>/[<namespace>]
|
||||
//
|
||||
// cluster scoped:
|
||||
// /<group>/<resource>/[<name>]
|
||||
// /<group>/<resource>
|
||||
parts := strings.SplitN(key, "/", 5)
|
||||
if len(parts) < 3 && s.gr.Group != "" {
|
||||
return nil, fmt.Errorf("invalid key (expecting at least 2 parts): %s", key)
|
||||
}
|
||||
|
||||
if len(parts) < 2 && s.gr.Group == "" {
|
||||
return nil, fmt.Errorf("invalid key (expecting at least 1 part): %s", key)
|
||||
}
|
||||
|
||||
// beware this empty "" as the first separated part for the rest of the parsing below
|
||||
if parts[0] != "" {
|
||||
return nil, fmt.Errorf("invalid key (expecting leading slash): %s", key)
|
||||
}
|
||||
|
||||
k := &parsedKey{}
|
||||
|
||||
// for v1/pods that tests use, Group is empty
|
||||
if len(parts) > 1 && s.gr.Group == "" {
|
||||
k.resource = parts[1]
|
||||
}
|
||||
|
||||
if len(parts) > 2 {
|
||||
// for v1/pods that tests use, Group is empty
|
||||
if parts[1] == s.gr.Resource {
|
||||
k.resource = parts[1]
|
||||
if _, found := p.Field.RequiresExactMatch("metadata.name"); !found {
|
||||
k.namespace = parts[2]
|
||||
}
|
||||
} else {
|
||||
k.group = parts[1]
|
||||
k.resource = parts[2]
|
||||
}
|
||||
}
|
||||
|
||||
if len(parts) > 3 {
|
||||
// for v1/pods that tests use, Group is empty
|
||||
if parts[1] == s.gr.Resource {
|
||||
k.name = parts[3]
|
||||
} else {
|
||||
if _, found := p.Field.RequiresExactMatch("metadata.name"); !found {
|
||||
k.namespace = parts[3]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return k, nil
|
||||
}
|
||||
|
||||
func copyModifiedObjectToDestination(updatedObj runtime.Object, destination runtime.Object) error {
|
||||
u, err := conversion.EnforcePtr(updatedObj)
|
||||
if err != nil {
|
||||
|
@ -71,7 +71,7 @@ func (r *RESTOptionsGetter) GetRESTOptions(resource schema.GroupResource) (gener
|
||||
DeleteCollectionWorkers: 0,
|
||||
EnableGarbageCollection: false,
|
||||
// k8s expects forward slashes here, we'll convert them to os path separators in the storage
|
||||
ResourcePrefix: "/" + resource.Group + "/" + resource.Resource,
|
||||
ResourcePrefix: "/group/" + resource.Group + "/resource/" + resource.Resource,
|
||||
CountMetricPollPeriod: 1 * time.Second,
|
||||
StorageObjectCountTracker: storageConfig.Config.StorageObjectCountTracker,
|
||||
}
|
||||
|
@ -14,6 +14,7 @@ import (
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"k8s.io/apimachinery/pkg/api/apitesting"
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
@ -24,7 +25,8 @@ import (
|
||||
"k8s.io/apiserver/pkg/storage"
|
||||
"k8s.io/apiserver/pkg/storage/storagebackend"
|
||||
"k8s.io/apiserver/pkg/storage/storagebackend/factory"
|
||||
storagetesting "k8s.io/apiserver/pkg/storage/testing"
|
||||
|
||||
storagetesting "github.com/grafana/grafana/pkg/apiserver/storage/testing"
|
||||
)
|
||||
|
||||
var scheme = runtime.NewScheme()
|
||||
@ -52,7 +54,7 @@ func withDefaults(options *setupOptions, t testing.TB) {
|
||||
options.newFunc = newPod
|
||||
options.newListFunc = newPodList
|
||||
options.prefix = t.TempDir()
|
||||
options.resourcePrefix = "/pods"
|
||||
options.resourcePrefix = storagetesting.KeyFunc("", "")
|
||||
options.groupResource = schema.GroupResource{Resource: "pods"}
|
||||
}
|
||||
|
||||
@ -70,7 +72,11 @@ func testSetup(t testing.TB, opts ...setupOption) (context.Context, storage.Inte
|
||||
config.ForResource(setupOpts.groupResource),
|
||||
setupOpts.resourcePrefix,
|
||||
func(obj runtime.Object) (string, error) {
|
||||
return storage.NamespaceKeyFunc(setupOpts.resourcePrefix, obj)
|
||||
accessor, err := meta.Accessor(obj)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return storagetesting.KeyFunc(accessor.GetNamespace(), accessor.GetName()), nil
|
||||
},
|
||||
setupOpts.newFunc,
|
||||
setupOpts.newListFunc,
|
||||
@ -80,7 +86,7 @@ func testSetup(t testing.TB, opts ...setupOption) (context.Context, storage.Inte
|
||||
)
|
||||
|
||||
// Some tests may start reading before writing
|
||||
if err := os.MkdirAll(path.Join(setupOpts.prefix, "pods", "test-ns"), fs.ModePerm); err != nil {
|
||||
if err := os.MkdirAll(path.Join(setupOpts.prefix, storagetesting.KeyFunc("test-ns", "")), fs.ModePerm); err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
@ -95,7 +101,6 @@ func TestWatch(t *testing.T) {
|
||||
ctx, store, destroyFunc, err := testSetup(t)
|
||||
defer destroyFunc()
|
||||
assert.NoError(t, err)
|
||||
|
||||
storagetesting.RunTestWatch(ctx, t, store)
|
||||
}
|
||||
|
||||
|
2691
pkg/apiserver/storage/testing/store_tests.go
Normal file
2691
pkg/apiserver/storage/testing/store_tests.go
Normal file
File diff suppressed because it is too large
Load Diff
378
pkg/apiserver/storage/testing/utils.go
Normal file
378
pkg/apiserver/storage/testing/utils.go
Normal file
@ -0,0 +1,378 @@
|
||||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
// Provenance-includes-location: https://github.com/kubernetes/apiserver/blob/master/pkg/storage/testing/utils.go
|
||||
// Provenance-includes-license: Apache-2.0
|
||||
// Provenance-includes-copyright: The Kubernetes Authors.
|
||||
|
||||
package testing
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"path"
|
||||
"reflect"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/apiserver/pkg/apis/example"
|
||||
"k8s.io/apiserver/pkg/storage"
|
||||
"k8s.io/apiserver/pkg/storage/value"
|
||||
|
||||
grafanaregistry "github.com/grafana/grafana/pkg/apiserver/registry/generic"
|
||||
)
|
||||
|
||||
// KeyFunc is a function that generates keys for tests.
|
||||
// All tests use the "pods" resource, so the resource is hardcoded to "pods".
|
||||
var KeyFunc = func(namespace, name string) string {
|
||||
k := grafanaregistry.Key{
|
||||
Resource: "pods",
|
||||
Namespace: namespace,
|
||||
Name: name,
|
||||
}
|
||||
return k.String()
|
||||
}
|
||||
|
||||
// CreateObjList will create a list from the array of objects.
|
||||
func CreateObjList(prefix string, helper storage.Interface, items []runtime.Object) error {
|
||||
for i := range items {
|
||||
obj := items[i]
|
||||
meta, err := meta.Accessor(obj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = helper.Create(context.Background(), path.Join(prefix, meta.GetName()), obj, obj, 0)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
items[i] = obj
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// CreateList will properly create a list using the storage interface.
|
||||
func CreateList(prefix string, helper storage.Interface, list runtime.Object) error {
|
||||
items, err := meta.ExtractList(list)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = CreateObjList(prefix, helper, items)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return meta.SetList(list, items)
|
||||
}
|
||||
|
||||
// DeepEqualSafePodSpec returns an example.PodSpec safe for deep-equal operations.
|
||||
func DeepEqualSafePodSpec() example.PodSpec {
|
||||
grace := int64(30)
|
||||
return example.PodSpec{
|
||||
RestartPolicy: "Always",
|
||||
TerminationGracePeriodSeconds: &grace,
|
||||
SchedulerName: "default-scheduler",
|
||||
}
|
||||
}
|
||||
|
||||
func computePodKey(obj *example.Pod) string {
|
||||
return KeyFunc(obj.Namespace, obj.Name)
|
||||
}
|
||||
|
||||
// testPropagateStore helps propagates store with objects, automates key generation, and returns
|
||||
// keys and stored objects.
|
||||
func testPropagateStore(ctx context.Context, t *testing.T, store storage.Interface, obj *example.Pod) (string, *example.Pod) {
|
||||
// Setup store with a key and grab the output for returning.
|
||||
key := computePodKey(obj)
|
||||
|
||||
// Setup store with the specified key and grab the output for returning.
|
||||
err := store.Delete(ctx, key, &example.Pod{}, nil, storage.ValidateAllObjectFunc, nil)
|
||||
if err != nil && !storage.IsNotFound(err) {
|
||||
t.Fatalf("Cleanup failed: %v", err)
|
||||
}
|
||||
setOutput := &example.Pod{}
|
||||
if err := store.Create(ctx, key, obj, setOutput, 0); err != nil {
|
||||
t.Fatalf("Set failed: %v", err)
|
||||
}
|
||||
return key, setOutput
|
||||
}
|
||||
|
||||
func expectNoDiff(t *testing.T, msg string, expected, actual interface{}) {
|
||||
t.Helper()
|
||||
if !reflect.DeepEqual(expected, actual) {
|
||||
if diff := cmp.Diff(expected, actual); diff != "" {
|
||||
t.Errorf("%s: %s", msg, diff)
|
||||
} else {
|
||||
t.Errorf("%s:\nexpected: %#v\ngot: %#v", msg, expected, actual)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func ExpectContains(t *testing.T, msg string, expectedList []interface{}, got interface{}) {
|
||||
t.Helper()
|
||||
for _, expected := range expectedList {
|
||||
if reflect.DeepEqual(expected, got) {
|
||||
return
|
||||
}
|
||||
}
|
||||
if len(expectedList) == 0 {
|
||||
t.Errorf("%s: empty expectedList", msg)
|
||||
return
|
||||
}
|
||||
if diff := cmp.Diff(expectedList[0], got); diff != "" {
|
||||
t.Errorf("%s: differs from all items, with first: %s", msg, diff)
|
||||
} else {
|
||||
t.Errorf("%s: differs from all items, first: %#v\ngot: %#v", msg, expectedList[0], got)
|
||||
}
|
||||
}
|
||||
|
||||
const dummyPrefix = "adapter"
|
||||
|
||||
func encodeContinueOrDie(key string, resourceVersion int64) string {
|
||||
token, err := storage.EncodeContinue(dummyPrefix+key, dummyPrefix, resourceVersion)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return token
|
||||
}
|
||||
|
||||
func testCheckEventType(t *testing.T, w watch.Interface, expectEventType watch.EventType) {
|
||||
select {
|
||||
case res := <-w.ResultChan():
|
||||
if res.Type != expectEventType {
|
||||
t.Errorf("event type want=%v, get=%v", expectEventType, res.Type)
|
||||
}
|
||||
case <-time.After(wait.ForeverTestTimeout):
|
||||
t.Errorf("time out after waiting %v on ResultChan", wait.ForeverTestTimeout)
|
||||
}
|
||||
}
|
||||
|
||||
func testCheckResult(t *testing.T, w watch.Interface, expectEvent watch.Event) {
|
||||
testCheckResultFunc(t, w, func(actualEvent watch.Event) {
|
||||
expectNoDiff(t, "incorrect event", expectEvent, actualEvent)
|
||||
})
|
||||
}
|
||||
|
||||
func testCheckResultFunc(t *testing.T, w watch.Interface, check func(actualEvent watch.Event)) {
|
||||
select {
|
||||
case res := <-w.ResultChan():
|
||||
obj := res.Object
|
||||
if co, ok := obj.(runtime.CacheableObject); ok {
|
||||
res.Object = co.GetObject()
|
||||
}
|
||||
check(res)
|
||||
case <-time.After(wait.ForeverTestTimeout):
|
||||
t.Errorf("time out after waiting %v on ResultChan", wait.ForeverTestTimeout)
|
||||
}
|
||||
}
|
||||
|
||||
func testCheckStop(t *testing.T, w watch.Interface) {
|
||||
select {
|
||||
case e, ok := <-w.ResultChan():
|
||||
if ok {
|
||||
var obj string
|
||||
switch e.Object.(type) {
|
||||
case *example.Pod:
|
||||
obj = e.Object.(*example.Pod).Name
|
||||
case *v1.Status:
|
||||
obj = e.Object.(*v1.Status).Message
|
||||
}
|
||||
t.Errorf("ResultChan should have been closed. Event: %s. Object: %s", e.Type, obj)
|
||||
}
|
||||
case <-time.After(wait.ForeverTestTimeout):
|
||||
t.Errorf("time out after waiting 1s on ResultChan")
|
||||
}
|
||||
}
|
||||
|
||||
func testCheckResultsInStrictOrder(t *testing.T, w watch.Interface, expectedEvents []watch.Event) {
|
||||
for _, expectedEvent := range expectedEvents {
|
||||
testCheckResult(t, w, expectedEvent)
|
||||
}
|
||||
}
|
||||
|
||||
func testCheckResultsInRandomOrder(t *testing.T, w watch.Interface, expectedEvents []watch.Event) {
|
||||
for range expectedEvents {
|
||||
testCheckResultFunc(t, w, func(actualEvent watch.Event) {
|
||||
ExpectContains(t, "unexpected event", toInterfaceSlice(expectedEvents), actualEvent)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func testCheckNoMoreResults(t *testing.T, w watch.Interface) {
|
||||
select {
|
||||
case e := <-w.ResultChan():
|
||||
t.Errorf("Unexpected: %#v event received, expected no events", e)
|
||||
case <-time.After(time.Second):
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func toInterfaceSlice[T any](s []T) []interface{} {
|
||||
result := make([]interface{}, len(s))
|
||||
for i, v := range s {
|
||||
result[i] = v
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// resourceVersionNotOlderThan returns a function to validate resource versions. Resource versions
|
||||
// referring to points in logical time before the sentinel generate an error. All logical times as
|
||||
// new as the sentinel or newer generate no error.
|
||||
func resourceVersionNotOlderThan(sentinel string) func(string) error {
|
||||
return func(resourceVersion string) error {
|
||||
objectVersioner := storage.APIObjectVersioner{}
|
||||
actualRV, err := objectVersioner.ParseResourceVersion(resourceVersion)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
expectedRV, err := objectVersioner.ParseResourceVersion(sentinel)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if actualRV < expectedRV {
|
||||
return fmt.Errorf("expected a resourceVersion no smaller than than %d, but got %d", expectedRV, actualRV)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// StorageInjectingListErrors injects a dummy error for first N GetList calls.
|
||||
type StorageInjectingListErrors struct {
|
||||
storage.Interface
|
||||
|
||||
lock sync.Mutex
|
||||
Errors int
|
||||
}
|
||||
|
||||
func (s *StorageInjectingListErrors) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
|
||||
err := func() error {
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
if s.Errors > 0 {
|
||||
s.Errors--
|
||||
return fmt.Errorf("injected error")
|
||||
}
|
||||
return nil
|
||||
}()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return s.Interface.GetList(ctx, key, opts, listObj)
|
||||
}
|
||||
|
||||
func (s *StorageInjectingListErrors) ErrorsConsumed() (bool, error) {
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
return s.Errors == 0, nil
|
||||
}
|
||||
|
||||
type Compaction func(ctx context.Context, t *testing.T, resourceVersion string)
|
||||
|
||||
type PrefixTransformerModifier func(*PrefixTransformer) value.Transformer
|
||||
|
||||
type InterfaceWithPrefixTransformer interface {
|
||||
storage.Interface
|
||||
|
||||
UpdatePrefixTransformer(PrefixTransformerModifier) func()
|
||||
}
|
||||
|
||||
// PrefixTransformer adds and verifies that all data has the correct prefix on its way in and out.
|
||||
type PrefixTransformer struct {
|
||||
prefix []byte
|
||||
stale bool
|
||||
err error
|
||||
reads uint64
|
||||
}
|
||||
|
||||
func NewPrefixTransformer(prefix []byte, stale bool) *PrefixTransformer {
|
||||
return &PrefixTransformer{
|
||||
prefix: prefix,
|
||||
stale: stale,
|
||||
}
|
||||
}
|
||||
|
||||
func (p *PrefixTransformer) TransformFromStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, bool, error) {
|
||||
atomic.AddUint64(&p.reads, 1)
|
||||
if dataCtx == nil {
|
||||
panic("no context provided")
|
||||
}
|
||||
if !bytes.HasPrefix(data, p.prefix) {
|
||||
return nil, false, fmt.Errorf("value does not have expected prefix %q: %s,", p.prefix, string(data))
|
||||
}
|
||||
return bytes.TrimPrefix(data, p.prefix), p.stale, p.err
|
||||
}
|
||||
func (p *PrefixTransformer) TransformToStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, error) {
|
||||
if dataCtx == nil {
|
||||
panic("no context provided")
|
||||
}
|
||||
if len(data) > 0 {
|
||||
return append(append([]byte{}, p.prefix...), data...), p.err
|
||||
}
|
||||
return data, p.err
|
||||
}
|
||||
|
||||
func (p *PrefixTransformer) GetReadsAndReset() uint64 {
|
||||
return atomic.SwapUint64(&p.reads, 0)
|
||||
}
|
||||
|
||||
// reproducingTransformer is a custom test-only transformer used purely
|
||||
// for testing consistency.
|
||||
// It allows for creating predefined objects on TransformFromStorage operations,
|
||||
// which allows for precise in time injection of new objects in the middle of
|
||||
// read operations.
|
||||
type reproducingTransformer struct {
|
||||
wrapped value.Transformer
|
||||
store storage.Interface
|
||||
|
||||
index uint32
|
||||
nextObject func(uint32) (string, *example.Pod)
|
||||
}
|
||||
|
||||
func (rt *reproducingTransformer) TransformFromStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, bool, error) {
|
||||
if err := rt.createObject(ctx); err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
return rt.wrapped.TransformFromStorage(ctx, data, dataCtx)
|
||||
}
|
||||
|
||||
func (rt *reproducingTransformer) TransformToStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, error) {
|
||||
return rt.wrapped.TransformToStorage(ctx, data, dataCtx)
|
||||
}
|
||||
|
||||
func (rt *reproducingTransformer) createObject(ctx context.Context) error {
|
||||
key, obj := rt.nextObject(atomic.AddUint32(&rt.index, 1))
|
||||
out := &example.Pod{}
|
||||
return rt.store.Create(ctx, key, obj, out, 0)
|
||||
}
|
||||
|
||||
// failingTransformer is a custom test-only transformer that always returns
|
||||
// an error on transforming data from storage.
|
||||
type failingTransformer struct {
|
||||
}
|
||||
|
||||
func (ft *failingTransformer) TransformFromStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, bool, error) {
|
||||
return nil, false, fmt.Errorf("failed transformation")
|
||||
}
|
||||
|
||||
func (ft *failingTransformer) TransformToStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, error) {
|
||||
return data, nil
|
||||
}
|
||||
|
||||
type sortablePodList []example.Pod
|
||||
|
||||
func (s sortablePodList) Len() int {
|
||||
return len(s)
|
||||
}
|
||||
|
||||
func (s sortablePodList) Less(i, j int) bool {
|
||||
return computePodKey(&s[i]) < computePodKey(&s[j])
|
||||
}
|
||||
|
||||
func (s sortablePodList) Swap(i, j int) {
|
||||
s[i], s[j] = s[j], s[i]
|
||||
}
|
1620
pkg/apiserver/storage/testing/watcher_tests.go
Normal file
1620
pkg/apiserver/storage/testing/watcher_tests.go
Normal file
File diff suppressed because it is too large
Load Diff
@ -81,8 +81,8 @@ func (o *AggregatorServerOptions) ApplyTo(aggregatorConfig *aggregatorapiserver.
|
||||
}
|
||||
// override the RESTOptionsGetter to use the file storage options getter
|
||||
restOptionsGetter, err := filestorage.NewRESTOptionsGetter(dataPath, etcdOptions.StorageConfig,
|
||||
"apiregistration.k8s.io/apiservices",
|
||||
"service.grafana.app/externalnames",
|
||||
"/group/apiregistration.k8s.io/resource/apiservices",
|
||||
"/group/service.grafana.app/resource/externalnames",
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -9,7 +9,6 @@ import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"k8s.io/apimachinery/pkg/api/apitesting"
|
||||
@ -21,13 +20,13 @@ import (
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apiserver/pkg/apis/example"
|
||||
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
|
||||
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
|
||||
"k8s.io/apiserver/pkg/storage"
|
||||
"k8s.io/apiserver/pkg/storage/storagebackend"
|
||||
"k8s.io/apiserver/pkg/storage/storagebackend/factory"
|
||||
storagetesting "k8s.io/apiserver/pkg/storage/testing"
|
||||
|
||||
grafanaregistry "github.com/grafana/grafana/pkg/apiserver/registry/generic"
|
||||
"github.com/grafana/grafana/pkg/apimachinery/identity"
|
||||
storagetesting "github.com/grafana/grafana/pkg/apiserver/storage/testing"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
"github.com/grafana/grafana/pkg/services/apiserver/storage/entity"
|
||||
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
||||
"github.com/grafana/grafana/pkg/services/sqlstore"
|
||||
@ -109,7 +108,7 @@ func withDefaults(options *setupOptions, t *testing.T) {
|
||||
options.newFunc = newPod
|
||||
options.newListFunc = newPodList
|
||||
options.prefix = t.TempDir()
|
||||
options.resourcePrefix = "/pods"
|
||||
options.resourcePrefix = "/resource/pods"
|
||||
options.groupResource = schema.GroupResource{Resource: "pods"}
|
||||
}
|
||||
|
||||
@ -136,8 +135,7 @@ func testSetup(t *testing.T, opts ...setupOption) (context.Context, storage.Inte
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
keyFn := grafanaregistry.NamespaceKeyFunc(setupOpts.groupResource)
|
||||
return keyFn(genericapirequest.WithNamespace(genericapirequest.NewContext(), accessor.GetNamespace()), accessor.GetName())
|
||||
return storagetesting.KeyFunc(accessor.GetNamespace(), accessor.GetName()), nil
|
||||
},
|
||||
setupOpts.newFunc,
|
||||
setupOpts.newListFunc,
|
||||
@ -147,7 +145,15 @@ func testSetup(t *testing.T, opts ...setupOption) (context.Context, storage.Inte
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
// Test with an admin identity
|
||||
ctx := identity.WithRequester(context.Background(), &identity.StaticRequester{
|
||||
Namespace: identity.NamespaceUser,
|
||||
Login: "testuser",
|
||||
UserID: 123,
|
||||
UserUID: "u123",
|
||||
OrgRole: identity.RoleAdmin,
|
||||
IsGrafanaAdmin: true, // can do anything
|
||||
})
|
||||
|
||||
return ctx, store, destroyFunc, nil
|
||||
}
|
||||
@ -327,7 +333,6 @@ func TestIntegrationSendInitialEventsBackwardCompatibility(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("skipping integration test")
|
||||
}
|
||||
t.Skip("In maintenance")
|
||||
|
||||
ctx, store, destroyFunc, err := testSetup(t)
|
||||
defer destroyFunc()
|
||||
|
Loading…
Reference in New Issue
Block a user