diff --git a/pkg/apiserver/rest/dualwriter_mode1.go b/pkg/apiserver/rest/dualwriter_mode1.go index fc03211a5a1..2ce24e52008 100644 --- a/pkg/apiserver/rest/dualwriter_mode1.go +++ b/pkg/apiserver/rest/dualwriter_mode1.go @@ -288,12 +288,15 @@ func (d *DualWriterMode1) Update(ctx context.Context, name string, objInfo rest. } //nolint:errcheck - go d.updateOnUnifiedStorage(ctx, objLegacy, name, objInfo, createValidation, updateValidation, forceAllowCreate, options) + go d.updateOnUnifiedStorageMode1(ctx, objLegacy, name, objInfo, createValidation, updateValidation, forceAllowCreate, options) return objLegacy, async, err } -func (d *DualWriterMode1) updateOnUnifiedStorage(ctx context.Context, objLegacy runtime.Object, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) error { +func (d *DualWriterMode1) updateOnUnifiedStorageMode1(ctx context.Context, objLegacy runtime.Object, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) error { + // The incoming RV is from legacy storage, so we can ignore it + ctx = context.WithValue(ctx, dualWriteContextKey{}, true) + var method = "update" log := d.Log.WithValues("name", name, "method", method, "name", name) diff --git a/pkg/apiserver/rest/dualwriter_mode1_test.go b/pkg/apiserver/rest/dualwriter_mode1_test.go index 0438eb45cdc..fdae37d76ca 100644 --- a/pkg/apiserver/rest/dualwriter_mode1_test.go +++ b/pkg/apiserver/rest/dualwriter_mode1_test.go @@ -734,7 +734,7 @@ func TestMode1_UpdateOnUnifiedStorage(t *testing.T) { dw := NewDualWriter(Mode1, ls, us, p, kind) - err := dw.(*DualWriterMode1).updateOnUnifiedStorage(ctx, exampleObj, tt.input, updatedObjInfoObj{}, func(ctx context.Context, obj runtime.Object) error { return nil }, func(ctx context.Context, obj, old runtime.Object) error { return nil }, false, &metav1.UpdateOptions{}) + err := dw.(*DualWriterMode1).updateOnUnifiedStorageMode1(ctx, exampleObj, tt.input, updatedObjInfoObj{}, func(ctx context.Context, obj runtime.Object) error { return nil }, func(ctx context.Context, obj, old runtime.Object) error { return nil }, false, &metav1.UpdateOptions{}) assert.NoError(t, err) }) } diff --git a/pkg/apiserver/rest/dualwriter_mode2.go b/pkg/apiserver/rest/dualwriter_mode2.go index f28737aab93..c60f49c47b1 100644 --- a/pkg/apiserver/rest/dualwriter_mode2.go +++ b/pkg/apiserver/rest/dualwriter_mode2.go @@ -5,7 +5,6 @@ import ( "fmt" "time" - "github.com/grafana/grafana/pkg/apimachinery/utils" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion" @@ -13,8 +12,16 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apiserver/pkg/registry/rest" "k8s.io/klog/v2" + + "github.com/grafana/grafana/pkg/apimachinery/utils" ) +type dualWriteContextKey struct{} + +func IsDualWriteUpdate(ctx context.Context) bool { + return ctx.Value(dualWriteContextKey{}) == true +} + type DualWriterMode2 struct { Storage Storage Legacy LegacyStorage @@ -26,7 +33,9 @@ type DualWriterMode2 struct { const mode2Str = "2" // NewDualWriterMode2 returns a new DualWriter in mode 2. -// Mode 2 represents writing to LegacyStorage and Storage and reading from LegacyStorage. +// Mode 2 represents writing to LegacyStorage first, then to Storage +// When reading, values from storage will be returned if they exist +// otherwise the value from legacy will be used func newDualWriterMode2(legacy LegacyStorage, storage Storage, dwm *dualWriterMetrics, resource string) *DualWriterMode2 { return &DualWriterMode2{ Legacy: legacy, @@ -127,9 +136,7 @@ func (d *DualWriterMode2) Get(ctx context.Context, name string, options *metav1. } if objStorage != nil { - if err := updateRVOnLegacyObj(objStorage, objLegacy); err != nil { - log.WithValues("storageObject", objStorage, "legacyObject", objLegacy).Error(err, "could not update resource version") - } + return objStorage, err } return objLegacy, err @@ -287,6 +294,10 @@ func (d *DualWriterMode2) Update(ctx context.Context, name string, objInfo rest. log := d.Log.WithValues("name", name, "method", method) ctx = klog.NewContext(ctx, log) + // The incoming RV is not stable -- it may be from legacy or storage! + // This sets a flag in the context and our apistore is more lenient when it exists + ctx = context.WithValue(ctx, dualWriteContextKey{}, true) + startLegacy := time.Now() objFromLegacy, created, err := d.Legacy.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options) if err != nil { @@ -311,28 +322,12 @@ func (d *DualWriterMode2) Update(ctx context.Context, name string, objInfo rest. } if objFromStorage != nil { - if err := updateRVOnLegacyObj(objFromStorage, objFromLegacy); err != nil { - log.WithValues("storageObject", objFromStorage, "legacyObject", objFromLegacy).Error(err, "could not update resource version") - } + return objFromStorage, created, err } return objFromLegacy, created, err } -func updateRVOnLegacyObj(storageObj runtime.Object, legacyObj runtime.Object) error { - storageAccessor, err := utils.MetaAccessor(storageObj) - if err != nil { - return err - } - legacyAccessor, err := utils.MetaAccessor(legacyObj) - if err != nil { - return err - } - - legacyAccessor.SetResourceVersion(storageAccessor.GetResourceVersion()) - return nil -} - func (d *DualWriterMode2) Destroy() { d.Storage.Destroy() d.Legacy.Destroy() diff --git a/pkg/apiserver/rest/dualwriter_mode3.go b/pkg/apiserver/rest/dualwriter_mode3.go index e74ad5de2e7..654d038e37e 100644 --- a/pkg/apiserver/rest/dualwriter_mode3.go +++ b/pkg/apiserver/rest/dualwriter_mode3.go @@ -244,12 +244,15 @@ func (d *DualWriterMode3) Update(ctx context.Context, name string, objInfo rest. } //nolint:errcheck - go d.updateOnLegacyStorage(ctx, objFromStorage, name, objInfo, createValidation, updateValidation, forceAllowCreate, options) + go d.updateOnLegacyStorageMode3(ctx, objFromStorage, name, objInfo, createValidation, updateValidation, forceAllowCreate, options) return objFromStorage, async, err } -func (d *DualWriterMode3) updateOnLegacyStorage(ctx context.Context, storageObj runtime.Object, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) error { +func (d *DualWriterMode3) updateOnLegacyStorageMode3(ctx context.Context, storageObj runtime.Object, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) error { + // The incoming RV is from unified storage, so legacy can ignore it + ctx = context.WithValue(ctx, dualWriteContextKey{}, true) + var method = "update" log := d.Log.WithValues("name", name, "method", method, "name", name) diff --git a/pkg/apiserver/rest/dualwriter_mode3_test.go b/pkg/apiserver/rest/dualwriter_mode3_test.go index 320c462e732..7aa1d0b75c3 100644 --- a/pkg/apiserver/rest/dualwriter_mode3_test.go +++ b/pkg/apiserver/rest/dualwriter_mode3_test.go @@ -675,7 +675,7 @@ func TestMode1_UpdateOnLegacyStorage(t *testing.T) { dw := NewDualWriter(Mode3, ls, us, p, kind) - err := dw.(*DualWriterMode3).updateOnLegacyStorage(ctx, exampleObj, tt.input, updatedObjInfoObj{}, func(ctx context.Context, obj runtime.Object) error { return nil }, func(ctx context.Context, obj, old runtime.Object) error { return nil }, false, &metav1.UpdateOptions{}) + err := dw.(*DualWriterMode3).updateOnLegacyStorageMode3(ctx, exampleObj, tt.input, updatedObjInfoObj{}, func(ctx context.Context, obj runtime.Object) error { return nil }, func(ctx context.Context, obj, old runtime.Object) error { return nil }, false, &metav1.UpdateOptions{}) assert.NoError(t, err) }) } diff --git a/pkg/storage/unified/apistore/prepare.go b/pkg/storage/unified/apistore/prepare.go index d7993b23b9b..877a96bff00 100644 --- a/pkg/storage/unified/apistore/prepare.go +++ b/pkg/storage/unified/apistore/prepare.go @@ -80,16 +80,18 @@ func (s *Storage) prepareObjectForUpdate(ctx context.Context, updateObject runti if previous.GetUID() == "" { klog.Errorf("object is missing UID: %s, %s", obj.GetGroupVersionKind().String(), obj.GetName()) + } else if obj.GetUID() != previous.GetUID() { + klog.Errorf("object UID mismatch: %s, was:%s, now: %s", obj.GetGroupVersionKind().String(), previous.GetName(), obj.GetUID()) + obj.SetUID(previous.GetUID()) } if obj.GetName() != previous.GetName() { return nil, fmt.Errorf("name mismatch between existing and updated object") } - obj.SetUID(previous.GetUID()) obj.SetCreatedBy(previous.GetCreatedBy()) obj.SetCreationTimestamp(previous.GetCreationTimestamp()) - obj.SetResourceVersion("") + obj.SetResourceVersion("") // removed from saved JSON because the RV is not yet calculated // Read+write will verify that origin format is accurate origin, err := obj.GetOriginInfo() diff --git a/pkg/storage/unified/apistore/store.go b/pkg/storage/unified/apistore/store.go index b220a32ba21..86a6563e854 100644 --- a/pkg/storage/unified/apistore/store.go +++ b/pkg/storage/unified/apistore/store.go @@ -29,6 +29,7 @@ import ( "github.com/grafana/grafana/pkg/apimachinery/utils" grafanaregistry "github.com/grafana/grafana/pkg/apiserver/registry/generic" + "github.com/grafana/grafana/pkg/apiserver/rest" "github.com/grafana/grafana/pkg/storage/unified/resource" ) @@ -468,11 +469,16 @@ func (s *Storage) GuaranteedUpdate( mmm.SetResourceVersionInt64(rsp.ResourceVersion) res.ResourceVersion = uint64(rsp.ResourceVersion) - if err := preconditions.Check(key, existingObj); err != nil { - if attempt >= MaxUpdateAttempts { - return fmt.Errorf("precondition failed: %w", err) + if rest.IsDualWriteUpdate(ctx) { + // Ignore the RV when updating legacy values + mmm.SetResourceVersion("") + } else { + if err := preconditions.Check(key, existingObj); err != nil { + if attempt >= MaxUpdateAttempts { + return fmt.Errorf("precondition failed: %w", err) + } + continue } - continue } } else if !ignoreNotFound { return apierrors.NewNotFound(s.gr, req.Key.Name) diff --git a/pkg/tests/apis/dashboard/dashboards_test.go b/pkg/tests/apis/dashboard/dashboards_test.go index 9031f2d94b9..b442ba3b040 100644 --- a/pkg/tests/apis/dashboard/dashboards_test.go +++ b/pkg/tests/apis/dashboard/dashboards_test.go @@ -5,16 +5,17 @@ import ( "strings" "testing" + "github.com/stretchr/testify/require" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "github.com/grafana/grafana/pkg/services/apiserver/options" "github.com/grafana/grafana/pkg/services/featuremgmt" "github.com/grafana/grafana/pkg/setting" "github.com/grafana/grafana/pkg/tests/apis" "github.com/grafana/grafana/pkg/tests/testinfra" "github.com/grafana/grafana/pkg/tests/testsuite" - "github.com/stretchr/testify/require" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/runtime/schema" ) var gvr = schema.GroupVersionResource{ @@ -34,7 +35,7 @@ func TestIntegrationRequiresDevMode(t *testing.T) { helper := apis.NewK8sTestHelper(t, testinfra.GrafanaOpts{ AppModeProduction: true, // should fail DisableAnonymous: true, - APIServerStorageType: options.StorageTypeUnifiedGrpc, // tests remote connection + APIServerStorageType: options.StorageTypeUnified, // tests local unified storage connection EnableFeatureToggles: []string{ featuremgmt.FlagGrafanaAPIServerWithExperimentalAPIs, // Required to start the example service }, diff --git a/pkg/tests/apis/helper.go b/pkg/tests/apis/helper.go index c039bb26d31..bed20f97f19 100644 --- a/pkg/tests/apis/helper.go +++ b/pkg/tests/apis/helper.go @@ -20,7 +20,6 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer/yaml" yamlutil "k8s.io/apimachinery/pkg/util/yaml" - "k8s.io/client-go/discovery" "k8s.io/client-go/dynamic" "k8s.io/client-go/rest" @@ -33,6 +32,7 @@ import ( "github.com/grafana/grafana/pkg/services/accesscontrol/ossaccesscontrol" "github.com/grafana/grafana/pkg/services/accesscontrol/resourcepermissions" "github.com/grafana/grafana/pkg/services/apiserver/endpoints/request" + "github.com/grafana/grafana/pkg/services/apiserver/options" "github.com/grafana/grafana/pkg/services/datasources" "github.com/grafana/grafana/pkg/services/featuremgmt" "github.com/grafana/grafana/pkg/services/org" @@ -63,6 +63,12 @@ type K8sTestHelper struct { func NewK8sTestHelper(t *testing.T, opts testinfra.GrafanaOpts) *K8sTestHelper { t.Helper() + + // Use GRPC server when not configured + if opts.APIServerStorageType == "" && opts.GRPCServerAddress == "" { + opts.APIServerStorageType = options.StorageTypeUnifiedGrpc + } + // Always enable `FlagAppPlatformGrpcClientAuth` for k8s integration tests, as this is the desired behavior. // The flag only exists to support the transition from the old to the new behavior in dev/ops/prod. opts.EnableFeatureToggles = append(opts.EnableFeatureToggles, featuremgmt.FlagAppPlatformGrpcClientAuth)