fix(unified-storage): return legacy data in mode 2 (#100353)

This commit is contained in:
Jean-Philippe Quéméner 2025-02-11 10:34:56 +01:00 committed by GitHub
parent b6c0db31d9
commit 6eaf702e96
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 96 additions and 99 deletions

View File

@ -94,7 +94,7 @@ const (
Mode1 Mode1
// Mode2 is the dual writing mode that represents writing to LegacyStorage and Storage and reading from LegacyStorage. // Mode2 is the dual writing mode that represents writing to LegacyStorage and Storage and reading from LegacyStorage.
// The objects written to storage will include any labels and annotations. // The objects written to storage will include any labels and annotations.
// When reading values, the results will be from Storage when they exist, otherwise from legacy storage // When reading values, the results will be from LegacyStorage.
Mode2 Mode2
// Mode3 represents writing to LegacyStorage and Storage and reading from Storage. // Mode3 represents writing to LegacyStorage and Storage and reading from Storage.
// NOTE: Requesting mode3 will only happen when after a background sync job succeeds // NOTE: Requesting mode3 will only happen when after a background sync job succeeds

View File

@ -32,10 +32,9 @@ type DualWriterMode2 struct {
const mode2Str = "2" const mode2Str = "2"
// NewDualWriterMode2 returns a new DualWriter in mode 2. // newDualWriterMode2 returns a new DualWriter in mode 2.
// Mode 2 represents writing to LegacyStorage first, then to Storage // Mode 2 represents writing to LegacyStorage first, then to Storage.
// When reading, values from storage will be returned if they exist // When reading, values from LegacyStorage will be returned.
// otherwise the value from legacy will be used
func newDualWriterMode2(legacy LegacyStorage, storage Storage, dwm *dualWriterMetrics, resource string) *DualWriterMode2 { func newDualWriterMode2(legacy LegacyStorage, storage Storage, dwm *dualWriterMetrics, resource string) *DualWriterMode2 {
return &DualWriterMode2{ return &DualWriterMode2{
Legacy: legacy, Legacy: legacy,
@ -71,7 +70,7 @@ func (d *DualWriterMode2) Create(ctx context.Context, in runtime.Object, createV
if err != nil { if err != nil {
log.Error(err, "unable to create object in legacy storage") log.Error(err, "unable to create object in legacy storage")
d.recordLegacyDuration(true, mode2Str, d.resource, method, startLegacy) d.recordLegacyDuration(true, mode2Str, d.resource, method, startLegacy)
return createdFromLegacy, err return nil, err
} }
d.recordLegacyDuration(false, mode2Str, d.resource, method, startLegacy) d.recordLegacyDuration(false, mode2Str, d.resource, method, startLegacy)
@ -79,7 +78,7 @@ func (d *DualWriterMode2) Create(ctx context.Context, in runtime.Object, createV
accCreated, err := meta.Accessor(createdCopy) accCreated, err := meta.Accessor(createdCopy)
if err != nil { if err != nil {
return createdFromLegacy, err return nil, err
} }
accCreated.SetResourceVersion("") accCreated.SetResourceVersion("")
@ -93,51 +92,50 @@ func (d *DualWriterMode2) Create(ctx context.Context, in runtime.Object, createV
} }
d.recordStorageDuration(false, mode2Str, d.resource, method, startStorage) d.recordStorageDuration(false, mode2Str, d.resource, method, startStorage)
go func() {
areEqual := Compare(createdFromStorage, createdFromLegacy) areEqual := Compare(createdFromStorage, createdFromLegacy)
d.recordOutcome(mode2Str, getName(createdFromStorage), areEqual, method) d.recordOutcome(mode2Str, getName(createdFromStorage), areEqual, method)
if !areEqual { if !areEqual {
log.Info("object from legacy and storage are not equal") log.Info("object from legacy and storage are not equal")
} }
}()
return createdFromLegacy, err return createdFromLegacy, err
} }
// It retrieves an object from Storage if possible, and if not it falls back to LegacyStorage. // Get retrieves an object from Storage if possible, and if not it falls back to LegacyStorage.
func (d *DualWriterMode2) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) { func (d *DualWriterMode2) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
var method = "get" var method = "get"
log := d.Log.WithValues("name", name, "resourceVersion", options.ResourceVersion, "method", method) log := d.Log.WithValues("name", name, "resourceVersion", options.ResourceVersion, "method", method)
ctx = klog.NewContext(ctx, log) ctx = klog.NewContext(ctx, log)
startStorage := time.Now()
objStorage, err := d.Storage.Get(ctx, name, options)
d.recordStorageDuration(err != nil, mode2Str, d.resource, method, startStorage)
if err != nil {
// if it errors because it's not found, we try to fetch it from the legacy storage
if !apierrors.IsNotFound(err) {
log.Error(err, "unable to fetch object from storage")
return objStorage, err
}
log.Info("object not found in storage, fetching from legacy")
}
startLegacy := time.Now() startLegacy := time.Now()
objLegacy, err := d.Legacy.Get(ctx, name, options) objLegacy, err := d.Legacy.Get(ctx, name, options)
if err != nil { if err != nil {
log.Error(err, "unable to fetch object from legacy") log.Error(err, "unable to fetch object from legacy")
d.recordLegacyDuration(true, mode2Str, d.resource, method, startLegacy) d.recordLegacyDuration(true, mode2Str, d.resource, method, startLegacy)
return objLegacy, err return nil, err
} }
d.recordLegacyDuration(false, mode2Str, d.resource, method, startLegacy) d.recordLegacyDuration(false, mode2Str, d.resource, method, startLegacy)
startStorage := time.Now()
objStorage, err := d.Storage.Get(ctx, name, options)
d.recordStorageDuration(err != nil, mode2Str, d.resource, method, startStorage)
if err != nil {
if !apierrors.IsNotFound(err) {
log.Error(err, "unable to fetch object from storage")
return nil, err
}
log.Info("object not found in storage, dual write or migration didn't happen yet")
}
go func() {
areEqual := Compare(objStorage, objLegacy) areEqual := Compare(objStorage, objLegacy)
d.recordOutcome(mode2Str, name, areEqual, method) d.recordOutcome(mode2Str, name, areEqual, method)
if !areEqual { if !areEqual {
log.Info("object from legacy and storage are not equal") log.Info("object from legacy and storage are not equal")
} }
}()
if objStorage != nil {
return objStorage, err
}
return objLegacy, err return objLegacy, err
} }
@ -156,6 +154,16 @@ func (d *DualWriterMode2) List(ctx context.Context, options *metainternalversion
return ll, err return ll, err
} }
d.recordLegacyDuration(false, mode2Str, d.resource, method, startLegacy) d.recordLegacyDuration(false, mode2Str, d.resource, method, startLegacy)
// Even if we don't compare, we want to fetch from unified storage and check that it doesn't error.
startStorage := time.Now()
if _, err := d.Storage.List(ctx, options); err != nil {
log.Error(err, "unable to list objects from storage")
d.recordStorageDuration(true, mode2Str, d.resource, method, startStorage)
return nil, err
}
d.recordStorageDuration(false, mode2Str, d.resource, method, startStorage)
return ll, nil return ll, nil
} }
@ -170,36 +178,26 @@ func (d *DualWriterMode2) DeleteCollection(ctx context.Context, deleteValidation
if err != nil { if err != nil {
log.WithValues("deleted", deletedLegacy).Error(err, "failed to delete collection successfully from legacy storage") log.WithValues("deleted", deletedLegacy).Error(err, "failed to delete collection successfully from legacy storage")
d.recordLegacyDuration(true, mode2Str, d.resource, method, startLegacy) d.recordLegacyDuration(true, mode2Str, d.resource, method, startLegacy)
return deletedLegacy, err return nil, err
} }
d.recordLegacyDuration(false, mode2Str, d.resource, method, startLegacy) d.recordLegacyDuration(false, mode2Str, d.resource, method, startLegacy)
legacyList, err := meta.ExtractList(deletedLegacy)
if err != nil {
log.Error(err, "unable to extract list from legacy storage")
return nil, err
}
// Only the items deleted by the legacy DeleteCollection call are selected for deletion by Storage.
_, err = parseList(legacyList)
if err != nil {
return nil, err
}
startStorage := time.Now() startStorage := time.Now()
deletedStorage, err := d.Storage.DeleteCollection(ctx, deleteValidation, options, listOptions) deletedStorage, err := d.Storage.DeleteCollection(ctx, deleteValidation, options, listOptions)
if err != nil { if err != nil {
log.WithValues("deleted", deletedStorage).Error(err, "failed to delete collection successfully from Storage") log.WithValues("deleted", deletedStorage).Error(err, "failed to delete collection successfully from Storage")
d.recordStorageDuration(true, mode2Str, d.resource, method, startStorage) d.recordStorageDuration(true, mode2Str, d.resource, method, startStorage)
return deletedStorage, err return nil, err
} }
d.recordStorageDuration(false, mode2Str, d.resource, method, startStorage) d.recordStorageDuration(false, mode2Str, d.resource, method, startStorage)
go func() {
areEqual := Compare(deletedStorage, deletedLegacy) areEqual := Compare(deletedStorage, deletedLegacy)
d.recordOutcome(mode2Str, getName(deletedStorage), areEqual, method) d.recordOutcome(mode2Str, getName(deletedStorage), areEqual, method)
if !areEqual { if !areEqual {
log.Info("object from legacy and storage are not equal") log.Info("object from legacy and storage are not equal")
} }
}()
return deletedLegacy, err return deletedLegacy, err
} }
@ -209,17 +207,6 @@ func (d *DualWriterMode2) Delete(ctx context.Context, name string, deleteValidat
log := d.Log.WithValues("name", name, "method", method) log := d.Log.WithValues("name", name, "method", method)
ctx = klog.NewContext(ctx, log) ctx = klog.NewContext(ctx, log)
startStorage := time.Now()
deletedS, async, err := d.Storage.Delete(ctx, name, deleteValidation, options)
if err != nil {
if !apierrors.IsNotFound(err) {
log.WithValues("objectList", deletedS).Error(err, "could not delete from duplicate storage")
d.recordStorageDuration(true, mode2Str, d.resource, method, startStorage)
}
return deletedS, async, err
}
d.recordStorageDuration(false, mode2Str, d.resource, method, startStorage)
startLegacy := time.Now() startLegacy := time.Now()
deletedLS, async, err := d.Legacy.Delete(ctx, name, deleteValidation, options) deletedLS, async, err := d.Legacy.Delete(ctx, name, deleteValidation, options)
@ -232,11 +219,24 @@ func (d *DualWriterMode2) Delete(ctx context.Context, name string, deleteValidat
} }
d.recordLegacyDuration(false, mode2Str, d.resource, method, startLegacy) d.recordLegacyDuration(false, mode2Str, d.resource, method, startLegacy)
startStorage := time.Now()
deletedS, async, err := d.Storage.Delete(ctx, name, deleteValidation, options)
if err != nil {
if !apierrors.IsNotFound(err) {
log.WithValues("objectList", deletedS).Error(err, "could not delete from duplicate storage")
d.recordStorageDuration(true, mode2Str, d.resource, method, startStorage)
}
return deletedS, async, err
}
d.recordStorageDuration(false, mode2Str, d.resource, method, startStorage)
go func() {
areEqual := Compare(deletedS, deletedLS) areEqual := Compare(deletedS, deletedLS)
d.recordOutcome(mode2Str, name, areEqual, method) d.recordOutcome(mode2Str, name, areEqual, method)
if !areEqual { if !areEqual {
log.WithValues("name", name).Info("object from legacy and storage are not equal") log.WithValues("name", name).Info("object from legacy and storage are not equal")
} }
}()
return deletedLS, async, err return deletedLS, async, err
} }
@ -268,15 +268,13 @@ func (d *DualWriterMode2) Update(ctx context.Context, name string, objInfo rest.
return objFromStorage, created, err return objFromStorage, created, err
} }
go func() {
areEqual := Compare(objFromStorage, objFromLegacy) areEqual := Compare(objFromStorage, objFromLegacy)
d.recordOutcome(mode2Str, name, areEqual, method) d.recordOutcome(mode2Str, name, areEqual, method)
if !areEqual { if !areEqual {
log.WithValues("name", name).Info("object from legacy and storage are not equal") log.WithValues("name", name).Info("object from legacy and storage are not equal")
} }
}()
if objFromStorage != nil {
return objFromStorage, created, err
}
return objFromLegacy, created, err return objFromLegacy, created, err
} }

