Dual Writer simplification (#93852)

* All objects should have an UID

* Now with a different error message

* Simplify create on DW 2: use the same object to write to both storages

* Run only one test

* Add check for status code

* Add name if it's not present in mode2

* Populate UID in legacy

* Remove logs and commented code

* Change dualwriter1

* Remove commented code

* Fix list test

* remove get on update from dualwriter 2

* Get object before updating. Better var renaming

* Finish rebasing

* Comment test

* Uncomment tests

* Update legacy first. Add preconditions

* Remove preconditions

* Fix update test

* copy RV from unified to legacy objects

* revert changes to playlist xorm store

* Improve logging. Add go routines for mode3

* Add tests for async funcs in mode3

* Lint

* Lint

* Lint. Start to fix tests

* Fix watcher tests

* Fix store tests

* Fiinish fixing watcher tests

* Fix server tests

* add name check

* Update pkg/apiserver/rest/dualwriter_mode1.go

Co-authored-by: Bruno Abrantes <bruno.abrantes@grafana.com>

* All objects should have an UID

* Now with a different error message

* Simplify create on DW 2: use the same object to write to both storages

* Run only one test

* Add check for status code

* Add name if it's not present in mode2

* Populate UID in legacy

* Remove logs and commented code

* Change dualwriter1

* Remove commented code

* Fix list test

* remove get on update from dualwriter 2

* Get object before updating. Better var renaming

* Finish rebasing

* Comment test

* Uncomment tests

* Fix update test

* revert changes to playlist xorm store

* Improve logging. Add go routines for mode3

* Lint

* Fix watcher tests

* Fiinish fixing watcher tests

* Add mode 5 with etcd test case. Add early check to fail on populated RV in payload

* we can't set RV to the found object when updating

* Lint

* Don't fail on update playlists

* Name should not be different when updating and it should be not empty on creating

* Fix tests

* Update pkg/apiserver/rest/dualwriter_mode2.go

Co-authored-by: Todd Treece <360020+toddtreece@users.noreply.github.com>

* Lint

* Fix mode 5 tests

* Lint

* Add generateName condition on every mode. Fix tests

* Lint

* Add condition on where name or generate name have to be set

* Fix test

* Lint

* Fix folders test

* We dont need to send name for mode1

* Fail if UID is not present

* Remove change from not running test

* Remove unused line

* Lint

* Update pkg/storage/unified/apistore/store.go

Co-authored-by: Todd Treece <360020+toddtreece@users.noreply.github.com>

* Improve error message

* Fix broken watcher test

* Fail on name mismatch on update

* Remove log

* Make sure UIDs match on create in both stores

* Lint

* Write first to unified storage

* Remove uid setting

* Remove RV only in mode2

* Fix test. Remove log line

* test

* No need to asser on RV in mode3

* Remove RV check due to race condition

* Update dualwriter.go

Co-authored-by: Georges Chaudy <chaudyg@gmail.com>

* Update pkg/storage/unified/client.go

* remove unused parameter

* log an error for object is missing UID instead of returning an error

* remove obj.SetResourceVersion("")

* log an error for object is missing UID instead of returning an error

* FInalise merge

* Move RV check to where it was

* Remove name check

* Remove server check for backwards compatibility

* Remove unused fn

* Move test checks for another PR

* Dont commit go work sum changes

* Only log error if RV is present for now.

---------

Co-authored-by: Todd Treece <todd.treece@grafana.com>
Co-authored-by: Bruno Abrantes <bruno.abrantes@grafana.com>
Co-authored-by: Todd Treece <360020+toddtreece@users.noreply.github.com>
Co-authored-by: Georges Chaudy <chaudyg@gmail.com>
This commit is contained in:
Leonor Oliveira
2024-10-23 10:29:41 +02:00
committed by GitHub
parent 1696ac8ea5
commit a03652494c
15 changed files with 834 additions and 486 deletions

View File

@@ -10,7 +10,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/endpoints/request"
@@ -138,27 +137,6 @@ func NewDualWriter(
}
}
type updateWrapper struct {
upstream rest.UpdatedObjectInfo
updated runtime.Object
}
// Returns preconditions built from the updated object, if applicable.
// May return nil, or a preconditions object containing nil fields,
// if no preconditions can be determined from the updated object.
func (u *updateWrapper) Preconditions() *metav1.Preconditions {
if u.upstream == nil {
return nil
}
return u.upstream.Preconditions()
}
// UpdatedObject returns the updated object, given a context and old object.
// The only time an empty oldObj should be passed in is if a "create on update" is occurring (there is no oldObj).
func (u *updateWrapper) UpdatedObject(ctx context.Context, oldObj runtime.Object) (newObj runtime.Object, err error) {
return u.updated, nil
}
type NamespacedKVStore interface {
Get(ctx context.Context, key string) (string, bool, error)
Set(ctx context.Context, key, value string) error

View File

@@ -3,9 +3,10 @@ package rest
import (
"context"
"errors"
"fmt"
"time"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
@@ -41,29 +42,37 @@ func (d *DualWriterMode1) Mode() DualWriterMode {
}
// Create overrides the behavior of the generic DualWriter and writes only to LegacyStorage.
func (d *DualWriterMode1) Create(ctx context.Context, original runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) {
func (d *DualWriterMode1) Create(ctx context.Context, in runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) {
var method = "create"
log := d.Log.WithValues("method", method)
ctx = klog.NewContext(ctx, log)
accIn, err := meta.Accessor(in)
if err != nil {
return nil, err
}
if accIn.GetUID() != "" {
return nil, fmt.Errorf("UID should not be present:: %v", accIn.GetUID())
}
startLegacy := time.Now()
created, err := d.Legacy.Create(ctx, original, createValidation, options)
created, err := d.Legacy.Create(ctx, in, createValidation, options)
d.recordLegacyDuration(err != nil, mode1Str, d.resource, method, startLegacy)
if err != nil {
log.Error(err, "unable to create object in legacy storage")
d.recordLegacyDuration(true, mode1Str, d.resource, method, startLegacy)
return created, err
}
d.recordLegacyDuration(false, mode1Str, d.resource, method, startLegacy)
createdCopy := created.DeepCopyObject()
//nolint:errcheck
go d.createOnUnifiedStorage(ctx, original, createValidation, createdCopy, options)
go d.createOnUnifiedStorage(ctx, createValidation, createdCopy, options)
return created, err
}
func (d *DualWriterMode1) createOnUnifiedStorage(ctx context.Context, original runtime.Object, createValidation rest.ValidateObjectFunc, createdCopy runtime.Object, options *metav1.CreateOptions) error {
func (d *DualWriterMode1) createOnUnifiedStorage(ctx context.Context, createValidation rest.ValidateObjectFunc, createdCopy runtime.Object, options *metav1.CreateOptions) error {
var method = "create"
log := d.Log.WithValues("method", method)
@@ -71,14 +80,18 @@ func (d *DualWriterMode1) createOnUnifiedStorage(ctx context.Context, original r
ctx, cancel := context.WithTimeoutCause(context.WithoutCancel(ctx), time.Second*10, errors.New("storage create timeout"))
defer cancel()
if err := enrichLegacyObject(original, createdCopy); err != nil {
cancel()
accCreated, err := meta.Accessor(createdCopy)
if err != nil {
return err
}
accCreated.SetResourceVersion("")
startStorage := time.Now()
storageObj, errObjectSt := d.Storage.Create(ctx, createdCopy, createValidation, options)
d.recordStorageDuration(errObjectSt != nil, mode1Str, d.resource, method, startStorage)
if errObjectSt != nil {
log.Error(errObjectSt, "unable to create object in storage")
cancel()
}
areEqual := Compare(storageObj, createdCopy)
@@ -109,7 +122,7 @@ func (d *DualWriterMode1) Get(ctx context.Context, name string, options *metav1.
return res, errLegacy
}
func (d *DualWriterMode1) getFromUnifiedStorage(ctx context.Context, res runtime.Object, name string, options *metav1.GetOptions) error {
func (d *DualWriterMode1) getFromUnifiedStorage(ctx context.Context, objFromLegacy runtime.Object, name string, options *metav1.GetOptions) error {
var method = "get"
log := d.Log.WithValues("method", method, "name", name)
@@ -124,7 +137,7 @@ func (d *DualWriterMode1) getFromUnifiedStorage(ctx context.Context, res runtime
cancel()
}
areEqual := Compare(storageObj, res)
areEqual := Compare(storageObj, objFromLegacy)
d.recordOutcome(mode1Str, name, areEqual, method)
if !areEqual {
log.WithValues("name", name).Info("object from legacy and storage are not equal")
@@ -140,19 +153,19 @@ func (d *DualWriterMode1) List(ctx context.Context, options *metainternalversion
ctx = klog.NewContext(ctx, log)
startLegacy := time.Now()
res, errLegacy := d.Legacy.List(ctx, options)
if errLegacy != nil {
log.Error(errLegacy, "unable to list object in legacy storage")
res, err := d.Legacy.List(ctx, options)
d.recordLegacyDuration(err != nil, mode1Str, d.resource, method, startLegacy)
if err != nil {
log.Error(err, "unable to list object in legacy storage")
}
d.recordLegacyDuration(errLegacy != nil, mode1Str, d.resource, method, startLegacy)
//nolint:errcheck
go d.listFromUnifiedStorage(ctx, options, res)
return res, errLegacy
return res, err
}
func (d *DualWriterMode1) listFromUnifiedStorage(ctx context.Context, options *metainternalversion.ListOptions, res runtime.Object) error {
func (d *DualWriterMode1) listFromUnifiedStorage(ctx context.Context, options *metainternalversion.ListOptions, objFromLegacy runtime.Object) error {
var method = "list"
log := d.Log.WithValues("resourceVersion", options.ResourceVersion, "method", method)
@@ -160,14 +173,15 @@ func (d *DualWriterMode1) listFromUnifiedStorage(ctx context.Context, options *m
// Ignores cancellation signals from parent context. Will automatically be canceled after 10 seconds.
ctx, cancel := context.WithTimeoutCause(context.WithoutCancel(ctx), time.Second*10, errors.New("storage list timeout"))
defer cancel()
storageObj, err := d.Storage.List(ctx, options)
d.recordStorageDuration(err != nil, mode1Str, d.resource, method, startStorage)
if err != nil {
log.Error(err, "unable to list objects from unified storage")
cancel()
}
areEqual := Compare(storageObj, res)
d.recordOutcome(mode1Str, getName(res), areEqual, method)
areEqual := Compare(storageObj, objFromLegacy)
d.recordOutcome(mode1Str, getName(objFromLegacy), areEqual, method)
if !areEqual {
log.Info("object from legacy and storage are not equal")
}
@@ -182,12 +196,11 @@ func (d *DualWriterMode1) Delete(ctx context.Context, name string, deleteValidat
startLegacy := time.Now()
res, async, err := d.Legacy.Delete(ctx, name, deleteValidation, options)
d.recordLegacyDuration(err != nil, mode1Str, name, method, startLegacy)
if err != nil {
log.Error(err, "unable to delete object in legacy storage")
d.recordLegacyDuration(true, mode1Str, d.resource, method, startLegacy)
return res, async, err
}
d.recordLegacyDuration(false, mode1Str, name, method, startLegacy)
//nolint:errcheck
go d.deleteFromUnifiedStorage(ctx, res, name, deleteValidation, options)
@@ -206,6 +219,7 @@ func (d *DualWriterMode1) deleteFromUnifiedStorage(ctx context.Context, res runt
storageObj, _, err := d.Storage.Delete(ctx, name, deleteValidation, options)
d.recordStorageDuration(err != nil, mode1Str, d.resource, method, startStorage)
if err != nil {
log.Error(err, "unable to delete object from unified storage")
cancel()
}
areEqual := Compare(storageObj, res)
@@ -225,12 +239,11 @@ func (d *DualWriterMode1) DeleteCollection(ctx context.Context, deleteValidation
startLegacy := time.Now()
res, err := d.Legacy.DeleteCollection(ctx, deleteValidation, options, listOptions)
d.recordLegacyDuration(err != nil, mode1Str, d.resource, method, startLegacy)
if err != nil {
log.Error(err, "unable to delete collection in legacy storage")
d.recordLegacyDuration(true, mode1Str, d.resource, method, startLegacy)
return res, err
}
d.recordLegacyDuration(false, mode1Str, d.resource, method, startLegacy)
//nolint:errcheck
go d.deleteCollectionFromUnifiedStorage(ctx, res, deleteValidation, options, listOptions)
@@ -249,6 +262,7 @@ func (d *DualWriterMode1) deleteCollectionFromUnifiedStorage(ctx context.Context
storageObj, err := d.Storage.DeleteCollection(ctx, deleteValidation, options, listOptions)
d.recordStorageDuration(err != nil, mode1Str, d.resource, method, startStorage)
if err != nil {
log.Error(err, "unable to delete collection object from unified storage")
cancel()
}
areEqual := Compare(storageObj, res)
@@ -266,69 +280,41 @@ func (d *DualWriterMode1) Update(ctx context.Context, name string, objInfo rest.
ctx = klog.NewContext(ctx, log)
startLegacy := time.Now()
res, async, err := d.Legacy.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
objLegacy, async, err := d.Legacy.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
d.recordLegacyDuration(err != nil, mode1Str, d.resource, method, startLegacy)
if err != nil {
log.Error(err, "unable to update in legacy storage")
d.recordLegacyDuration(true, mode1Str, d.resource, method, startLegacy)
return res, async, err
return objLegacy, async, err
}
d.recordLegacyDuration(false, mode1Str, d.resource, method, startLegacy)
//nolint:errcheck
go d.updateOnUnifiedStorage(ctx, res, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
go d.updateOnUnifiedStorage(ctx, objLegacy, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
return res, async, err
return objLegacy, async, err
}
func (d *DualWriterMode1) updateOnUnifiedStorage(ctx context.Context, res runtime.Object, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) error {
func (d *DualWriterMode1) updateOnUnifiedStorage(ctx context.Context, objLegacy runtime.Object, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) error {
var method = "update"
log := d.Log.WithValues("name", name, "method", method, "name", name)
// Ignores cancellation signals from parent context. Will automatically be canceled after 10 seconds.
ctx, cancel := context.WithTimeoutCause(context.WithoutCancel(ctx), time.Second*10, errors.New("storage update timeout"))
resCopy := res.DeepCopyObject()
// get the object to be updated
foundObj, err := d.Storage.Get(ctx, name, &metav1.GetOptions{})
if err != nil {
if !apierrors.IsNotFound(err) {
log.WithValues("object", foundObj).Error(err, "could not get object to update")
cancel()
}
log.Info("object not found for update, creating one")
}
updated, err := objInfo.UpdatedObject(ctx, resCopy)
if err != nil {
log.WithValues("object", updated).Error(err, "could not update or create object")
cancel()
}
// if the object is found, create a new updateWrapper with the object found
if foundObj != nil {
if err := enrichLegacyObject(foundObj, resCopy); err != nil {
log.Error(err, "could not enrich object")
cancel()
}
objInfo = &updateWrapper{
upstream: objInfo,
updated: resCopy,
}
}
startStorage := time.Now()
defer cancel()
storageObj, _, errObjectSt := d.Storage.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
d.recordStorageDuration(errObjectSt != nil, mode1Str, d.resource, method, startStorage)
storageObj, _, err := d.Storage.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
d.recordStorageDuration(err != nil, mode1Str, d.resource, method, startStorage)
if err != nil {
log.Error(err, "unable to update object from unified storage")
cancel()
}
areEqual := Compare(storageObj, res)
areEqual := Compare(storageObj, objLegacy)
d.recordOutcome(mode1Str, name, areEqual, method)
if !areEqual {
log.WithValues("name", name).Info("object from legacy and storage are not equal")
}
return errObjectSt
return err
}
func (d *DualWriterMode1) Destroy() {

View File

@@ -16,11 +16,12 @@ import (
"k8s.io/apiserver/pkg/apis/example"
)
var exampleObj = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}}
var exampleObjNoRV = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}}
var exampleObjDifferentRV = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "3"}, Spec: example.PodSpec{}, Status: example.PodStatus{}}
var anotherObj = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "bar", ResourceVersion: "2"}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}}
var failingObj = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "object-fail", ResourceVersion: "2"}, Spec: example.PodSpec{}, Status: example.PodStatus{}}
var now = time.Now()
var exampleObj = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "1", CreationTimestamp: metav1.Time{}, GenerateName: "foo"}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: now}}}
var exampleObjNoRV = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "", CreationTimestamp: metav1.Time{}, GenerateName: "foo"}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: now}}}
var anotherObj = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "bar", ResourceVersion: "2", GenerateName: "foo"}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: now}}}
var failingObj = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "object-fail", ResourceVersion: "2", GenerateName: "object-fail"}, Spec: example.PodSpec{}, Status: example.PodStatus{}}
var exampleList = &example.PodList{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ListMeta: metav1.ListMeta{}, Items: []example.Pod{*exampleObj}}
var anotherList = &example.PodList{Items: []example.Pod{*anotherObj}}
@@ -142,7 +143,7 @@ func TestMode1_CreateOnUnifiedStorage(t *testing.T) {
}
dw := NewDualWriter(Mode1, ls, us, p, kind)
err := dw.(*DualWriterMode1).createOnUnifiedStorage(ctx, tt.input, func(context.Context, runtime.Object) error { return nil }, tt.input, &metav1.CreateOptions{})
err := dw.(*DualWriterMode1).createOnUnifiedStorage(ctx, func(context.Context, runtime.Object) error { return nil }, tt.input, &metav1.CreateOptions{})
assert.NoError(t, err)
})
}
@@ -609,7 +610,6 @@ func TestMode1_Update(t *testing.T) {
type testCase struct {
setupLegacyFn func(m *mock.Mock, input string)
setupStorageFn func(m *mock.Mock, input string)
setupGetFn func(m *mock.Mock, input string)
name string
input string
wantErr bool
@@ -625,9 +625,6 @@ func TestMode1_Update(t *testing.T) {
setupStorageFn: func(m *mock.Mock, input string) {
m.On("Update", mock.Anything, input, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(anotherObj, false, nil)
},
setupGetFn: func(m *mock.Mock, input string) {
m.On("Get", mock.Anything, input, mock.Anything).Return(exampleObj, nil)
},
},
{
name: "error updating an object in legacy",
@@ -635,9 +632,6 @@ func TestMode1_Update(t *testing.T) {
setupLegacyFn: func(m *mock.Mock, input string) {
m.On("Update", mock.Anything, input, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, false, errors.New("error"))
},
setupGetFn: func(m *mock.Mock, input string) {
m.On("Get", mock.Anything, input, mock.Anything).Return(exampleObj, nil)
},
setupStorageFn: func(m *mock.Mock, input string) {
m.On("Update", mock.Anything, input, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(anotherObj, false, nil)
},
@@ -661,10 +655,6 @@ func TestMode1_Update(t *testing.T) {
tt.setupStorageFn(m, tt.input)
}
if tt.setupGetFn != nil {
tt.setupGetFn(m, tt.input)
}
dw := NewDualWriter(Mode1, ls, us, p, kind)
obj, _, err := dw.Update(context.Background(), tt.input, updatedObjInfoObj{}, func(ctx context.Context, obj runtime.Object) error { return nil }, func(ctx context.Context, obj, old runtime.Object) error { return nil }, false, &metav1.UpdateOptions{})

View File

@@ -2,8 +2,10 @@ package rest
import (
"context"
"fmt"
"time"
"github.com/grafana/grafana/pkg/apimachinery/utils"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
@@ -11,8 +13,6 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/klog/v2"
"github.com/grafana/grafana/pkg/apimachinery/utils"
)
type DualWriterMode2 struct {
@@ -43,39 +43,54 @@ func (d *DualWriterMode2) Mode() DualWriterMode {
}
// Create overrides the behavior of the generic DualWriter and writes to LegacyStorage and Storage.
func (d *DualWriterMode2) Create(ctx context.Context, original runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) {
func (d *DualWriterMode2) Create(ctx context.Context, in runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) {
var method = "create"
log := d.Log.WithValues("method", method)
ctx = klog.NewContext(ctx, log)
accIn, err := meta.Accessor(in)
if err != nil {
return nil, err
}
if accIn.GetUID() != "" {
return nil, fmt.Errorf("UID should be empty: %v", accIn.GetUID())
}
startLegacy := time.Now()
created, err := d.Legacy.Create(ctx, original, createValidation, options)
createdFromLegacy, err := d.Legacy.Create(ctx, in, createValidation, options)
if err != nil {
log.Error(err, "unable to create object in legacy storage")
d.recordLegacyDuration(true, mode2Str, d.resource, method, startLegacy)
return created, err
return createdFromLegacy, err
}
d.recordLegacyDuration(false, mode2Str, d.resource, method, startLegacy)
if err := enrichLegacyObject(original, created); err != nil {
return created, err
createdCopy := createdFromLegacy.DeepCopyObject()
accCreated, err := meta.Accessor(createdCopy)
if err != nil {
return createdFromLegacy, err
}
accCreated.SetResourceVersion("")
startStorage := time.Now()
rsp, err := d.Storage.Create(ctx, created, createValidation, options)
createdFromStorage, err := d.Storage.Create(ctx, createdCopy, createValidation, options)
if err != nil {
log.WithValues("name").Error(err, "unable to create object in storage")
d.recordStorageDuration(true, mode2Str, d.resource, method, startStorage)
return rsp, err
return createdFromStorage, err
}
d.recordStorageDuration(false, mode2Str, d.resource, method, startStorage)
areEqual := Compare(rsp, created)
d.recordOutcome(mode2Str, getName(rsp), areEqual, method)
areEqual := Compare(createdFromStorage, createdFromLegacy)
d.recordOutcome(mode2Str, getName(createdFromStorage), areEqual, method)
if !areEqual {
log.Info("object from legacy and storage are not equal")
}
return rsp, err
return createdFromLegacy, err
}
// It retrieves an object from Storage if possible, and if not it falls back to LegacyStorage.
@@ -111,11 +126,13 @@ func (d *DualWriterMode2) Get(ctx context.Context, name string, options *metav1.
log.Info("object from legacy and storage are not equal")
}
// if there is no object in storage, we return the object from legacy
if objStorage == nil {
return objLegacy, nil
if objStorage != nil {
if err := updateRVOnLegacyObj(objStorage, objLegacy); err != nil {
log.WithValues("storageObject", objStorage, "legacyObject", objLegacy).Error(err, "could not update resource version")
}
}
return objStorage, err
return objLegacy, err
}
// List overrides the behavior of the generic DualWriter.
@@ -189,15 +206,15 @@ func (d *DualWriterMode2) DeleteCollection(ctx context.Context, deleteValidation
ctx = klog.NewContext(ctx, log)
startLegacy := time.Now()
deleted, err := d.Legacy.DeleteCollection(ctx, deleteValidation, options, listOptions)
deletedLegacy, err := d.Legacy.DeleteCollection(ctx, deleteValidation, options, listOptions)
if err != nil {
log.WithValues("deleted", deleted).Error(err, "failed to delete collection successfully from legacy storage")
log.WithValues("deleted", deletedLegacy).Error(err, "failed to delete collection successfully from legacy storage")
d.recordLegacyDuration(true, mode2Str, d.resource, method, startLegacy)
return deleted, err
return deletedLegacy, err
}
d.recordLegacyDuration(false, mode2Str, d.resource, method, startLegacy)
legacyList, err := meta.ExtractList(deleted)
legacyList, err := meta.ExtractList(deletedLegacy)
if err != nil {
log.Error(err, "unable to extract list from legacy storage")
return nil, err
@@ -210,21 +227,21 @@ func (d *DualWriterMode2) DeleteCollection(ctx context.Context, deleteValidation
}
startStorage := time.Now()
res, err := d.Storage.DeleteCollection(ctx, deleteValidation, options, listOptions)
deletedStorage, err := d.Storage.DeleteCollection(ctx, deleteValidation, options, listOptions)
if err != nil {
log.WithValues("deleted", res).Error(err, "failed to delete collection successfully from Storage")
log.WithValues("deleted", deletedStorage).Error(err, "failed to delete collection successfully from Storage")
d.recordStorageDuration(true, mode2Str, d.resource, method, startStorage)
return res, err
return deletedStorage, err
}
d.recordStorageDuration(false, mode2Str, d.resource, method, startStorage)
areEqual := Compare(res, deleted)
d.recordOutcome(mode2Str, getName(res), areEqual, method)
areEqual := Compare(deletedStorage, deletedLegacy)
d.recordOutcome(mode2Str, getName(deletedStorage), areEqual, method)
if !areEqual {
log.Info("object from legacy and storage are not equal")
}
return res, err
return deletedLegacy, err
}
func (d *DualWriterMode2) Delete(ctx context.Context, name string, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions) (runtime.Object, bool, error) {
@@ -232,6 +249,17 @@ func (d *DualWriterMode2) Delete(ctx context.Context, name string, deleteValidat
log := d.Log.WithValues("name", name, "method", method)
ctx = klog.NewContext(ctx, log)
startStorage := time.Now()
deletedS, async, err := d.Storage.Delete(ctx, name, deleteValidation, options)
if err != nil {
if !apierrors.IsNotFound(err) {
log.WithValues("objectList", deletedS).Error(err, "could not delete from duplicate storage")
d.recordStorageDuration(true, mode2Str, d.resource, method, startStorage)
}
return deletedS, async, err
}
d.recordStorageDuration(false, mode2Str, d.resource, method, startStorage)
startLegacy := time.Now()
deletedLS, async, err := d.Legacy.Delete(ctx, name, deleteValidation, options)
@@ -244,24 +272,13 @@ func (d *DualWriterMode2) Delete(ctx context.Context, name string, deleteValidat
}
d.recordLegacyDuration(false, mode2Str, d.resource, method, startLegacy)
startStorage := time.Now()
deletedS, _, err := d.Storage.Delete(ctx, name, deleteValidation, options)
if err != nil {
if !apierrors.IsNotFound(err) {
log.WithValues("objectList", deletedS).Error(err, "could not delete from duplicate storage")
d.recordStorageDuration(true, mode2Str, d.resource, method, startStorage)
}
return deletedS, async, err
}
d.recordStorageDuration(false, mode2Str, d.resource, method, startStorage)
areEqual := Compare(deletedS, deletedLS)
d.recordOutcome(mode2Str, name, areEqual, method)
if !areEqual {
log.WithValues("name", name).Info("object from legacy and storage are not equal")
}
return deletedS, async, err
return deletedLS, async, err
}
// Update overrides the generic behavior of the Storage and writes first to the legacy storage and then to storage.
@@ -270,57 +287,50 @@ func (d *DualWriterMode2) Update(ctx context.Context, name string, objInfo rest.
log := d.Log.WithValues("name", name, "method", method)
ctx = klog.NewContext(ctx, log)
// get foundObj and (updated) object so they can be stored in legacy store
foundObj, err := d.Storage.Get(ctx, name, &metav1.GetOptions{})
if err != nil {
if !apierrors.IsNotFound(err) {
log.WithValues("object", foundObj).Error(err, "could not get object to update")
return nil, false, err
}
log.Info("object not found for update, creating one")
}
startLegacy := time.Now()
obj, created, err := d.Legacy.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
objFromLegacy, created, err := d.Legacy.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
if err != nil {
log.WithValues("object", obj).Error(err, "could not update in legacy storage")
log.WithValues("object", objFromLegacy).Error(err, "could not update in legacy storage")
d.recordLegacyDuration(true, mode2Str, d.resource, "update", startLegacy)
return obj, created, err
return objFromLegacy, created, err
}
d.recordLegacyDuration(false, mode2Str, d.resource, "update", startLegacy)
// if the object is found, create a new updateWrapper with the object found
if foundObj != nil {
err = enrichLegacyObject(foundObj, obj)
if err != nil {
return obj, false, err
}
} else {
acc, err := meta.Accessor(obj)
if err != nil {
return obj, false, err
}
acc.SetResourceVersion("")
acc.SetUID("")
forceAllowCreate = true
}
startStorage := time.Now()
res, created, err := d.Storage.Update(ctx, name, &updateWrapper{
updated: obj, // use the objected returned from legacy
}, createValidation, updateValidation, forceAllowCreate, options)
objFromStorage, created, err := d.Storage.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
if err != nil {
log.WithValues("object", res).Error(err, "could not update in storage")
log.WithValues("object", objFromStorage).Error(err, "could not update in storage")
d.recordStorageDuration(true, mode2Str, d.resource, "update", startStorage)
return res, created, err
return objFromStorage, created, err
}
areEqual := Compare(res, obj)
areEqual := Compare(objFromStorage, objFromLegacy)
d.recordOutcome(mode2Str, name, areEqual, method)
if !areEqual {
log.WithValues("name", name).Info("object from legacy and storage are not equal")
}
return res, created, err
if objFromStorage != nil {
if err := updateRVOnLegacyObj(objFromStorage, objFromLegacy); err != nil {
log.WithValues("storageObject", objFromStorage, "legacyObject", objFromLegacy).Error(err, "could not update resource version")
}
}
return objFromLegacy, created, err
}
func updateRVOnLegacyObj(storageObj runtime.Object, legacyObj runtime.Object) error {
storageAccessor, err := utils.MetaAccessor(storageObj)
if err != nil {
return err
}
legacyAccessor, err := utils.MetaAccessor(legacyObj)
if err != nil {
return err
}
legacyAccessor.SetResourceVersion(storageAccessor.GetResourceVersion())
return nil
}
func (d *DualWriterMode2) Destroy() {
@@ -360,30 +370,3 @@ func parseList(legacyList []runtime.Object) (map[string]int, error) {
}
return indexMap, nil
}
func enrichLegacyObject(originalObj, returnedObj runtime.Object) error {
accessorReturned, err := meta.Accessor(returnedObj)
if err != nil {
return err
}
accessorOriginal, err := meta.Accessor(originalObj)
if err != nil {
return err
}
accessorReturned.SetLabels(accessorOriginal.GetLabels())
ac := accessorReturned.GetAnnotations()
if ac == nil {
ac = map[string]string{}
}
for k, v := range accessorOriginal.GetAnnotations() {
ac[k] = v
}
accessorReturned.SetAnnotations(ac)
accessorReturned.SetResourceVersion(accessorOriginal.GetResourceVersion())
accessorReturned.SetUID(accessorOriginal.GetUID())
return nil
}

View File

@@ -8,13 +8,10 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apiserver/pkg/apis/example"
)
var createFn = func(context.Context, runtime.Object) error { return nil }
@@ -35,10 +32,10 @@ func TestMode2_Create(t *testing.T) {
name: "creating an object in both the LegacyStorage and Storage",
input: exampleObj,
setupLegacyFn: func(m *mock.Mock, input runtime.Object) {
m.On("Create", mock.Anything, input, mock.Anything, mock.Anything).Return(exampleObj, nil)
m.On("Create", mock.Anything, exampleObjNoRV, mock.Anything, mock.Anything).Return(exampleObj, nil)
},
setupStorageFn: func(m *mock.Mock, input runtime.Object) {
m.On("Create", mock.Anything, input, mock.Anything, mock.Anything).Return(exampleObj, nil)
m.On("Create", mock.Anything, exampleObj, mock.Anything, mock.Anything).Return(exampleObj, nil)
},
},
{
@@ -77,9 +74,6 @@ func TestMode2_Create(t *testing.T) {
}
assert.Equal(t, exampleObj, obj)
accessor, err := meta.Accessor(obj)
assert.NoError(t, err)
assert.Equal(t, accessor.GetResourceVersion(), "1")
})
}
}
@@ -379,7 +373,6 @@ func TestMode2_Update(t *testing.T) {
expectedObj runtime.Object
setupLegacyFn func(m *mock.Mock, input string)
setupStorageFn func(m *mock.Mock, input string)
setupGetFn func(m *mock.Mock, input string)
name string
input string
wantErr bool
@@ -395,56 +388,14 @@ func TestMode2_Update(t *testing.T) {
setupStorageFn: func(m *mock.Mock, input string) {
m.On("Update", mock.Anything, input, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleObj, false, nil)
},
setupGetFn: func(m *mock.Mock, input string) {
m.On("Get", mock.Anything, input, mock.Anything).Return(exampleObj, nil)
},
expectedObj: exampleObj,
},
{
name: "object is not found in storage",
input: "not-found",
setupLegacyFn: func(m *mock.Mock, input string) {
m.On("Update", mock.Anything, input, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleObj, false, nil)
},
setupStorageFn: func(m *mock.Mock, input string) {
m.On("Update", mock.Anything, input, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleObj, false, nil)
},
setupGetFn: func(m *mock.Mock, input string) {
m.On("Get", mock.Anything, input, mock.Anything).Return(nil, apierrors.NewNotFound(schema.GroupResource{Group: "", Resource: "pods"}, "not found"))
},
expectedObj: exampleObj,
},
{
name: "error finding object storage",
input: "object-fail",
setupGetFn: func(m *mock.Mock, input string) {
m.On("Get", mock.Anything, input, mock.Anything).Return(nil, errors.New("error"))
},
wantErr: true,
},
{
name: "error updating legacy store",
input: "object-fail",
setupLegacyFn: func(m *mock.Mock, input string) {
m.On("Update", mock.Anything, input, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, false, errors.New("error"))
},
setupGetFn: func(m *mock.Mock, input string) {
m.On("Get", mock.Anything, input, mock.Anything).Return(exampleObjDifferentRV, nil)
},
wantErr: true,
},
{
name: "error updating storage with not found object",
input: "not-found",
setupLegacyFn: func(m *mock.Mock, input string) {
m.On("Update", mock.Anything, input, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleObj, false, nil)
},
setupStorageFn: func(m *mock.Mock, input string) {
m.On("Update", mock.Anything, input, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, false, errors.New("error"))
},
setupGetFn: func(m *mock.Mock, input string) {
m.On("Get", mock.Anything, input, mock.Anything).Return(nil, errors.New(""))
},
wantErr: true,
},
}
@@ -458,10 +409,6 @@ func TestMode2_Update(t *testing.T) {
ls := legacyStoreMock{m, l}
us := storageMock{m, s}
if tt.setupGetFn != nil {
tt.setupGetFn(m, tt.input)
}
if tt.setupLegacyFn != nil {
tt.setupLegacyFn(m, tt.input)
}
@@ -483,127 +430,3 @@ func TestMode2_Update(t *testing.T) {
})
}
}
func TestEnrichReturnedObject(t *testing.T) {
testCase := []struct {
inputOriginal runtime.Object
inputReturned runtime.Object
expectedObject runtime.Object
name string
isCreated bool
wantErr bool
}{
{
name: "original object does not have labels and annotations",
inputOriginal: &example.Pod{
TypeMeta: metav1.TypeMeta{Kind: "foo"},
ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "1", UID: types.UID("5")},
Spec: example.PodSpec{}, Status: example.PodStatus{},
},
inputReturned: &example.Pod{
TypeMeta: metav1.TypeMeta{Kind: "foo"},
ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "2", UID: types.UID("6"), Labels: map[string]string{"label1": "1"}, Annotations: map[string]string{"annotation1": "1"}},
Spec: example.PodSpec{}, Status: example.PodStatus{},
},
expectedObject: &example.Pod{
TypeMeta: metav1.TypeMeta{Kind: "foo"},
ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "1", UID: types.UID("5")},
Spec: example.PodSpec{}, Status: example.PodStatus{},
},
},
{
name: "returned object does not have labels and annotations",
inputOriginal: &example.Pod{
TypeMeta: metav1.TypeMeta{Kind: "foo"},
ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "1", UID: types.UID("5"), Labels: map[string]string{"label1": "1"}, Annotations: map[string]string{"annotation1": "1"}},
Spec: example.PodSpec{}, Status: example.PodStatus{},
},
inputReturned: &example.Pod{
TypeMeta: metav1.TypeMeta{Kind: "foo"},
ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "2", UID: types.UID("6")},
Spec: example.PodSpec{}, Status: example.PodStatus{},
},
expectedObject: &example.Pod{
TypeMeta: metav1.TypeMeta{Kind: "foo"},
ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "1", UID: types.UID("5"), Labels: map[string]string{"label1": "1"}, Annotations: map[string]string{"annotation1": "1"}},
Spec: example.PodSpec{}, Status: example.PodStatus{},
},
},
{
name: "both objects have labels and annotations",
inputOriginal: &example.Pod{
TypeMeta: metav1.TypeMeta{Kind: "foo"},
ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "1", UID: types.UID("5"), Labels: map[string]string{"label1": "1"}, Annotations: map[string]string{"annotation1": "1"}},
Spec: example.PodSpec{}, Status: example.PodStatus{},
},
inputReturned: &example.Pod{
TypeMeta: metav1.TypeMeta{Kind: "foo"},
ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "2", UID: types.UID("6"), Labels: map[string]string{"label2": "2"}, Annotations: map[string]string{"annotation2": "2"}},
Spec: example.PodSpec{}, Status: example.PodStatus{},
},
expectedObject: &example.Pod{
TypeMeta: metav1.TypeMeta{Kind: "foo"},
ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "1", UID: types.UID("5"), Labels: map[string]string{"label1": "1"}, Annotations: map[string]string{"annotation1": "1"}},
Spec: example.PodSpec{}, Status: example.PodStatus{},
},
},
{
name: "both objects have labels and annotations with duplicated keys",
inputOriginal: &example.Pod{
TypeMeta: metav1.TypeMeta{Kind: "foo"},
ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "1", UID: types.UID("5"), Labels: map[string]string{"label1": "1"}, Annotations: map[string]string{"annotation1": "1"}},
Spec: example.PodSpec{}, Status: example.PodStatus{},
},
inputReturned: &example.Pod{
TypeMeta: metav1.TypeMeta{Kind: "foo"},
ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "2", UID: types.UID("6"), Labels: map[string]string{"label1": "11"}, Annotations: map[string]string{"annotation1": "11"}},
Spec: example.PodSpec{}, Status: example.PodStatus{},
},
expectedObject: &example.Pod{
TypeMeta: metav1.TypeMeta{Kind: "foo"},
ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "1", UID: types.UID("5"), Labels: map[string]string{"label1": "1"}, Annotations: map[string]string{"annotation1": "1"}},
Spec: example.PodSpec{}, Status: example.PodStatus{},
},
},
{
name: "original object does not exist",
inputOriginal: nil,
inputReturned: &example.Pod{},
expectedObject: nil,
wantErr: true,
},
{
name: "returned object does not exist",
inputOriginal: &example.Pod{},
inputReturned: nil,
expectedObject: nil,
wantErr: true,
},
}
for _, tt := range testCase {
t.Run(tt.name, func(t *testing.T) {
err := enrichLegacyObject(tt.inputOriginal, tt.inputReturned)
if tt.wantErr {
assert.Error(t, err)
return
}
accessorReturned, err := meta.Accessor(tt.inputReturned)
assert.NoError(t, err)
accessorExpected, err := meta.Accessor(tt.expectedObject)
assert.NoError(t, err)
assert.Equal(t, accessorExpected.GetLabels(), accessorReturned.GetLabels())
returnedAnnotations := accessorReturned.GetAnnotations()
expectedAnnotations := accessorExpected.GetAnnotations()
for k, v := range expectedAnnotations {
assert.Equal(t, v, returnedAnnotations[k])
}
assert.Equal(t, accessorExpected.GetResourceVersion(), accessorReturned.GetResourceVersion())
assert.Equal(t, accessorExpected.GetUID(), accessorReturned.GetUID())
})
}
}

