mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Unistore: Block on legacy writes in mode3 (#98147)
This commit is contained in:
parent
02aded2743
commit
124440ccee
@ -6,6 +6,7 @@ import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
@ -63,51 +64,47 @@ func (d *DualWriterMode3) Create(ctx context.Context, in runtime.Object, createV
|
||||
return nil, fmt.Errorf("name or generatename have to be set")
|
||||
}
|
||||
|
||||
// create in legacy first, and then unistore. if unistore fails, but legacy succeeds,
|
||||
// will try to cleanup the object in legacy.
|
||||
|
||||
startLegacy := time.Now()
|
||||
createdFromLegacy, err := d.Legacy.Create(ctx, in, createValidation, options)
|
||||
if err != nil {
|
||||
log.Error(err, "unable to create object in legacy storage")
|
||||
d.recordLegacyDuration(true, mode2Str, d.resource, method, startLegacy)
|
||||
return createdFromLegacy, err
|
||||
}
|
||||
d.recordLegacyDuration(false, mode2Str, d.resource, method, startLegacy)
|
||||
|
||||
createdCopy := createdFromLegacy.DeepCopyObject()
|
||||
accCreated, err := meta.Accessor(createdCopy)
|
||||
if err != nil {
|
||||
return createdFromLegacy, err
|
||||
}
|
||||
accCreated.SetResourceVersion("")
|
||||
|
||||
startStorage := time.Now()
|
||||
storageObj, errObjectSt := d.Storage.Create(ctx, in, createValidation, options)
|
||||
storageObj, errObjectSt := d.Storage.Create(ctx, createdCopy, createValidation, options)
|
||||
d.recordStorageDuration(errObjectSt != nil, mode3Str, d.resource, method, startStorage)
|
||||
if errObjectSt != nil {
|
||||
log.Error(err, "unable to create object in storage")
|
||||
|
||||
// if we cannot create in unistore, attempt to clean up legacy
|
||||
_, _, err = d.Legacy.Delete(ctx, accCreated.GetName(), nil, &metav1.DeleteOptions{})
|
||||
if err != nil {
|
||||
log.Error(err, "unable to cleanup object in legacy storage")
|
||||
}
|
||||
|
||||
return storageObj, errObjectSt
|
||||
}
|
||||
|
||||
createdCopy := storageObj.DeepCopyObject()
|
||||
|
||||
//nolint:errcheck
|
||||
go d.createOnLegacyStorage(ctx, in, createdCopy, createValidation, options)
|
||||
|
||||
return storageObj, errObjectSt
|
||||
}
|
||||
|
||||
func (d *DualWriterMode3) createOnLegacyStorage(ctx context.Context, in, storageObj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) error {
|
||||
var method = "create"
|
||||
log := d.Log.WithValues("method", method)
|
||||
ctx, cancel := context.WithTimeoutCause(context.WithoutCancel(ctx), time.Second*10, errors.New("legacy create timeout"))
|
||||
defer cancel()
|
||||
|
||||
accessor, err := meta.Accessor(storageObj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// clear the UID and ResourceVersion from the object before sending it to the legacy storage
|
||||
accessor.SetUID("")
|
||||
accessor.SetResourceVersion("")
|
||||
|
||||
startLegacy := time.Now()
|
||||
legacyObj, err := d.Legacy.Create(ctx, storageObj, createValidation, options)
|
||||
d.recordLegacyDuration(err != nil, mode3Str, d.resource, method, startLegacy)
|
||||
if err != nil {
|
||||
log.Error(err, "unable to create object in legacy storage")
|
||||
cancel()
|
||||
}
|
||||
|
||||
areEqual := Compare(legacyObj, storageObj)
|
||||
areEqual := Compare(createdFromLegacy, storageObj)
|
||||
d.recordOutcome(mode3Str, getName(storageObj), areEqual, method)
|
||||
if !areEqual {
|
||||
log.Info("object from legacy and storage are not equal")
|
||||
}
|
||||
return err
|
||||
|
||||
return storageObj, errObjectSt
|
||||
}
|
||||
|
||||
// Get overrides the behavior of the generic DualWriter and retrieves an object from Storage.
|
||||
@ -201,41 +198,42 @@ func (d *DualWriterMode3) Delete(ctx context.Context, name string, deleteValidat
|
||||
log := d.Log.WithValues("name", name, "method", method)
|
||||
ctx = klog.NewContext(ctx, d.Log)
|
||||
|
||||
startStorage := time.Now()
|
||||
objFromStorage, async, err := d.Storage.Delete(ctx, name, deleteValidation, options)
|
||||
d.recordStorageDuration(err != nil, mode3Str, name, method, startStorage)
|
||||
if err != nil {
|
||||
log.Error(err, "unable to delete object in storage")
|
||||
return objFromStorage, async, err
|
||||
}
|
||||
// delete from legacy first, and then unistore. Will return a failure if either fails,
|
||||
// unless its a 404.
|
||||
//
|
||||
// we want to delete from legacy first, otherwise if the delete from unistore was successful,
|
||||
// but legacy failed, the user would get a failure, but not be able to retry the delete
|
||||
// as they would not be able to see the object in unistore anymore.
|
||||
|
||||
//nolint:errcheck
|
||||
go d.deleteFromLegacyStorage(ctx, objFromStorage, name, deleteValidation, options)
|
||||
|
||||
return objFromStorage, async, err
|
||||
}
|
||||
|
||||
func (d *DualWriterMode3) deleteFromLegacyStorage(ctx context.Context, objFromStorage runtime.Object, name string, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions) error {
|
||||
var method = "delete"
|
||||
log := d.Log.WithValues("name", name, "method", method, "name", name)
|
||||
startLegacy := time.Now()
|
||||
|
||||
ctx, cancel := context.WithTimeoutCause(context.WithoutCancel(ctx), time.Second*10, errors.New("legacy delete timeout"))
|
||||
defer cancel()
|
||||
|
||||
objFromLegacy, _, err := d.Legacy.Delete(ctx, name, deleteValidation, options)
|
||||
d.recordLegacyDuration(err != nil, mode3Str, d.resource, method, startLegacy)
|
||||
objFromLegacy, asyncLegacy, err := d.Legacy.Delete(ctx, name, deleteValidation, options)
|
||||
if err != nil {
|
||||
log.Error(err, "unable to delete object in legacy storage")
|
||||
cancel()
|
||||
if !apierrors.IsNotFound(err) {
|
||||
log.WithValues("object", objFromLegacy).Error(err, "could not delete from legacy store")
|
||||
d.recordLegacyDuration(true, mode3Str, d.resource, method, startLegacy)
|
||||
return objFromLegacy, asyncLegacy, err
|
||||
}
|
||||
}
|
||||
d.recordLegacyDuration(false, mode3Str, d.resource, method, startLegacy)
|
||||
|
||||
startStorage := time.Now()
|
||||
objFromStorage, asyncStorage, err := d.Storage.Delete(ctx, name, deleteValidation, options)
|
||||
if err != nil {
|
||||
if !apierrors.IsNotFound(err) {
|
||||
log.WithValues("object", objFromStorage).Error(err, "could not delete from storage")
|
||||
d.recordStorageDuration(true, mode3Str, d.resource, method, startStorage)
|
||||
}
|
||||
return objFromStorage, asyncStorage, err
|
||||
}
|
||||
d.recordStorageDuration(false, mode3Str, d.resource, method, startStorage)
|
||||
|
||||
areEqual := Compare(objFromStorage, objFromLegacy)
|
||||
d.recordOutcome(mode3Str, name, areEqual, method)
|
||||
if !areEqual {
|
||||
log.Info("object from legacy and storage are not equal")
|
||||
log.WithValues("name", name).Info("object from legacy and storage are not equal")
|
||||
}
|
||||
|
||||
return err
|
||||
return objFromStorage, asyncStorage, err
|
||||
}
|
||||
|
||||
// Update overrides the behavior of the generic DualWriter and writes first to Storage and then to LegacyStorage.
|
||||
@ -243,45 +241,40 @@ func (d *DualWriterMode3) Update(ctx context.Context, name string, objInfo rest.
|
||||
var method = "update"
|
||||
log := d.Log.WithValues("name", name, "method", method)
|
||||
ctx = klog.NewContext(ctx, log)
|
||||
|
||||
startStorage := time.Now()
|
||||
objFromStorage, async, err := d.Storage.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
|
||||
d.recordStorageDuration(err != nil, mode3Str, d.resource, method, startStorage)
|
||||
if err != nil {
|
||||
log.Error(err, "unable to update in storage")
|
||||
return objFromStorage, async, err
|
||||
}
|
||||
|
||||
//nolint:errcheck
|
||||
go d.updateOnLegacyStorageMode3(ctx, objFromStorage, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
|
||||
|
||||
return objFromStorage, async, err
|
||||
}
|
||||
|
||||
func (d *DualWriterMode3) updateOnLegacyStorageMode3(ctx context.Context, storageObj runtime.Object, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) error {
|
||||
// The incoming RV is from unified storage, so legacy can ignore it
|
||||
// The incoming RV is not stable -- it may be from legacy or storage!
|
||||
// This sets a flag in the context and our apistore is more lenient when it exists
|
||||
ctx = context.WithValue(ctx, dualWriteContextKey{}, true)
|
||||
|
||||
var method = "update"
|
||||
log := d.Log.WithValues("name", name, "method", method, "name", name)
|
||||
// update in legacy first, and then unistore. Will return a failure if either fails.
|
||||
//
|
||||
// we want to update in legacy first, otherwise if the update from unistore was successful,
|
||||
// but legacy failed, the user would get a failure, but see the update did apply to the source
|
||||
// of truth, and be less likely to retry to save (and get the stores in sync again)
|
||||
|
||||
ctx, cancel := context.WithTimeoutCause(context.WithoutCancel(ctx), time.Second*10, errors.New("legacy update timeout"))
|
||||
startLegacy := time.Now()
|
||||
defer cancel()
|
||||
|
||||
objLegacy, _, err := d.Legacy.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
|
||||
d.recordLegacyDuration(err != nil, mode3Str, d.resource, method, startLegacy)
|
||||
objFromLegacy, createdLegacy, err := d.Legacy.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
|
||||
if err != nil {
|
||||
log.Error(err, "unable to update object in legacy storage")
|
||||
cancel()
|
||||
log.WithValues("object", objFromLegacy).Error(err, "could not update in legacy storage")
|
||||
d.recordLegacyDuration(true, mode2Str, d.resource, "update", startLegacy)
|
||||
return objFromLegacy, createdLegacy, err
|
||||
}
|
||||
d.recordLegacyDuration(false, mode2Str, d.resource, "update", startLegacy)
|
||||
|
||||
startStorage := time.Now()
|
||||
objFromStorage, created, err := d.Storage.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
|
||||
if err != nil {
|
||||
log.WithValues("object", objFromStorage).Error(err, "could not update in storage")
|
||||
d.recordStorageDuration(true, mode2Str, d.resource, "update", startStorage)
|
||||
return objFromStorage, created, err
|
||||
}
|
||||
|
||||
areEqual := Compare(storageObj, objLegacy)
|
||||
areEqual := Compare(objFromStorage, objFromLegacy)
|
||||
d.recordOutcome(mode3Str, name, areEqual, method)
|
||||
if !areEqual {
|
||||
log.WithValues("name", name).Info("object from legacy and storage are not equal")
|
||||
}
|
||||
return err
|
||||
|
||||
return objFromStorage, created, err
|
||||
}
|
||||
|
||||
// DeleteCollection overrides the behavior of the generic DualWriter and deletes from both LegacyStorage and Storage.
|
||||
@ -290,42 +283,49 @@ func (d *DualWriterMode3) DeleteCollection(ctx context.Context, deleteValidation
|
||||
log := d.Log.WithValues("resourceVersion", listOptions.ResourceVersion, "method", method)
|
||||
ctx = klog.NewContext(ctx, log)
|
||||
|
||||
startStorage := time.Now()
|
||||
storageObj, err := d.Storage.DeleteCollection(ctx, deleteValidation, options, listOptions)
|
||||
d.recordStorageDuration(err != nil, mode3Str, d.resource, method, startStorage)
|
||||
if err != nil {
|
||||
log.Error(err, "unable to delete collection in storage")
|
||||
return storageObj, err
|
||||
}
|
||||
// delete from legacy first, and anything that is successful can be deleted in unistore too.
|
||||
//
|
||||
// we want to delete from legacy first, otherwise if the delete from unistore was successful,
|
||||
// but legacy failed, the user would get a failure, but not be able to retry the delete
|
||||
// as they would not be able to see the object in unistore anymore.
|
||||
|
||||
//nolint:errcheck
|
||||
go d.deleteCollectionFromLegacyStorage(ctx, storageObj, deleteValidation, options, listOptions)
|
||||
|
||||
return storageObj, err
|
||||
}
|
||||
|
||||
func (d *DualWriterMode3) deleteCollectionFromLegacyStorage(ctx context.Context, storageObj runtime.Object, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions, listOptions *metainternalversion.ListOptions) error {
|
||||
var method = "delete-collection"
|
||||
log := d.Log.WithValues("resourceVersion", listOptions.ResourceVersion, "method", method)
|
||||
startLegacy := time.Now()
|
||||
|
||||
ctx, cancel := context.WithTimeoutCause(context.WithoutCancel(ctx), time.Second*10, errors.New("legacy deletecollection timeout"))
|
||||
defer cancel()
|
||||
|
||||
legacyObj, err := d.Legacy.DeleteCollection(ctx, deleteValidation, options, listOptions)
|
||||
d.recordLegacyDuration(err != nil, mode3Str, d.resource, method, startLegacy)
|
||||
deletedLegacy, err := d.Legacy.DeleteCollection(ctx, deleteValidation, options, listOptions)
|
||||
if err != nil {
|
||||
log.Error(err, "unable to delete collection in legacy storage")
|
||||
cancel()
|
||||
log.WithValues("deleted", deletedLegacy).Error(err, "failed to delete collection successfully from legacy storage")
|
||||
d.recordLegacyDuration(true, mode3Str, d.resource, method, startLegacy)
|
||||
return deletedLegacy, err
|
||||
}
|
||||
d.recordLegacyDuration(false, mode3Str, d.resource, method, startLegacy)
|
||||
|
||||
legacyList, err := meta.ExtractList(deletedLegacy)
|
||||
if err != nil {
|
||||
log.Error(err, "unable to extract list from legacy storage")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
areEqual := Compare(storageObj, legacyObj)
|
||||
d.recordOutcome(mode3Str, getName(legacyObj), areEqual, method)
|
||||
// Only the items deleted by the legacy DeleteCollection call are selected for deletion by Storage.
|
||||
_, err = parseList(legacyList)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
startStorage := time.Now()
|
||||
deletedStorage, err := d.Storage.DeleteCollection(ctx, deleteValidation, options, listOptions)
|
||||
if err != nil {
|
||||
log.WithValues("deleted", deletedStorage).Error(err, "failed to delete collection successfully from Storage")
|
||||
d.recordStorageDuration(true, mode3Str, d.resource, method, startStorage)
|
||||
return deletedStorage, err
|
||||
}
|
||||
d.recordStorageDuration(false, mode3Str, d.resource, method, startStorage)
|
||||
|
||||
areEqual := Compare(deletedStorage, deletedLegacy)
|
||||
d.recordOutcome(mode3Str, getName(deletedLegacy), areEqual, method)
|
||||
if !areEqual {
|
||||
log.Info("object from legacy and storage are not equal")
|
||||
}
|
||||
|
||||
return err
|
||||
return deletedStorage, err
|
||||
}
|
||||
|
||||
func (d *DualWriterMode3) Watch(ctx context.Context, options *metainternalversion.ListOptions) (watch.Interface, error) {
|
||||
|
@ -8,14 +8,17 @@ import (
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/mock"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
)
|
||||
|
||||
func TestMode3_Create(t *testing.T) {
|
||||
type testCase struct {
|
||||
input runtime.Object
|
||||
setupLegacyFn func(m *mock.Mock, input runtime.Object)
|
||||
setupStorageFn func(m *mock.Mock, input runtime.Object)
|
||||
name string
|
||||
wantErr bool
|
||||
@ -23,17 +26,32 @@ func TestMode3_Create(t *testing.T) {
|
||||
tests :=
|
||||
[]testCase{
|
||||
{
|
||||
name: "creating an object only in the unified store",
|
||||
name: "creating an object in both the LegacyStorage and Storage",
|
||||
input: exampleObj,
|
||||
setupLegacyFn: func(m *mock.Mock, input runtime.Object) {
|
||||
m.On("Create", mock.Anything, exampleObjNoRV, mock.Anything, mock.Anything).Return(exampleObj, nil).Once()
|
||||
},
|
||||
setupStorageFn: func(m *mock.Mock, input runtime.Object) {
|
||||
m.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleObj, nil)
|
||||
m.On("Create", mock.Anything, exampleObj, mock.Anything, mock.Anything).Return(exampleObj, nil).Once()
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "error when creating object in the unified store fails",
|
||||
name: "error when creating object in the legacy store fails",
|
||||
input: failingObj,
|
||||
setupLegacyFn: func(m *mock.Mock, input runtime.Object) {
|
||||
m.On("Create", mock.Anything, input, mock.Anything, mock.Anything).Return(nil, errors.New("error")).Once()
|
||||
},
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "error when creating object in the unistore fails - legacy delete should be called",
|
||||
input: exampleObj,
|
||||
setupLegacyFn: func(m *mock.Mock, input runtime.Object) {
|
||||
m.On("Delete", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleObj, true, nil).Once()
|
||||
m.On("Create", mock.Anything, exampleObjNoRV, mock.Anything, mock.Anything).Return(exampleObj, nil).Once()
|
||||
},
|
||||
setupStorageFn: func(m *mock.Mock, input runtime.Object) {
|
||||
m.On("Create", mock.Anything, failingObj, mock.Anything, mock.Anything).Return(nil, errors.New("error"))
|
||||
m.On("Create", mock.Anything, exampleObj, mock.Anything, mock.Anything).Return(exampleObj, errors.New("error")).Once()
|
||||
},
|
||||
wantErr: true,
|
||||
},
|
||||
@ -48,75 +66,23 @@ func TestMode3_Create(t *testing.T) {
|
||||
ls := legacyStoreMock{m, l}
|
||||
us := storageMock{m, s}
|
||||
|
||||
if tt.setupLegacyFn != nil {
|
||||
tt.setupLegacyFn(m, tt.input)
|
||||
}
|
||||
if tt.setupStorageFn != nil {
|
||||
tt.setupStorageFn(m, tt.input)
|
||||
}
|
||||
|
||||
dw := NewDualWriter(Mode3, ls, us, p, kind)
|
||||
|
||||
obj, err := dw.Create(context.Background(), tt.input, func(context.Context, runtime.Object) error { return nil }, &metav1.CreateOptions{})
|
||||
obj, err := dw.Create(context.Background(), tt.input, createFn, &metav1.CreateOptions{})
|
||||
|
||||
if tt.wantErr {
|
||||
assert.Error(t, err)
|
||||
return
|
||||
}
|
||||
|
||||
assert.NoError(t, err)
|
||||
assert.NotEqual(t, obj, anotherObj)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestMode1_CreateOnLegacyStorage(t *testing.T) {
|
||||
ctxCanceled, cancel := context.WithCancel(context.TODO())
|
||||
cancel()
|
||||
|
||||
type testCase struct {
|
||||
name string
|
||||
input runtime.Object
|
||||
ctx *context.Context
|
||||
setupLegacyFn func(m *mock.Mock)
|
||||
}
|
||||
tests :=
|
||||
[]testCase{
|
||||
{
|
||||
name: "Create on legacy storage",
|
||||
input: exampleObj,
|
||||
setupLegacyFn: func(m *mock.Mock) {
|
||||
m.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleObjNoRV, nil)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Create on legacy storage works even if parent context is canceled",
|
||||
input: exampleObj,
|
||||
ctx: &ctxCanceled,
|
||||
setupLegacyFn: func(m *mock.Mock) {
|
||||
m.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleObjNoRV, nil)
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
l := (LegacyStorage)(nil)
|
||||
s := (Storage)(nil)
|
||||
m := &mock.Mock{}
|
||||
|
||||
ls := legacyStoreMock{m, l}
|
||||
us := storageMock{m, s}
|
||||
|
||||
if tt.setupLegacyFn != nil {
|
||||
tt.setupLegacyFn(m)
|
||||
}
|
||||
|
||||
ctx := context.TODO()
|
||||
if tt.ctx != nil {
|
||||
ctx = *tt.ctx
|
||||
}
|
||||
|
||||
dw := NewDualWriter(Mode3, ls, us, p, kind)
|
||||
err := dw.(*DualWriterMode3).createOnLegacyStorage(ctx, tt.input, exampleObj, func(context.Context, runtime.Object) error { return nil }, &metav1.CreateOptions{})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, exampleObj, obj)
|
||||
})
|
||||
}
|
||||
}
|
||||
@ -338,7 +304,8 @@ func TestMode1_ListFromLegacyStorage(t *testing.T) {
|
||||
|
||||
func TestMode3_Delete(t *testing.T) {
|
||||
type testCase struct {
|
||||
setupStorageFn func(m *mock.Mock, name string)
|
||||
setupLegacyFn func(m *mock.Mock, input string)
|
||||
setupStorageFn func(m *mock.Mock, input string)
|
||||
name string
|
||||
input string
|
||||
wantErr bool
|
||||
@ -346,74 +313,56 @@ func TestMode3_Delete(t *testing.T) {
|
||||
tests :=
|
||||
[]testCase{
|
||||
{
|
||||
name: "deleting an object in the unified store",
|
||||
name: "delete in legacy and storage",
|
||||
input: "foo",
|
||||
setupStorageFn: func(m *mock.Mock, name string) {
|
||||
m.On("Delete", mock.Anything, name, mock.Anything, mock.Anything).Return(exampleObj, false, nil)
|
||||
setupLegacyFn: func(m *mock.Mock, input string) {
|
||||
m.On("Delete", mock.Anything, input, mock.Anything, mock.Anything).Return(exampleObj, false, nil)
|
||||
},
|
||||
setupStorageFn: func(m *mock.Mock, input string) {
|
||||
m.On("Delete", mock.Anything, input, mock.Anything, mock.Anything).Return(exampleObj, false, nil)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "error when deleting an object in the unified store",
|
||||
name: "object delete in legacy not found, but found in storage",
|
||||
input: "foo",
|
||||
setupLegacyFn: func(m *mock.Mock, input string) {
|
||||
m.On("Delete", mock.Anything, "not-found-legacy", mock.Anything, mock.Anything).Return(nil, false, apierrors.NewNotFound(schema.GroupResource{Group: "", Resource: "pods"}, "not-found"))
|
||||
},
|
||||
setupStorageFn: func(m *mock.Mock, input string) {
|
||||
m.On("Delete", mock.Anything, input, mock.Anything, mock.Anything).Return(exampleObj, false, nil)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: " object delete in storage not found, but found in legacy",
|
||||
input: "foo",
|
||||
setupLegacyFn: func(m *mock.Mock, input string) {
|
||||
m.On("Delete", mock.Anything, input, mock.Anything, mock.Anything).Return(exampleObj, false, nil)
|
||||
},
|
||||
setupStorageFn: func(m *mock.Mock, input string) {
|
||||
m.On("Delete", mock.Anything, "not-found-storage", mock.Anything, mock.Anything).Return(nil, false, apierrors.NewNotFound(schema.GroupResource{Group: "", Resource: "pods"}, "not-found"))
|
||||
},
|
||||
},
|
||||
{
|
||||
name: " object not found in both",
|
||||
input: "object-fail",
|
||||
setupStorageFn: func(m *mock.Mock, name string) {
|
||||
m.On("Delete", mock.Anything, name, mock.Anything, mock.Anything).Return(nil, false, errors.New("error"))
|
||||
setupLegacyFn: func(m *mock.Mock, input string) {
|
||||
m.On("Delete", mock.Anything, input, mock.Anything, mock.Anything).Return(nil, false, apierrors.NewNotFound(schema.GroupResource{Group: "", Resource: "pods"}, "not-found"))
|
||||
},
|
||||
setupStorageFn: func(m *mock.Mock, input string) {
|
||||
m.On("Delete", mock.Anything, input, mock.Anything, mock.Anything).Return(nil, false, apierrors.NewNotFound(schema.GroupResource{Group: "", Resource: "pods"}, "not-found"))
|
||||
},
|
||||
wantErr: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
l := (LegacyStorage)(nil)
|
||||
s := (Storage)(nil)
|
||||
m := &mock.Mock{}
|
||||
|
||||
ls := legacyStoreMock{m, l}
|
||||
us := storageMock{m, s}
|
||||
|
||||
if tt.setupStorageFn != nil {
|
||||
tt.setupStorageFn(m, tt.input)
|
||||
}
|
||||
|
||||
dw := NewDualWriter(Mode3, ls, us, p, kind)
|
||||
|
||||
obj, _, err := dw.Delete(context.Background(), tt.input, func(ctx context.Context, obj runtime.Object) error { return nil }, &metav1.DeleteOptions{})
|
||||
|
||||
if tt.wantErr {
|
||||
assert.Error(t, err)
|
||||
return
|
||||
}
|
||||
|
||||
assert.Equal(t, obj, exampleObj)
|
||||
assert.NotEqual(t, obj, anotherObj)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestMode1_DeleteFromLegacyStorage(t *testing.T) {
|
||||
ctxCanceled, cancel := context.WithCancel(context.TODO())
|
||||
cancel()
|
||||
|
||||
type testCase struct {
|
||||
ctx *context.Context
|
||||
setupLegacyFn func(m *mock.Mock, name string)
|
||||
name string
|
||||
input string
|
||||
}
|
||||
tests :=
|
||||
[]testCase{
|
||||
{
|
||||
name: "Delete from legacy storage",
|
||||
name: " object delete error",
|
||||
input: "object-fail",
|
||||
setupLegacyFn: func(m *mock.Mock, input string) {
|
||||
m.On("Delete", mock.Anything, input, mock.Anything, mock.Anything).Return(exampleObj, false, nil)
|
||||
m.On("Delete", mock.Anything, input, mock.Anything, mock.Anything).Return(nil, false, errors.New("error"))
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Delete from unified legacy works even if parent context is canceled",
|
||||
ctx: &ctxCanceled,
|
||||
setupLegacyFn: func(m *mock.Mock, input string) {
|
||||
m.On("Delete", mock.Anything, input, mock.Anything, mock.Anything).Return(exampleObj, false, nil)
|
||||
setupStorageFn: func(m *mock.Mock, input string) {
|
||||
m.On("Delete", mock.Anything, input, mock.Anything, mock.Anything).Return(nil, false, errors.New("error"))
|
||||
},
|
||||
wantErr: true,
|
||||
},
|
||||
}
|
||||
|
||||
@ -429,62 +378,13 @@ func TestMode1_DeleteFromLegacyStorage(t *testing.T) {
|
||||
if tt.setupLegacyFn != nil {
|
||||
tt.setupLegacyFn(m, tt.input)
|
||||
}
|
||||
|
||||
ctx := context.TODO()
|
||||
if tt.ctx != nil {
|
||||
ctx = *tt.ctx
|
||||
}
|
||||
|
||||
dw := NewDualWriter(Mode3, ls, us, p, kind)
|
||||
|
||||
err := dw.(*DualWriterMode3).deleteFromLegacyStorage(ctx, exampleObj, tt.input, func(ctx context.Context, obj runtime.Object) error { return nil }, &metav1.DeleteOptions{})
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestMode3_DeleteCollection(t *testing.T) {
|
||||
type testCase struct {
|
||||
input *metav1.DeleteOptions
|
||||
setupStorageFn func(m *mock.Mock, input *metav1.DeleteOptions)
|
||||
name string
|
||||
wantErr bool
|
||||
}
|
||||
tests :=
|
||||
[]testCase{
|
||||
{
|
||||
name: "deleting a collection in the unified store",
|
||||
input: &metav1.DeleteOptions{TypeMeta: metav1.TypeMeta{Kind: "foo"}},
|
||||
setupStorageFn: func(m *mock.Mock, input *metav1.DeleteOptions) {
|
||||
m.On("DeleteCollection", mock.Anything, mock.Anything, input, mock.Anything).Return(exampleObj, nil)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "error deleting a collection in the unified store",
|
||||
input: &metav1.DeleteOptions{TypeMeta: metav1.TypeMeta{Kind: "fail"}},
|
||||
setupStorageFn: func(m *mock.Mock, input *metav1.DeleteOptions) {
|
||||
m.On("DeleteCollection", mock.Anything, mock.Anything, input, mock.Anything).Return(nil, errors.New("error"))
|
||||
},
|
||||
wantErr: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
l := (LegacyStorage)(nil)
|
||||
s := (Storage)(nil)
|
||||
m := &mock.Mock{}
|
||||
|
||||
ls := legacyStoreMock{m, l}
|
||||
us := storageMock{m, s}
|
||||
|
||||
if tt.setupStorageFn != nil {
|
||||
tt.setupStorageFn(m, tt.input)
|
||||
}
|
||||
|
||||
dw := NewDualWriter(Mode3, ls, us, p, kind)
|
||||
|
||||
obj, err := dw.DeleteCollection(context.Background(), func(ctx context.Context, obj runtime.Object) error { return nil }, tt.input, &metainternalversion.ListOptions{})
|
||||
obj, _, err := dw.Delete(context.Background(), tt.input, func(context.Context, runtime.Object) error { return nil }, &metav1.DeleteOptions{})
|
||||
|
||||
if tt.wantErr {
|
||||
assert.Error(t, err)
|
||||
@ -497,32 +397,44 @@ func TestMode3_DeleteCollection(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestMode1_DeleteCollectionFromLegacyStorage(t *testing.T) {
|
||||
ctxCanceled, cancel := context.WithCancel(context.TODO())
|
||||
cancel()
|
||||
|
||||
func TestMode3_DeleteCollection(t *testing.T) {
|
||||
type testCase struct {
|
||||
ctx *context.Context
|
||||
setupLegacyFn func(m *mock.Mock)
|
||||
name string
|
||||
input *metav1.DeleteOptions
|
||||
setupLegacyFn func(m *mock.Mock)
|
||||
setupStorageFn func(m *mock.Mock)
|
||||
name string
|
||||
input string
|
||||
wantErr bool
|
||||
}
|
||||
tests :=
|
||||
[]testCase{
|
||||
{
|
||||
name: "Delete Collection from legacy storage",
|
||||
input: &metav1.DeleteOptions{TypeMeta: metav1.TypeMeta{Kind: "foo"}},
|
||||
name: "deleting a collection in both stores",
|
||||
input: "foo",
|
||||
setupLegacyFn: func(m *mock.Mock) {
|
||||
m.On("DeleteCollection", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleObj, nil)
|
||||
m.On("DeleteCollection", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleList, nil)
|
||||
},
|
||||
setupStorageFn: func(m *mock.Mock) {
|
||||
m.On("DeleteCollection", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleList, nil)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Delete Collection from legacy storage works even if parent context is canceled",
|
||||
input: &metav1.DeleteOptions{TypeMeta: metav1.TypeMeta{Kind: "foo"}},
|
||||
ctx: &ctxCanceled,
|
||||
name: "error deleting a collection in the storage when legacy store is successful",
|
||||
input: "foo",
|
||||
setupLegacyFn: func(m *mock.Mock) {
|
||||
m.On("DeleteCollection", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleObj, nil)
|
||||
},
|
||||
setupStorageFn: func(m *mock.Mock) {
|
||||
m.On("DeleteCollection", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("error"))
|
||||
},
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "error deleting a collection legacy store",
|
||||
input: "fail",
|
||||
setupLegacyFn: func(m *mock.Mock) {
|
||||
m.On("DeleteCollection", mock.Anything, mock.Anything, &metav1.DeleteOptions{TypeMeta: metav1.TypeMeta{Kind: "fail"}}, mock.Anything).Return(nil, errors.New("error"))
|
||||
},
|
||||
wantErr: true,
|
||||
},
|
||||
}
|
||||
|
||||
@ -538,22 +450,26 @@ func TestMode1_DeleteCollectionFromLegacyStorage(t *testing.T) {
|
||||
if tt.setupLegacyFn != nil {
|
||||
tt.setupLegacyFn(m)
|
||||
}
|
||||
|
||||
ctx := context.TODO()
|
||||
if tt.ctx != nil {
|
||||
ctx = *tt.ctx
|
||||
if tt.setupStorageFn != nil {
|
||||
tt.setupStorageFn(m)
|
||||
}
|
||||
|
||||
dw := NewDualWriter(Mode3, ls, us, p, kind)
|
||||
|
||||
err := dw.(*DualWriterMode3).deleteCollectionFromLegacyStorage(ctx, exampleObj, func(ctx context.Context, obj runtime.Object) error { return nil }, tt.input, &metainternalversion.ListOptions{})
|
||||
assert.NoError(t, err)
|
||||
obj, err := dw.DeleteCollection(context.Background(), func(ctx context.Context, obj runtime.Object) error { return nil }, &metav1.DeleteOptions{TypeMeta: metav1.TypeMeta{Kind: tt.input}}, &metainternalversion.ListOptions{})
|
||||
|
||||
if tt.wantErr {
|
||||
assert.Error(t, err)
|
||||
return
|
||||
}
|
||||
assert.Equal(t, exampleList, obj)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestMode3_Update(t *testing.T) {
|
||||
type testCase struct {
|
||||
expectedObj runtime.Object
|
||||
setupLegacyFn func(m *mock.Mock, input string)
|
||||
setupStorageFn func(m *mock.Mock, input string)
|
||||
name string
|
||||
@ -563,20 +479,32 @@ func TestMode3_Update(t *testing.T) {
|
||||
tests :=
|
||||
[]testCase{
|
||||
{
|
||||
name: "update an object in unified store",
|
||||
name: "update an object in both stores",
|
||||
input: "foo",
|
||||
setupStorageFn: func(m *mock.Mock, input string) {
|
||||
m.On("Update", mock.Anything, input, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleObj, false, nil)
|
||||
},
|
||||
setupLegacyFn: func(m *mock.Mock, input string) {
|
||||
m.On("Update", mock.Anything, input, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleObj, false, nil)
|
||||
m.On("Update", mock.Anything, input, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleObj, false, nil).Once()
|
||||
},
|
||||
setupStorageFn: func(m *mock.Mock, input string) {
|
||||
m.On("Update", mock.Anything, input, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleObj, false, nil).Once()
|
||||
},
|
||||
expectedObj: exampleObj,
|
||||
},
|
||||
{
|
||||
name: "error updating an object in unified store",
|
||||
name: "error updating legacy store",
|
||||
input: "object-fail",
|
||||
setupLegacyFn: func(m *mock.Mock, input string) {
|
||||
m.On("Update", mock.Anything, input, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, false, errors.New("error")).Once()
|
||||
},
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "error updating unistore",
|
||||
input: "object-fail",
|
||||
setupLegacyFn: func(m *mock.Mock, input string) {
|
||||
m.On("Update", mock.Anything, input, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleObj, false, nil).Once()
|
||||
},
|
||||
setupStorageFn: func(m *mock.Mock, input string) {
|
||||
m.On("Update", mock.Anything, input, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, false, errors.New("error"))
|
||||
m.On("Update", mock.Anything, input, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, false, errors.New("error")).Once()
|
||||
},
|
||||
wantErr: true,
|
||||
},
|
||||
@ -607,76 +535,8 @@ func TestMode3_Update(t *testing.T) {
|
||||
return
|
||||
}
|
||||
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Equal(t, obj, exampleObj)
|
||||
assert.NotEqual(t, obj, anotherObj)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestMode1_UpdateOnLegacyStorage(t *testing.T) {
|
||||
ctxCanceled, cancel := context.WithCancel(context.TODO())
|
||||
cancel()
|
||||
|
||||
type testCase struct {
|
||||
ctx *context.Context
|
||||
setupLegacyFn func(m *mock.Mock, input string)
|
||||
setupGetFn func(m *mock.Mock, input string)
|
||||
name string
|
||||
input string
|
||||
}
|
||||
tests :=
|
||||
[]testCase{
|
||||
{
|
||||
name: "Update on legacy storage",
|
||||
input: "foo",
|
||||
setupLegacyFn: func(m *mock.Mock, input string) {
|
||||
m.On("Update", mock.Anything, input, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(anotherObj, false, nil)
|
||||
},
|
||||
setupGetFn: func(m *mock.Mock, input string) {
|
||||
m.On("Get", mock.Anything, input, mock.Anything).Return(exampleObj, nil)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Update on legacy storage works even if parent context is canceled",
|
||||
ctx: &ctxCanceled,
|
||||
input: "foo",
|
||||
setupLegacyFn: func(m *mock.Mock, input string) {
|
||||
m.On("Update", mock.Anything, input, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(anotherObj, false, nil)
|
||||
},
|
||||
setupGetFn: func(m *mock.Mock, input string) {
|
||||
m.On("Get", mock.Anything, input, mock.Anything).Return(exampleObj, nil)
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
l := (LegacyStorage)(nil)
|
||||
s := (Storage)(nil)
|
||||
m := &mock.Mock{}
|
||||
|
||||
ls := legacyStoreMock{m, l}
|
||||
us := storageMock{m, s}
|
||||
|
||||
if tt.setupLegacyFn != nil {
|
||||
tt.setupLegacyFn(m, tt.input)
|
||||
}
|
||||
|
||||
if tt.setupGetFn != nil {
|
||||
tt.setupGetFn(m, tt.input)
|
||||
}
|
||||
|
||||
ctx := context.TODO()
|
||||
if tt.ctx != nil {
|
||||
ctx = *tt.ctx
|
||||
}
|
||||
|
||||
dw := NewDualWriter(Mode3, ls, us, p, kind)
|
||||
|
||||
err := dw.(*DualWriterMode3).updateOnLegacyStorageMode3(ctx, exampleObj, tt.input, updatedObjInfoObj{}, func(ctx context.Context, obj runtime.Object) error { return nil }, func(ctx context.Context, obj, old runtime.Object) error { return nil }, false, &metav1.UpdateOptions{})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, tt.expectedObj, obj)
|
||||
assert.NotEqual(t, anotherObj, obj)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user