View File

@ -5,8 +5,8 @@ import (
"errors" "errors"
"testing" "testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock" "github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
apierrors "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors"
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion" metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -69,11 +69,11 @@ func TestMode2_Create(t *testing.T) {
obj, err := dw.Create(context.Background(), tt.input, createFn, &metav1.CreateOptions{}) obj, err := dw.Create(context.Background(), tt.input, createFn, &metav1.CreateOptions{})
if tt.wantErr { if tt.wantErr {
assert.Error(t, err) require.Error(t, err)
return return
} }
assert.Equal(t, exampleObj, obj) require.Equal(t, exampleObj, obj)
}) })
} }
} }
@ -142,12 +142,12 @@ func TestMode2_Get(t *testing.T) {
obj, err := dw.Get(context.Background(), tt.input, &metav1.GetOptions{}) obj, err := dw.Get(context.Background(), tt.input, &metav1.GetOptions{})
if tt.wantErr { if tt.wantErr {
assert.Error(t, err) require.Error(t, err)
return return
} }
assert.Equal(t, obj, exampleObj) require.Equal(t, obj, exampleObj)
assert.NotEqual(t, obj, anotherObj) require.NotEqual(t, obj, anotherObj)
}) })
} }
} }
@ -195,10 +195,10 @@ func TestMode2_List(t *testing.T) {
obj, err := dw.List(context.Background(), &metainternalversion.ListOptions{}) obj, err := dw.List(context.Background(), &metainternalversion.ListOptions{})
if tt.wantErr { if tt.wantErr {
assert.Error(t, err) require.Error(t, err)
return return
} }
assert.Equal(t, exampleList, obj) require.Equal(t, exampleList, obj)
}) })
} }
} }
@ -288,12 +288,12 @@ func TestMode2_Delete(t *testing.T) {
obj, _, err := dw.Delete(context.Background(), tt.input, func(context.Context, runtime.Object) error { return nil }, &metav1.DeleteOptions{}) obj, _, err := dw.Delete(context.Background(), tt.input, func(context.Context, runtime.Object) error { return nil }, &metav1.DeleteOptions{})
if tt.wantErr { if tt.wantErr {
assert.Error(t, err) require.Error(t, err)
return return
} }
assert.Equal(t, obj, exampleObj) require.Equal(t, obj, exampleObj)
assert.NotEqual(t, obj, anotherObj) require.NotEqual(t, obj, anotherObj)
}) })
} }
} }
@ -320,7 +320,7 @@ func TestMode2_DeleteCollection(t *testing.T) {
}, },
{ {
name: "error deleting a collection in the storage when legacy store is successful", name: "error deleting a collection in the storage when legacy store is successful",
input: "foo", input: "fail",
setupLegacyFn: func(m *mock.Mock) { setupLegacyFn: func(m *mock.Mock) {
m.On("DeleteCollection", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleObj, nil) m.On("DeleteCollection", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleObj, nil)
}, },
@ -333,7 +333,7 @@ func TestMode2_DeleteCollection(t *testing.T) {
name: "deleting a collection when error in legacy store", name: "deleting a collection when error in legacy store",
input: "fail", input: "fail",
setupLegacyFn: func(m *mock.Mock) { setupLegacyFn: func(m *mock.Mock) {
m.On("DeleteCollection", mock.Anything, mock.Anything, &metav1.DeleteOptions{TypeMeta: metav1.TypeMeta{Kind: "fail"}}, mock.Anything).Return(nil, errors.New("error")) m.On("DeleteCollection", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("error"))
}, },
wantErr: true, wantErr: true,
}, },
@ -343,16 +343,15 @@ func TestMode2_DeleteCollection(t *testing.T) {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
l := (LegacyStorage)(nil) l := (LegacyStorage)(nil)
s := (Storage)(nil) s := (Storage)(nil)
m := &mock.Mock{}
ls := legacyStoreMock{m, l} ls := legacyStoreMock{&mock.Mock{}, l}
us := storageMock{m, s} us := storageMock{&mock.Mock{}, s}
if tt.setupLegacyFn != nil { if tt.setupLegacyFn != nil {
tt.setupLegacyFn(m) tt.setupLegacyFn(ls.Mock)
} }
if tt.setupStorageFn != nil { if tt.setupStorageFn != nil {
tt.setupStorageFn(m) tt.setupStorageFn(us.Mock)
} }
dw := NewDualWriter(Mode2, ls, us, p, kind) dw := NewDualWriter(Mode2, ls, us, p, kind)
@ -360,10 +359,10 @@ func TestMode2_DeleteCollection(t *testing.T) {
obj, err := dw.DeleteCollection(context.Background(), func(ctx context.Context, obj runtime.Object) error { return nil }, &metav1.DeleteOptions{TypeMeta: metav1.TypeMeta{Kind: tt.input}}, &metainternalversion.ListOptions{}) obj, err := dw.DeleteCollection(context.Background(), func(ctx context.Context, obj runtime.Object) error { return nil }, &metav1.DeleteOptions{TypeMeta: metav1.TypeMeta{Kind: tt.input}}, &metainternalversion.ListOptions{})
if tt.wantErr { if tt.wantErr {
assert.Error(t, err) require.Error(t, err)
return return
} }
assert.Equal(t, exampleList, obj) require.Equal(t, exampleList, obj)
}) })
} }
} }
@ -421,12 +420,12 @@ func TestMode2_Update(t *testing.T) {
obj, _, err := dw.Update(context.Background(), tt.input, updatedObjInfoObj{}, func(ctx context.Context, obj runtime.Object) error { return nil }, func(ctx context.Context, obj, old runtime.Object) error { return nil }, false, &metav1.UpdateOptions{}) obj, _, err := dw.Update(context.Background(), tt.input, updatedObjInfoObj{}, func(ctx context.Context, obj runtime.Object) error { return nil }, func(ctx context.Context, obj, old runtime.Object) error { return nil }, false, &metav1.UpdateOptions{})
if tt.wantErr { if tt.wantErr {
assert.Error(t, err) require.Error(t, err)
return return
} }
assert.Equal(t, tt.expectedObj, obj) require.Equal(t, tt.expectedObj, obj)
assert.NotEqual(t, anotherObj, obj) require.NotEqual(t, anotherObj, obj)
}) })
} }
} }