View File

@@ -3,8 +3,10 @@ package rest
import (
"context"
"errors"
"fmt"
"time"
"k8s.io/apimachinery/pkg/api/meta"
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
@@ -42,30 +44,61 @@ func (d *DualWriterMode3) Mode() DualWriterMode {
const mode3Str = "3"
// Create overrides the behavior of the generic DualWriter and writes to LegacyStorage and Storage.
func (d *DualWriterMode3) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) {
func (d *DualWriterMode3) Create(ctx context.Context, in runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) {
var method = "create"
log := d.Log.WithValues("method", method)
ctx = klog.NewContext(ctx, log)
startStorage := time.Now()
created, err := d.Storage.Create(ctx, obj, createValidation, options)
accIn, err := meta.Accessor(in)
if err != nil {
log.Error(err, "unable to create object in storage")
d.recordLegacyDuration(true, mode3Str, d.resource, method, startStorage)
return created, err
return nil, err
}
d.recordStorageDuration(false, mode3Str, d.resource, method, startStorage)
go func() {
ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("legacy create timeout"))
defer cancel()
if accIn.GetUID() != "" {
return nil, fmt.Errorf("UID should not be: %v", accIn.GetUID())
}
startLegacy := time.Now()
_, errObjectSt := d.Legacy.Create(ctx, obj, createValidation, options)
d.recordLegacyDuration(errObjectSt != nil, mode3Str, d.resource, method, startLegacy)
}()
if accIn.GetName() == "" && accIn.GetGenerateName() == "" {
return nil, fmt.Errorf("name or generatename have to be set")
}
return created, err
startStorage := time.Now()
storageObj, errObjectSt := d.Storage.Create(ctx, in, createValidation, options)
d.recordStorageDuration(errObjectSt != nil, mode3Str, d.resource, method, startStorage)
if errObjectSt != nil {
log.Error(err, "unable to create object in storage")
return storageObj, errObjectSt
}
createdCopy := storageObj.DeepCopyObject()
//nolint:errcheck
go d.createOnLegacyStorage(ctx, in, createdCopy, createValidation, options)
return storageObj, errObjectSt
}
func (d *DualWriterMode3) createOnLegacyStorage(ctx context.Context, in, storageObj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) error {
var method = "create"
log := d.Log.WithValues("method", method)
ctx, cancel := context.WithTimeoutCause(context.WithoutCancel(ctx), time.Second*10, errors.New("legacy create timeout"))
defer cancel()
startLegacy := time.Now()
legacyObj, err := d.Legacy.Create(ctx, storageObj, createValidation, options)
d.recordLegacyDuration(err != nil, mode3Str, d.resource, method, startLegacy)
if err != nil {
log.Error(err, "unable to create object in legacy storage")
cancel()
}
areEqual := Compare(legacyObj, storageObj)
d.recordOutcome(mode3Str, getName(storageObj), areEqual, method)
if !areEqual {
log.Info("object from legacy and storage are not equal")
}
return err
}
// Get overrides the behavior of the generic DualWriter and retrieves an object from Storage.
@@ -75,13 +108,40 @@ func (d *DualWriterMode3) Get(ctx context.Context, name string, options *metav1.
ctx = klog.NewContext(ctx, log)
startStorage := time.Now()
res, err := d.Storage.Get(ctx, name, options)
storageObj, err := d.Storage.Get(ctx, name, options)
d.recordStorageDuration(err != nil, mode3Str, d.resource, method, startStorage)
if err != nil {
log.Error(err, "unable to get object in storage")
}
d.recordStorageDuration(err != nil, mode3Str, d.resource, method, startStorage)
return res, err
//nolint:errcheck
go d.getFromLegacyStorage(ctx, storageObj, name, options)
return storageObj, err
}
func (d *DualWriterMode3) getFromLegacyStorage(ctx context.Context, storageObj runtime.Object, name string, options *metav1.GetOptions) error {
var method = "get"
log := d.Log.WithValues("method", method, "name", name)
startLegacy := time.Now()
// Ignores cancellation signals from parent context. Will automatically be canceled after 10 seconds.
ctx, cancel := context.WithTimeoutCause(context.WithoutCancel(ctx), time.Second*10, errors.New("legacy get timeout"))
defer cancel()
objFromLegacy, err := d.Legacy.Get(ctx, name, options)
d.recordLegacyDuration(err != nil, mode3Str, d.resource, method, startLegacy)
if err != nil {
log.Error(err, "unable to get object in legacy storage")
cancel()
}
areEqual := Compare(storageObj, objFromLegacy)
d.recordOutcome(mode3Str, name, areEqual, method)
if !areEqual {
log.WithValues("name", name).Info("object from legacy and storage are not equal")
}
return err
}
// List overrides the behavior of the generic DualWriter and reads only from Unified Store.
@@ -91,13 +151,40 @@ func (d *DualWriterMode3) List(ctx context.Context, options *metainternalversion
ctx = klog.NewContext(ctx, log)
startStorage := time.Now()
res, err := d.Storage.List(ctx, options)
objFromStorage, err := d.Storage.List(ctx, options)
d.recordStorageDuration(err != nil, mode3Str, d.resource, method, startStorage)
if err != nil {
log.Error(err, "unable to list object in storage")
}
d.recordStorageDuration(err != nil, mode3Str, d.resource, method, startStorage)
return res, err
//nolint:errcheck
go d.listFromLegacyStorage(ctx, options, objFromStorage)
return objFromStorage, err
}
func (d *DualWriterMode3) listFromLegacyStorage(ctx context.Context, options *metainternalversion.ListOptions, objFromStorage runtime.Object) error {
var method = "list"
log := d.Log.WithValues("resourceVersion", options.ResourceVersion, "method", method)
startLegacy := time.Now()
ctx, cancel := context.WithTimeoutCause(context.WithoutCancel(ctx), time.Second*10, errors.New("legacy list timeout"))
defer cancel()
objFromLegacy, err := d.Legacy.List(ctx, options)
d.recordLegacyDuration(err != nil, mode3Str, d.resource, method, startLegacy)
if err != nil {
log.Error(err, "unable to list object in legacy storage")
cancel()
}
areEqual := Compare(objFromStorage, objFromLegacy)
d.recordOutcome(mode3Str, getName(objFromStorage), areEqual, method)
if !areEqual {
log.WithValues("name", getName(objFromStorage)).Info("object from legacy and storage are not equal")
}
return err
}
func (d *DualWriterMode3) Delete(ctx context.Context, name string, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions) (runtime.Object, bool, error) {
@@ -106,23 +193,40 @@ func (d *DualWriterMode3) Delete(ctx context.Context, name string, deleteValidat
ctx = klog.NewContext(ctx, d.Log)
startStorage := time.Now()
res, async, err := d.Storage.Delete(ctx, name, deleteValidation, options)
objFromStorage, async, err := d.Storage.Delete(ctx, name, deleteValidation, options)
d.recordStorageDuration(err != nil, mode3Str, name, method, startStorage)
if err != nil {
log.Error(err, "unable to delete object in storage")
d.recordStorageDuration(true, mode3Str, d.resource, method, startStorage)
return res, async, err
return objFromStorage, async, err
}
d.recordStorageDuration(false, mode3Str, name, method, startStorage)
go func() {
startLegacy := time.Now()
ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("legacy delete timeout"))
defer cancel()
_, _, err := d.Legacy.Delete(ctx, name, deleteValidation, options)
d.recordLegacyDuration(err != nil, mode3Str, d.resource, method, startLegacy)
}()
//nolint:errcheck
go d.deleteFromLegacyStorage(ctx, objFromStorage, name, deleteValidation, options)
return res, async, err
return objFromStorage, async, err
}
func (d *DualWriterMode3) deleteFromLegacyStorage(ctx context.Context, objFromStorage runtime.Object, name string, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions) error {
var method = "delete"
log := d.Log.WithValues("name", name, "method", method, "name", name)
startLegacy := time.Now()
ctx, cancel := context.WithTimeoutCause(context.WithoutCancel(ctx), time.Second*10, errors.New("legacy delete timeout"))
defer cancel()
objFromLegacy, _, err := d.Legacy.Delete(ctx, name, deleteValidation, options)
d.recordLegacyDuration(err != nil, mode3Str, d.resource, method, startLegacy)
if err != nil {
log.Error(err, "unable to delete object in legacy storage")
cancel()
}
areEqual := Compare(objFromStorage, objFromLegacy)
d.recordOutcome(mode3Str, name, areEqual, method)
if !areEqual {
log.Info("object from legacy and storage are not equal")
}
return err
}
// Update overrides the behavior of the generic DualWriter and writes first to Storage and then to LegacyStorage.
@@ -132,24 +236,40 @@ func (d *DualWriterMode3) Update(ctx context.Context, name string, objInfo rest.
ctx = klog.NewContext(ctx, log)
startStorage := time.Now()
res, async, err := d.Storage.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
objFromStorage, async, err := d.Storage.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
d.recordStorageDuration(err != nil, mode3Str, d.resource, method, startStorage)
if err != nil {
log.Error(err, "unable to update in storage")
d.recordLegacyDuration(true, mode3Str, d.resource, method, startStorage)
return res, async, err
return objFromStorage, async, err
}
d.recordStorageDuration(false, mode3Str, d.resource, method, startStorage)
go func() {
ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("legacy update timeout"))
//nolint:errcheck
go d.updateOnLegacyStorage(ctx, objFromStorage, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
startLegacy := time.Now()
defer cancel()
_, _, errObjectSt := d.Legacy.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
d.recordLegacyDuration(errObjectSt != nil, mode3Str, d.resource, method, startLegacy)
}()
return objFromStorage, async, err
}
return res, async, err
func (d *DualWriterMode3) updateOnLegacyStorage(ctx context.Context, storageObj runtime.Object, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) error {
var method = "update"
log := d.Log.WithValues("name", name, "method", method, "name", name)
ctx, cancel := context.WithTimeoutCause(context.WithoutCancel(ctx), time.Second*10, errors.New("legacy update timeout"))
startLegacy := time.Now()
defer cancel()
objLegacy, _, err := d.Legacy.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
d.recordLegacyDuration(err != nil, mode3Str, d.resource, method, startLegacy)
if err != nil {
log.Error(err, "unable to update object in legacy storage")
cancel()
}
areEqual := Compare(storageObj, objLegacy)
d.recordOutcome(mode3Str, name, areEqual, method)
if !areEqual {
log.WithValues("name", name).Info("object from legacy and storage are not equal")
}
return err
}
// DeleteCollection overrides the behavior of the generic DualWriter and deletes from both LegacyStorage and Storage.
@@ -159,23 +279,41 @@ func (d *DualWriterMode3) DeleteCollection(ctx context.Context, deleteValidation
ctx = klog.NewContext(ctx, log)
startStorage := time.Now()
res, err := d.Storage.DeleteCollection(ctx, deleteValidation, options, listOptions)
storageObj, err := d.Storage.DeleteCollection(ctx, deleteValidation, options, listOptions)
d.recordStorageDuration(err != nil, mode3Str, d.resource, method, startStorage)
if err != nil {
log.Error(err, "unable to delete collection in storage")
d.recordStorageDuration(true, mode3Str, d.resource, method, startStorage)
return res, err
return storageObj, err
}
d.recordStorageDuration(false, mode3Str, d.resource, method, startStorage)
go func() {
startLegacy := time.Now()
ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("legacy deletecollection timeout"))
defer cancel()
_, err := d.Legacy.DeleteCollection(ctx, deleteValidation, options, listOptions)
d.recordStorageDuration(err != nil, mode3Str, d.resource, method, startLegacy)
}()
//nolint:errcheck
go d.deleteCollectionFromLegacyStorage(ctx, storageObj, deleteValidation, options, listOptions)
return res, err
return storageObj, err
}
func (d *DualWriterMode3) deleteCollectionFromLegacyStorage(ctx context.Context, storageObj runtime.Object, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions, listOptions *metainternalversion.ListOptions) error {
var method = "delete-collection"
log := d.Log.WithValues("resourceVersion", listOptions.ResourceVersion, "method", method)
startLegacy := time.Now()
ctx, cancel := context.WithTimeoutCause(context.WithoutCancel(ctx), time.Second*10, errors.New("legacy deletecollection timeout"))
defer cancel()
legacyObj, err := d.Legacy.DeleteCollection(ctx, deleteValidation, options, listOptions)
d.recordLegacyDuration(err != nil, mode3Str, d.resource, method, startLegacy)
if err != nil {
log.Error(err, "unable to delete collection in legacy storage")
cancel()
}
areEqual := Compare(storageObj, legacyObj)
d.recordOutcome(mode3Str, getName(legacyObj), areEqual, method)
if !areEqual {
log.Info("object from legacy and storage are not equal")
}
return err
}
func (d *DualWriterMode3) Watch(ctx context.Context, options *metainternalversion.ListOptions) (watch.Interface, error) {

View File

@@ -8,7 +8,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"k8s.io/apimachinery/pkg/api/meta"
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
@@ -17,8 +16,7 @@ import (
func TestMode3_Create(t *testing.T) {
type testCase struct {
input runtime.Object
setupLegacyFn func(m *mock.Mock, input runtime.Object)
setupStorageFn func(m *mock.Mock)
setupStorageFn func(m *mock.Mock, input runtime.Object)
name string
wantErr bool
}
@@ -27,17 +25,14 @@ func TestMode3_Create(t *testing.T) {
{
name: "creating an object only in the unified store",
input: exampleObj,
setupLegacyFn: func(m *mock.Mock, input runtime.Object) {
m.On("Create", mock.Anything, input, mock.Anything, mock.Anything).Return(exampleObj, nil)
},
setupStorageFn: func(m *mock.Mock) {
setupStorageFn: func(m *mock.Mock, input runtime.Object) {
m.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleObj, nil)
},
},
{
name: "error when creating object in the unified store fails",
input: failingObj,
setupLegacyFn: func(m *mock.Mock, input runtime.Object) {
setupStorageFn: func(m *mock.Mock, input runtime.Object) {
m.On("Create", mock.Anything, failingObj, mock.Anything, mock.Anything).Return(nil, errors.New("error"))
},
wantErr: true,
@@ -53,11 +48,8 @@ func TestMode3_Create(t *testing.T) {
ls := legacyStoreMock{m, l}
us := storageMock{m, s}
if tt.setupLegacyFn != nil {
tt.setupLegacyFn(m, tt.input)
}
if tt.setupStorageFn != nil {
tt.setupStorageFn(m)
tt.setupStorageFn(m, tt.input)
}
dw := NewDualWriter(Mode3, ls, us, p, kind)
@@ -69,14 +61,66 @@ func TestMode3_Create(t *testing.T) {
return
}
acc, err := meta.Accessor(obj)
assert.NoError(t, err)
assert.Equal(t, acc.GetResourceVersion(), "")
assert.NotEqual(t, obj, anotherObj)
})
}
}
func TestMode1_CreateOnLegacyStorage(t *testing.T) {
ctxCanceled, cancel := context.WithCancel(context.TODO())
cancel()
type testCase struct {
name string
input runtime.Object
ctx *context.Context
setupLegacyFn func(m *mock.Mock)
}
tests :=
[]testCase{
{
name: "Create on legacy storage",
input: exampleObj,
setupLegacyFn: func(m *mock.Mock) {
m.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleObjNoRV, nil)
},
},
{
name: "Create on legacy storage works even if parent context is canceled",
input: exampleObj,
ctx: &ctxCanceled,
setupLegacyFn: func(m *mock.Mock) {
m.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleObjNoRV, nil)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
l := (LegacyStorage)(nil)
s := (Storage)(nil)
m := &mock.Mock{}
ls := legacyStoreMock{m, l}
us := storageMock{m, s}
if tt.setupLegacyFn != nil {
tt.setupLegacyFn(m)
}
ctx := context.TODO()
if tt.ctx != nil {
ctx = *tt.ctx
}
dw := NewDualWriter(Mode3, ls, us, p, kind)
err := dw.(*DualWriterMode3).createOnLegacyStorage(ctx, tt.input, exampleObj, func(context.Context, runtime.Object) error { return nil }, &metav1.CreateOptions{})
assert.NoError(t, err)
})
}
}
func TestMode3_Get(t *testing.T) {
type testCase struct {
setupStorageFn func(m *mock.Mock, name string)
@@ -132,6 +176,60 @@ func TestMode3_Get(t *testing.T) {
}
}
func TestMode1_GetFromLegacyStorage(t *testing.T) {
ctxCanceled, cancel := context.WithCancel(context.TODO())
cancel()
type testCase struct {
setupLegacyFn func(m *mock.Mock, name string)
ctx *context.Context
name string
input string
}
tests :=
[]testCase{
{
name: "Get from legacy storage",
input: "foo",
setupLegacyFn: func(m *mock.Mock, name string) {
m.On("Get", mock.Anything, name, mock.Anything).Return(exampleObj, nil)
},
},
{
name: "Get from legacy storage works even if parent context is canceled",
input: "foo",
ctx: &ctxCanceled,
setupLegacyFn: func(m *mock.Mock, name string) {
m.On("Get", mock.Anything, name, mock.Anything).Return(exampleObj, nil)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
l := (LegacyStorage)(nil)
s := (Storage)(nil)
m := &mock.Mock{}
ls := legacyStoreMock{m, l}
us := storageMock{m, s}
if tt.setupLegacyFn != nil {
tt.setupLegacyFn(m, tt.input)
}
ctx := context.TODO()
if tt.ctx != nil {
ctx = *tt.ctx
}
dw := NewDualWriter(Mode3, ls, us, p, kind)
err := dw.(*DualWriterMode3).getFromLegacyStorage(ctx, exampleObj, tt.input, &metav1.GetOptions{})
assert.NoError(t, err)
})
}
}
func TestMode3_List(t *testing.T) {
type testCase struct {
setupStorageFn func(m *mock.Mock, options *metainternalversion.ListOptions)
@@ -186,6 +284,58 @@ func TestMode3_List(t *testing.T) {
}
}
func TestMode1_ListFromLegacyStorage(t *testing.T) {
ctxCanceled, cancel := context.WithCancel(context.TODO())
cancel()
type testCase struct {
ctx *context.Context
name string
setupLegacyFn func(m *mock.Mock)
}
tests :=
[]testCase{
{
name: "list from legacy storage",
setupLegacyFn: func(m *mock.Mock) {
m.On("List", mock.Anything, mock.Anything).Return(anotherList, nil)
},
},
{
name: "list from legacy storage works even if parent context is canceled",
ctx: &ctxCanceled,
setupLegacyFn: func(m *mock.Mock) {
m.On("List", mock.Anything, mock.Anything).Return(anotherList, nil)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
l := (LegacyStorage)(nil)
s := (Storage)(nil)
m := &mock.Mock{}
ls := legacyStoreMock{m, l}
us := storageMock{m, s}
if tt.setupLegacyFn != nil {
tt.setupLegacyFn(m)
}
ctx := context.TODO()
if tt.ctx != nil {
ctx = *tt.ctx
}
dw := NewDualWriter(Mode3, ls, us, p, kind)
err := dw.(*DualWriterMode3).listFromLegacyStorage(ctx, &metainternalversion.ListOptions{}, anotherList)
assert.NoError(t, err)
})
}
}
func TestMode3_Delete(t *testing.T) {
type testCase struct {
setupStorageFn func(m *mock.Mock, name string)
@@ -240,6 +390,59 @@ func TestMode3_Delete(t *testing.T) {
}
}
func TestMode1_DeleteFromLegacyStorage(t *testing.T) {
ctxCanceled, cancel := context.WithCancel(context.TODO())
cancel()
type testCase struct {
ctx *context.Context
setupLegacyFn func(m *mock.Mock, name string)
name string
input string
}
tests :=
[]testCase{
{
name: "Delete from legacy storage",
setupLegacyFn: func(m *mock.Mock, input string) {
m.On("Delete", mock.Anything, input, mock.Anything, mock.Anything).Return(exampleObj, false, nil)
},
},
{
name: "Delete from unified legacy works even if parent context is canceled",
ctx: &ctxCanceled,
setupLegacyFn: func(m *mock.Mock, input string) {
m.On("Delete", mock.Anything, input, mock.Anything, mock.Anything).Return(exampleObj, false, nil)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
l := (LegacyStorage)(nil)
s := (Storage)(nil)
m := &mock.Mock{}
ls := legacyStoreMock{m, l}
us := storageMock{m, s}
if tt.setupLegacyFn != nil {
tt.setupLegacyFn(m, tt.input)
}
ctx := context.TODO()
if tt.ctx != nil {
ctx = *tt.ctx
}
dw := NewDualWriter(Mode3, ls, us, p, kind)
err := dw.(*DualWriterMode3).deleteFromLegacyStorage(ctx, exampleObj, tt.input, func(ctx context.Context, obj runtime.Object) error { return nil }, &metav1.DeleteOptions{})
assert.NoError(t, err)
})
}
}
func TestMode3_DeleteCollection(t *testing.T) {
type testCase struct {
input *metav1.DeleteOptions
@@ -294,6 +497,61 @@ func TestMode3_DeleteCollection(t *testing.T) {
}
}
func TestMode1_DeleteCollectionFromLegacyStorage(t *testing.T) {
ctxCanceled, cancel := context.WithCancel(context.TODO())
cancel()
type testCase struct {
ctx *context.Context
setupLegacyFn func(m *mock.Mock)
name string
input *metav1.DeleteOptions
}
tests :=
[]testCase{
{
name: "Delete Collection from legacy storage",
input: &metav1.DeleteOptions{TypeMeta: metav1.TypeMeta{Kind: "foo"}},
setupLegacyFn: func(m *mock.Mock) {
m.On("DeleteCollection", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleObj, nil)
},
},
{
name: "Delete Collection from legacy storage works even if parent context is canceled",
input: &metav1.DeleteOptions{TypeMeta: metav1.TypeMeta{Kind: "foo"}},
ctx: &ctxCanceled,
setupLegacyFn: func(m *mock.Mock) {
m.On("DeleteCollection", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleObj, nil)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
l := (LegacyStorage)(nil)
s := (Storage)(nil)
m := &mock.Mock{}
ls := legacyStoreMock{m, l}
us := storageMock{m, s}
if tt.setupLegacyFn != nil {
tt.setupLegacyFn(m)
}
ctx := context.TODO()
if tt.ctx != nil {
ctx = *tt.ctx
}
dw := NewDualWriter(Mode3, ls, us, p, kind)
err := dw.(*DualWriterMode3).deleteCollectionFromLegacyStorage(ctx, exampleObj, func(ctx context.Context, obj runtime.Object) error { return nil }, tt.input, &metainternalversion.ListOptions{})
assert.NoError(t, err)
})
}
}
func TestMode3_Update(t *testing.T) {
type testCase struct {
setupLegacyFn func(m *mock.Mock, input string)
@@ -349,11 +607,76 @@ func TestMode3_Update(t *testing.T) {
return
}
assert.NoError(t, err)
assert.Equal(t, obj, exampleObj)
assert.NotEqual(t, obj, anotherObj)
acc, err := meta.Accessor(obj)
assert.NoError(t, err)
assert.Equal(t, acc.GetResourceVersion(), "")
})
}
}
func TestMode1_UpdateOnLegacyStorage(t *testing.T) {
ctxCanceled, cancel := context.WithCancel(context.TODO())
cancel()
type testCase struct {
ctx *context.Context
setupLegacyFn func(m *mock.Mock, input string)
setupGetFn func(m *mock.Mock, input string)
name string
input string
}
tests :=
[]testCase{
{
name: "Update on legacy storage",
input: "foo",
setupLegacyFn: func(m *mock.Mock, input string) {
m.On("Update", mock.Anything, input, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(anotherObj, false, nil)
},
setupGetFn: func(m *mock.Mock, input string) {
m.On("Get", mock.Anything, input, mock.Anything).Return(exampleObj, nil)
},
},
{
name: "Update on legacy storage works even if parent context is canceled",
ctx: &ctxCanceled,
input: "foo",
setupLegacyFn: func(m *mock.Mock, input string) {
m.On("Update", mock.Anything, input, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(anotherObj, false, nil)
},
setupGetFn: func(m *mock.Mock, input string) {
m.On("Get", mock.Anything, input, mock.Anything).Return(exampleObj, nil)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
l := (LegacyStorage)(nil)
s := (Storage)(nil)
m := &mock.Mock{}
ls := legacyStoreMock{m, l}
us := storageMock{m, s}
if tt.setupLegacyFn != nil {
tt.setupLegacyFn(m, tt.input)
}
if tt.setupGetFn != nil {
tt.setupGetFn(m, tt.input)
}
ctx := context.TODO()
if tt.ctx != nil {
ctx = *tt.ctx
}
dw := NewDualWriter(Mode3, ls, us, p, kind)
err := dw.(*DualWriterMode3).updateOnLegacyStorage(ctx, exampleObj, tt.input, updatedObjInfoObj{}, func(ctx context.Context, obj runtime.Object) error { return nil }, func(ctx context.Context, obj, old runtime.Object) error { return nil }, false, &metav1.UpdateOptions{})
assert.NoError(t, err)
})
}
}

View File

@@ -23,6 +23,12 @@ type storageMock struct {
}
func (m legacyStoreMock) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
select {
case <-ctx.Done():
return nil, errors.New("context canceled")
default:
}
args := m.Called(ctx, name, options)
if name == "object-fail" {
return nil, args.Error(1)
@@ -31,6 +37,12 @@ func (m legacyStoreMock) Get(ctx context.Context, name string, options *metav1.G
}
func (m legacyStoreMock) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) {
select {
case <-ctx.Done():
return nil, errors.New("context canceled")
default:
}
args := m.Called(ctx, obj, createValidation, options)
acc, err := meta.Accessor(obj)
if err != nil {
@@ -44,6 +56,12 @@ func (m legacyStoreMock) Create(ctx context.Context, obj runtime.Object, createV
}
func (m legacyStoreMock) List(ctx context.Context, options *metainternalversion.ListOptions) (runtime.Object, error) {
select {
case <-ctx.Done():
return nil, errors.New("context canceled")
default:
}
args := m.Called(ctx, options)
if options.Kind == "fail" {
return nil, args.Error(1)
@@ -56,6 +74,11 @@ func (m legacyStoreMock) NewList() runtime.Object {
}
func (m legacyStoreMock) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
select {
case <-ctx.Done():
return nil, false, errors.New("context canceled")
default:
}
args := m.Called(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
if name == "object-fail" {
return nil, false, args.Error(2)
@@ -64,6 +87,12 @@ func (m legacyStoreMock) Update(ctx context.Context, name string, objInfo rest.U
}
func (m legacyStoreMock) Delete(ctx context.Context, name string, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions) (runtime.Object, bool, error) {
select {
case <-ctx.Done():
return nil, false, errors.New("context canceled")
default:
}
args := m.Called(ctx, name, deleteValidation, options)
if name == "object-fail" {
return nil, false, args.Error(2)
@@ -75,6 +104,11 @@ func (m legacyStoreMock) Delete(ctx context.Context, name string, deleteValidati
}
func (m legacyStoreMock) DeleteCollection(ctx context.Context, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions, listOptions *metainternalversion.ListOptions) (runtime.Object, error) {
select {
case <-ctx.Done():
return nil, errors.New("context canceled")
default:
}
args := m.Called(ctx, deleteValidation, options, listOptions)
if options.Kind == "fail" {
return nil, args.Error(1)

View File

@@ -17,6 +17,7 @@ import (
gapiutil "github.com/grafana/grafana/pkg/services/apiserver/utils"
"github.com/grafana/grafana/pkg/services/dashboards"
"github.com/grafana/grafana/pkg/services/folder"
"github.com/grafana/grafana/pkg/util"
)
func LegacyCreateCommandToUnstructured(cmd folder.CreateFolderCommand) (unstructured.Unstructured, error) {
@@ -29,6 +30,9 @@ func LegacyCreateCommandToUnstructured(cmd folder.CreateFolderCommand) (unstruct
},
}
// #TODO: let's see if we need to set the json field to "-"
if cmd.UID == "" {
cmd.UID = util.GenerateShortUID()
}
obj.SetName(cmd.UID)
if err := setParentUID(&obj, cmd.ParentUID); err != nil {

View File

@@ -8,6 +8,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/storage"
"k8s.io/klog/v2"
"github.com/grafana/grafana/pkg/apimachinery/identity"
"github.com/grafana/grafana/pkg/apimachinery/utils"
@@ -26,11 +27,12 @@ func (s *Storage) prepareObjectForStorage(ctx context.Context, newObject runtime
return nil, err
}
if obj.GetName() == "" {
return nil, fmt.Errorf("new object must have a name")
return nil, storage.ErrResourceVersionSetOnCreate
}
if obj.GetResourceVersion() != "" {
return nil, storage.ErrResourceVersionSetOnCreate
}
obj.SetGenerateName("") // Clear the random name field
obj.SetResourceVersion("")
obj.SetSelfLink("")
@@ -75,9 +77,19 @@ func (s *Storage) prepareObjectForUpdate(ctx context.Context, updateObject runti
if err != nil {
return nil, err
}
if previous.GetUID() == "" {
klog.Errorf("object is missing UID: %s, %s", obj.GetGroupVersionKind().String(), obj.GetName())
}
if obj.GetName() != previous.GetName() {
return nil, fmt.Errorf("name mismatch between existing and updated object")
}
obj.SetUID(previous.GetUID())
obj.SetCreatedBy(previous.GetCreatedBy())
obj.SetCreationTimestamp(previous.GetCreationTimestamp())
obj.SetResourceVersion("")
// Read+write will verify that origin format is accurate
origin, err := obj.GetOriginInfo()

View File

@@ -460,11 +460,13 @@ func (s *Storage) GuaranteedUpdate(
if err != nil {
return err
}
mmm, err := utils.MetaAccessor(existingObj)
if err != nil {
return err
}
mmm.SetResourceVersionInt64(rsp.ResourceVersion)
res.ResourceVersion = uint64(rsp.ResourceVersion)
if err := preconditions.Check(key, existingObj); err != nil {
if attempt >= MaxUpdateAttempts {

View File

@@ -280,6 +280,14 @@ func (s *server) newEvent(ctx context.Context, user claims.AuthInfo, key *Resour
return nil, AsErrorResult(err)
}
if obj.GetUID() == "" {
s.log.Error("object is missing UID", "key", key)
}
if obj.GetResourceVersion() != "" {
s.log.Error("object must not include a resource version", "key", key)
}
event := &WriteEvent{
Value: value,
Key: key,

View File

@@ -52,7 +52,7 @@ func TestSimpleServer(t *testing.T) {
t.Run("playlist happy CRUD paths", func(t *testing.T) {
raw := []byte(`{
"apiVersion": "playlist.grafana.app/v0alpha1",
"apiVersion": "playlist.grafana.app/v0alpha1",
"kind": "Playlist",
"metadata": {
"name": "fdgsv37qslr0ga",
@@ -167,7 +167,7 @@ func TestSimpleServer(t *testing.T) {
t.Run("playlist update optimistic concurrency check", func(t *testing.T) {
raw := []byte(`{
"apiVersion": "playlist.grafana.app/v0alpha1",
"apiVersion": "playlist.grafana.app/v0alpha1",
"kind": "Playlist",
"metadata": {
"name": "fdgsv37qslr0ga",

View File

@@ -190,7 +190,6 @@ func (c *K8sResourceClient) SanitizeJSON(v *unstructured.Unstructured, replaceMe
copy := c.sanitizeObject(v, replaceMeta...)
out, err := json.MarshalIndent(copy, "", " ")
// fmt.Printf("%s", out)
require.NoError(c.t, err)
return string(out)
}

View File

@@ -10,6 +10,7 @@ import (
"testing"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
@@ -358,6 +359,35 @@ func TestIntegrationPlaylist(t *testing.T) {
doPlaylistTests(t, helper)
})
t.Run("with dual write (etcd, mode 5)", func(t *testing.T) {
// NOTE: running local etcd, that will be wiped clean!
t.Skip("local etcd testing")
helper := apis.NewK8sTestHelper(t, testinfra.GrafanaOpts{
AppModeProduction: true,
DisableAnonymous: true,
APIServerStorageType: "etcd", // requires etcd running on localhost:2379
EnableFeatureToggles: []string{
featuremgmt.FlagKubernetesPlaylists, // Required so that legacy calls are also written
},
UnifiedStorageConfig: map[string]setting.UnifiedStorageConfig{
RESOURCEGROUP: {
DualWriterMode: grafanarest.Mode5,
},
},
})
// Clear the collection before starting (etcd)
client := helper.GetResourceClient(apis.ResourceClientArgs{
User: helper.Org1.Admin,
GVR: gvr,
})
err := client.Resource.DeleteCollection(context.Background(), metav1.DeleteOptions{}, metav1.ListOptions{})
require.NoError(t, err)
doPlaylistTests(t, helper)
})
}
func doPlaylistTests(t *testing.T, helper *apis.K8sTestHelper) *apis.K8sTestHelper {
@@ -460,31 +490,31 @@ func doPlaylistTests(t *testing.T, helper *apis.K8sTestHelper) *apis.K8sTestHelp
require.NotEmpty(t, uid)
expectedResult := `{
"apiVersion": "playlist.grafana.app/v0alpha1",
"kind": "Playlist",
"metadata": {
"creationTimestamp": "${creationTimestamp}",
"name": "` + uid + `",
"namespace": "default",
"resourceVersion": "${resourceVersion}",
"uid": "${uid}"
},
"spec": {
"interval": "20s",
"items": [
{
"type": "dashboard_by_uid",
"value": "xCmMwXdVz"
},
{
"type": "dashboard_by_tag",
"value": "graph-ng"
}
],
"title": "Test"
},
"status": {}
}`
"apiVersion": "playlist.grafana.app/v0alpha1",
"kind": "Playlist",
"metadata": {
"creationTimestamp": "${creationTimestamp}",
"name": "` + uid + `",
"namespace": "default",
"resourceVersion": "${resourceVersion}",
"uid": "${uid}"
},
"spec": {
"interval": "20s",
"items": [
{
"type": "dashboard_by_uid",
"value": "xCmMwXdVz"
},
{
"type": "dashboard_by_tag",
"value": "graph-ng"
}
],
"title": "Test"
},
"status": {}
}`
// List includes the expected result
k8sList, err := client.Resource.List(context.Background(), metav1.ListOptions{})
@@ -500,7 +530,7 @@ func doPlaylistTests(t *testing.T, helper *apis.K8sTestHelper) *apis.K8sTestHelp
// Now modify the interval
updatedInterval := `"interval": "10m"`
legacyPayload = strings.Replace(legacyPayload, `"interval": "20s"`, updatedInterval, 1)
expectedResult = strings.Replace(expectedResult, `"interval": "20s"`, updatedInterval, 1)
require.JSONEq(t, expectedResult, client.SanitizeJSON(&k8sList.Items[0]))
dtoResponse := apis.DoRequest(helper, apis.RequestParams{
User: client.Args.User,
Method: http.MethodPut,
@@ -511,10 +541,48 @@ func doPlaylistTests(t *testing.T, helper *apis.K8sTestHelper) *apis.K8sTestHelp
require.Equal(t, uid, dtoResponse.Result.Uid)
require.Equal(t, "10m", dtoResponse.Result.Interval)
expectedUnstructuredResult := &unstructured.Unstructured{
Object: map[string]any{
"apiVersion": "playlist.grafana.app/v0alpha1",
"kind": "Playlist",
"metadata": map[string]any{
"creationTimestamp": "123",
"name": uid,
"namespace": "default",
"resourceVersion": "123",
"uid": uid,
},
"spec": map[string]any{
"interval": "10m",
"items": []interface{}{
map[string]any{
"type": "dashboard_by_uid",
"value": "xCmMwXdVz",
},
map[string]any{
"type": "dashboard_by_tag",
"value": "graph-ng",
},
},
"title": "Test",
},
"status": map[string]any{},
},
}
accExpected, err := meta.Accessor(expectedUnstructuredResult)
require.NoError(t, err)
expectedSpec, _, err := unstructured.NestedMap(expectedUnstructuredResult.Object, "spec")
require.NoError(t, err)
// Make sure the changed interval is now returned from k8s
found, err = client.Resource.Get(context.Background(), uid, metav1.GetOptions{})
require.NoError(t, err)
require.JSONEq(t, expectedResult, client.SanitizeJSON(found))
foundSpec, _, err := unstructured.NestedMap(found.Object, "spec")
require.NoError(t, err)
require.Equal(t, accExpected.GetName(), found.GetName())
require.Equal(t, expectedSpec, foundSpec)
// Delete does not return anything
deleteResponse := apis.DoRequest(helper, apis.RequestParams{