mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
UnifiedStorage: Default to running unified-grpc in integration tests (#93492)
This commit is contained in:
parent
a93de5f0d8
commit
4e1f0dadbd
@ -288,12 +288,15 @@ func (d *DualWriterMode1) Update(ctx context.Context, name string, objInfo rest.
|
||||
}
|
||||
|
||||
//nolint:errcheck
|
||||
go d.updateOnUnifiedStorage(ctx, objLegacy, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
|
||||
go d.updateOnUnifiedStorageMode1(ctx, objLegacy, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
|
||||
|
||||
return objLegacy, async, err
|
||||
}
|
||||
|
||||
func (d *DualWriterMode1) updateOnUnifiedStorage(ctx context.Context, objLegacy runtime.Object, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) error {
|
||||
func (d *DualWriterMode1) updateOnUnifiedStorageMode1(ctx context.Context, objLegacy runtime.Object, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) error {
|
||||
// The incoming RV is from legacy storage, so we can ignore it
|
||||
ctx = context.WithValue(ctx, dualWriteContextKey{}, true)
|
||||
|
||||
var method = "update"
|
||||
log := d.Log.WithValues("name", name, "method", method, "name", name)
|
||||
|
||||
|
@ -734,7 +734,7 @@ func TestMode1_UpdateOnUnifiedStorage(t *testing.T) {
|
||||
|
||||
dw := NewDualWriter(Mode1, ls, us, p, kind)
|
||||
|
||||
err := dw.(*DualWriterMode1).updateOnUnifiedStorage(ctx, exampleObj, tt.input, updatedObjInfoObj{}, func(ctx context.Context, obj runtime.Object) error { return nil }, func(ctx context.Context, obj, old runtime.Object) error { return nil }, false, &metav1.UpdateOptions{})
|
||||
err := dw.(*DualWriterMode1).updateOnUnifiedStorageMode1(ctx, exampleObj, tt.input, updatedObjInfoObj{}, func(ctx context.Context, obj runtime.Object) error { return nil }, func(ctx context.Context, obj, old runtime.Object) error { return nil }, false, &metav1.UpdateOptions{})
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
}
|
||||
|
@ -5,7 +5,6 @@ import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana/pkg/apimachinery/utils"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
|
||||
@ -13,8 +12,16 @@ import (
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apiserver/pkg/registry/rest"
|
||||
"k8s.io/klog/v2"
|
||||
|
||||
"github.com/grafana/grafana/pkg/apimachinery/utils"
|
||||
)
|
||||
|
||||
type dualWriteContextKey struct{}
|
||||
|
||||
func IsDualWriteUpdate(ctx context.Context) bool {
|
||||
return ctx.Value(dualWriteContextKey{}) == true
|
||||
}
|
||||
|
||||
type DualWriterMode2 struct {
|
||||
Storage Storage
|
||||
Legacy LegacyStorage
|
||||
@ -26,7 +33,9 @@ type DualWriterMode2 struct {
|
||||
const mode2Str = "2"
|
||||
|
||||
// NewDualWriterMode2 returns a new DualWriter in mode 2.
|
||||
// Mode 2 represents writing to LegacyStorage and Storage and reading from LegacyStorage.
|
||||
// Mode 2 represents writing to LegacyStorage first, then to Storage
|
||||
// When reading, values from storage will be returned if they exist
|
||||
// otherwise the value from legacy will be used
|
||||
func newDualWriterMode2(legacy LegacyStorage, storage Storage, dwm *dualWriterMetrics, resource string) *DualWriterMode2 {
|
||||
return &DualWriterMode2{
|
||||
Legacy: legacy,
|
||||
@ -127,9 +136,7 @@ func (d *DualWriterMode2) Get(ctx context.Context, name string, options *metav1.
|
||||
}
|
||||
|
||||
if objStorage != nil {
|
||||
if err := updateRVOnLegacyObj(objStorage, objLegacy); err != nil {
|
||||
log.WithValues("storageObject", objStorage, "legacyObject", objLegacy).Error(err, "could not update resource version")
|
||||
}
|
||||
return objStorage, err
|
||||
}
|
||||
|
||||
return objLegacy, err
|
||||
@ -287,6 +294,10 @@ func (d *DualWriterMode2) Update(ctx context.Context, name string, objInfo rest.
|
||||
log := d.Log.WithValues("name", name, "method", method)
|
||||
ctx = klog.NewContext(ctx, log)
|
||||
|
||||
// The incoming RV is not stable -- it may be from legacy or storage!
|
||||
// This sets a flag in the context and our apistore is more lenient when it exists
|
||||
ctx = context.WithValue(ctx, dualWriteContextKey{}, true)
|
||||
|
||||
startLegacy := time.Now()
|
||||
objFromLegacy, created, err := d.Legacy.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
|
||||
if err != nil {
|
||||
@ -311,28 +322,12 @@ func (d *DualWriterMode2) Update(ctx context.Context, name string, objInfo rest.
|
||||
}
|
||||
|
||||
if objFromStorage != nil {
|
||||
if err := updateRVOnLegacyObj(objFromStorage, objFromLegacy); err != nil {
|
||||
log.WithValues("storageObject", objFromStorage, "legacyObject", objFromLegacy).Error(err, "could not update resource version")
|
||||
}
|
||||
return objFromStorage, created, err
|
||||
}
|
||||
|
||||
return objFromLegacy, created, err
|
||||
}
|
||||
|
||||
func updateRVOnLegacyObj(storageObj runtime.Object, legacyObj runtime.Object) error {
|
||||
storageAccessor, err := utils.MetaAccessor(storageObj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
legacyAccessor, err := utils.MetaAccessor(legacyObj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
legacyAccessor.SetResourceVersion(storageAccessor.GetResourceVersion())
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *DualWriterMode2) Destroy() {
|
||||
d.Storage.Destroy()
|
||||
d.Legacy.Destroy()
|
||||
|
@ -244,12 +244,15 @@ func (d *DualWriterMode3) Update(ctx context.Context, name string, objInfo rest.
|
||||
}
|
||||
|
||||
//nolint:errcheck
|
||||
go d.updateOnLegacyStorage(ctx, objFromStorage, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
|
||||
go d.updateOnLegacyStorageMode3(ctx, objFromStorage, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
|
||||
|
||||
return objFromStorage, async, err
|
||||
}
|
||||
|
||||
func (d *DualWriterMode3) updateOnLegacyStorage(ctx context.Context, storageObj runtime.Object, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) error {
|
||||
func (d *DualWriterMode3) updateOnLegacyStorageMode3(ctx context.Context, storageObj runtime.Object, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) error {
|
||||
// The incoming RV is from unified storage, so legacy can ignore it
|
||||
ctx = context.WithValue(ctx, dualWriteContextKey{}, true)
|
||||
|
||||
var method = "update"
|
||||
log := d.Log.WithValues("name", name, "method", method, "name", name)
|
||||
|
||||
|
@ -675,7 +675,7 @@ func TestMode1_UpdateOnLegacyStorage(t *testing.T) {
|
||||
|
||||
dw := NewDualWriter(Mode3, ls, us, p, kind)
|
||||
|
||||
err := dw.(*DualWriterMode3).updateOnLegacyStorage(ctx, exampleObj, tt.input, updatedObjInfoObj{}, func(ctx context.Context, obj runtime.Object) error { return nil }, func(ctx context.Context, obj, old runtime.Object) error { return nil }, false, &metav1.UpdateOptions{})
|
||||
err := dw.(*DualWriterMode3).updateOnLegacyStorageMode3(ctx, exampleObj, tt.input, updatedObjInfoObj{}, func(ctx context.Context, obj runtime.Object) error { return nil }, func(ctx context.Context, obj, old runtime.Object) error { return nil }, false, &metav1.UpdateOptions{})
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
}
|
||||
|
@ -80,16 +80,18 @@ func (s *Storage) prepareObjectForUpdate(ctx context.Context, updateObject runti
|
||||
|
||||
if previous.GetUID() == "" {
|
||||
klog.Errorf("object is missing UID: %s, %s", obj.GetGroupVersionKind().String(), obj.GetName())
|
||||
} else if obj.GetUID() != previous.GetUID() {
|
||||
klog.Errorf("object UID mismatch: %s, was:%s, now: %s", obj.GetGroupVersionKind().String(), previous.GetName(), obj.GetUID())
|
||||
obj.SetUID(previous.GetUID())
|
||||
}
|
||||
|
||||
if obj.GetName() != previous.GetName() {
|
||||
return nil, fmt.Errorf("name mismatch between existing and updated object")
|
||||
}
|
||||
|
||||
obj.SetUID(previous.GetUID())
|
||||
obj.SetCreatedBy(previous.GetCreatedBy())
|
||||
obj.SetCreationTimestamp(previous.GetCreationTimestamp())
|
||||
obj.SetResourceVersion("")
|
||||
obj.SetResourceVersion("") // removed from saved JSON because the RV is not yet calculated
|
||||
|
||||
// Read+write will verify that origin format is accurate
|
||||
origin, err := obj.GetOriginInfo()
|
||||
|
@ -29,6 +29,7 @@ import (
|
||||
|
||||
"github.com/grafana/grafana/pkg/apimachinery/utils"
|
||||
grafanaregistry "github.com/grafana/grafana/pkg/apiserver/registry/generic"
|
||||
"github.com/grafana/grafana/pkg/apiserver/rest"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resource"
|
||||
)
|
||||
|
||||
@ -468,12 +469,17 @@ func (s *Storage) GuaranteedUpdate(
|
||||
mmm.SetResourceVersionInt64(rsp.ResourceVersion)
|
||||
res.ResourceVersion = uint64(rsp.ResourceVersion)
|
||||
|
||||
if rest.IsDualWriteUpdate(ctx) {
|
||||
// Ignore the RV when updating legacy values
|
||||
mmm.SetResourceVersion("")
|
||||
} else {
|
||||
if err := preconditions.Check(key, existingObj); err != nil {
|
||||
if attempt >= MaxUpdateAttempts {
|
||||
return fmt.Errorf("precondition failed: %w", err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
}
|
||||
} else if !ignoreNotFound {
|
||||
return apierrors.NewNotFound(s.gr, req.Key.Name)
|
||||
}
|
||||
|
@ -5,16 +5,17 @@ import (
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
|
||||
"github.com/grafana/grafana/pkg/services/apiserver/options"
|
||||
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
"github.com/grafana/grafana/pkg/tests/apis"
|
||||
"github.com/grafana/grafana/pkg/tests/testinfra"
|
||||
"github.com/grafana/grafana/pkg/tests/testsuite"
|
||||
"github.com/stretchr/testify/require"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
)
|
||||
|
||||
var gvr = schema.GroupVersionResource{
|
||||
@ -34,7 +35,7 @@ func TestIntegrationRequiresDevMode(t *testing.T) {
|
||||
helper := apis.NewK8sTestHelper(t, testinfra.GrafanaOpts{
|
||||
AppModeProduction: true, // should fail
|
||||
DisableAnonymous: true,
|
||||
APIServerStorageType: options.StorageTypeUnifiedGrpc, // tests remote connection
|
||||
APIServerStorageType: options.StorageTypeUnified, // tests local unified storage connection
|
||||
EnableFeatureToggles: []string{
|
||||
featuremgmt.FlagGrafanaAPIServerWithExperimentalAPIs, // Required to start the example service
|
||||
},
|
||||
|
@ -20,7 +20,6 @@ import (
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/runtime/serializer/yaml"
|
||||
yamlutil "k8s.io/apimachinery/pkg/util/yaml"
|
||||
|
||||
"k8s.io/client-go/discovery"
|
||||
"k8s.io/client-go/dynamic"
|
||||
"k8s.io/client-go/rest"
|
||||
@ -33,6 +32,7 @@ import (
|
||||
"github.com/grafana/grafana/pkg/services/accesscontrol/ossaccesscontrol"
|
||||
"github.com/grafana/grafana/pkg/services/accesscontrol/resourcepermissions"
|
||||
"github.com/grafana/grafana/pkg/services/apiserver/endpoints/request"
|
||||
"github.com/grafana/grafana/pkg/services/apiserver/options"
|
||||
"github.com/grafana/grafana/pkg/services/datasources"
|
||||
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
||||
"github.com/grafana/grafana/pkg/services/org"
|
||||
@ -63,6 +63,12 @@ type K8sTestHelper struct {
|
||||
|
||||
func NewK8sTestHelper(t *testing.T, opts testinfra.GrafanaOpts) *K8sTestHelper {
|
||||
t.Helper()
|
||||
|
||||
// Use GRPC server when not configured
|
||||
if opts.APIServerStorageType == "" && opts.GRPCServerAddress == "" {
|
||||
opts.APIServerStorageType = options.StorageTypeUnifiedGrpc
|
||||
}
|
||||
|
||||
// Always enable `FlagAppPlatformGrpcClientAuth` for k8s integration tests, as this is the desired behavior.
|
||||
// The flag only exists to support the transition from the old to the new behavior in dev/ops/prod.
|
||||
opts.EnableFeatureToggles = append(opts.EnableFeatureToggles, featuremgmt.FlagAppPlatformGrpcClientAuth)
|
||||
|
Loading…
Reference in New Issue
Block a user