K8s/Playlist: Test dual write path (#77604)

This commit is contained in:
Ryan McKinley 2023-11-14 14:07:32 -08:00 committed by GitHub
parent 19a7cd88b0
commit 2b1e731c15
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 113 additions and 16 deletions

View File

@ -3,10 +3,13 @@ package rest
import (
"context"
"k8s.io/apimachinery/pkg/api/meta"
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/registry/rest"
"github.com/grafana/grafana/pkg/infra/log"
)
var (
@ -26,6 +29,7 @@ type Storage interface {
rest.Scoper
rest.TableConvertor
rest.SingularNameProvider
rest.Getter
}
// LegacyStorage is a storage implementation that writes to the Grafana SQL database.
@ -38,7 +42,12 @@ type LegacyStorage interface {
// DualWriter is a storage implementation that writes first to LegacyStorage and then to Storage.
// If writing to LegacyStorage fails, the write to Storage is skipped and the error is returned.
// Storage is used for all read operations.
// Storage is used for all read operations. This is useful as a migration step from SQL based
// legacy storage to a more standard kubernetes backed storage interface.
//
// NOTE: Only values supported by legacy storage will be preserved in the CREATE/UPDATE commands.
// For example, annotations, labels, and managed fields may not be preserved. Everything in upstream
// storage can be recrated from the data in legacy storage.
//
// The LegacyStorage implementation must implement the following interfaces:
// - rest.Storage
@ -54,6 +63,7 @@ type LegacyStorage interface {
type DualWriter struct {
Storage
legacy LegacyStorage
log log.Logger
}
// NewDualWriter returns a new DualWriter.
@ -61,16 +71,23 @@ func NewDualWriter(legacy LegacyStorage, storage Storage) *DualWriter {
return &DualWriter{
Storage: storage,
legacy: legacy,
log: log.New("grafana-apiserver.dualwriter"),
}
}
// Create overrides the default behavior of the Storage and writes to both the LegacyStorage and Storage.
func (d *DualWriter) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) {
if legacy, ok := d.legacy.(rest.Creater); ok {
_, err := legacy.Create(ctx, obj, createValidation, options)
created, err := legacy.Create(ctx, obj, createValidation, options)
if err != nil {
return nil, err
}
obj = created // write the updated version
rsp, err := d.Storage.Create(ctx, obj, createValidation, options)
if err != nil {
d.log.Error("unable to create object in duplicate storage", "error", err)
}
return rsp, err
}
return d.Storage.Create(ctx, obj, createValidation, options)
@ -79,10 +96,50 @@ func (d *DualWriter) Create(ctx context.Context, obj runtime.Object, createValid
// Update overrides the default behavior of the Storage and writes to both the LegacyStorage and Storage.
func (d *DualWriter) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
if legacy, ok := d.legacy.(rest.Updater); ok {
_, _, err := legacy.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
// Will resource version checking work????
old, err := d.Get(ctx, name, &metav1.GetOptions{})
if err != nil {
return nil, false, err
}
accessor, err := meta.Accessor(old)
if err != nil {
return nil, false, err
}
// Hold on to the RV+UID for the dual write
theRV := accessor.GetResourceVersion()
theUID := accessor.GetUID()
// Changes applied within new storage
// will fail if RV is out of sync
updated, err := objInfo.UpdatedObject(ctx, old)
if err != nil {
return nil, false, err
}
accessor, err = meta.Accessor(updated)
if err != nil {
return nil, false, err
}
accessor.SetUID("") // clear it
accessor.SetResourceVersion("") // remove it so it is not a constraint
obj, created, err := legacy.Update(ctx, name, &updateWrapper{
upstream: objInfo,
updated: updated, // returned as the object that will be updated
}, createValidation, updateValidation, forceAllowCreate, options)
if err != nil {
return obj, created, err
}
accessor, err = meta.Accessor(obj)
if err != nil {
return nil, false, err
}
accessor.SetResourceVersion(theRV) // the original RV
accessor.SetUID(theUID)
objInfo = &updateWrapper{
upstream: objInfo,
updated: obj, // returned as the object that will be updated
}
}
return d.Storage.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
@ -90,24 +147,41 @@ func (d *DualWriter) Update(ctx context.Context, name string, objInfo rest.Updat
// Delete overrides the default behavior of the Storage and delete from both the LegacyStorage and Storage.
func (d *DualWriter) Delete(ctx context.Context, name string, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions) (runtime.Object, bool, error) {
if legacy, ok := d.legacy.(rest.GracefulDeleter); ok {
_, _, err := legacy.Delete(ctx, name, deleteValidation, options)
if err != nil {
return nil, false, err
// Delete from storage *first* so the item is still exists if a failure happens
obj, async, err := d.Storage.Delete(ctx, name, deleteValidation, options)
if err == nil {
if legacy, ok := d.legacy.(rest.GracefulDeleter); ok {
obj, async, err = legacy.Delete(ctx, name, deleteValidation, options)
}
}
return d.Storage.Delete(ctx, name, deleteValidation, options)
return obj, async, err
}
// DeleteCollection overrides the default behavior of the Storage and delete from both the LegacyStorage and Storage.
func (d *DualWriter) DeleteCollection(ctx context.Context, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions, listOptions *metainternalversion.ListOptions) (runtime.Object, error) {
if legacy, ok := d.legacy.(rest.CollectionDeleter); ok {
_, err := legacy.DeleteCollection(ctx, deleteValidation, options, listOptions)
if err != nil {
return nil, err
out, err := d.Storage.DeleteCollection(ctx, deleteValidation, options, listOptions)
if err == nil {
if legacy, ok := d.legacy.(rest.CollectionDeleter); ok {
out, err = legacy.DeleteCollection(ctx, deleteValidation, options, listOptions)
}
}
return d.Storage.DeleteCollection(ctx, deleteValidation, options, listOptions)
return out, err
}
type updateWrapper struct {
upstream rest.UpdatedObjectInfo
updated runtime.Object
}
// Returns preconditions built from the updated object, if applicable.
// May return nil, or a preconditions object containing nil fields,
// if no preconditions can be determined from the updated object.
func (u *updateWrapper) Preconditions() *metav1.Preconditions {
return u.upstream.Preconditions()
}
// UpdatedObject returns the updated object, given a context and old object.
// The only time an empty oldObj should be passed in is if a "create on update" is occurring (there is no oldObj).
func (u *updateWrapper) UpdatedObject(ctx context.Context, oldObj runtime.Object) (newObj runtime.Object, err error) {
return u.updated, nil
}

View File

@ -7,6 +7,7 @@ package file
import (
"bytes"
"errors"
"os"
"path/filepath"
@ -55,6 +56,9 @@ func readDirRecursive(codec runtime.Codec, path string, newFunc func() runtime.O
return nil
})
if err != nil {
if errors.Is(err, os.ErrNotExist) {
return objs, nil
}
return nil, err
}
return objs, nil

View File

@ -74,7 +74,6 @@ func NewK8sTestHelper(t *testing.T, opts testinfra.GrafanaOpts) *K8sTestHelper {
}
func (c *K8sTestHelper) Shutdown() {
fmt.Printf("calling shutdown on: %s\n", c.env.Server.HTTPServer.Listener.Addr())
err := c.env.Server.Shutdown(context.Background(), "done")
require.NoError(c.t, err)
}

View File

@ -75,6 +75,18 @@ func TestPlaylist(t *testing.T) {
},
}))
})
t.Run("with dual write", func(t *testing.T) {
doPlaylistTests(t, apis.NewK8sTestHelper(t, testinfra.GrafanaOpts{
AppModeProduction: true,
DisableAnonymous: true,
APIServerStorageType: "file", // write the files to disk
EnableFeatureToggles: []string{
featuremgmt.FlagGrafanaAPIServer,
featuremgmt.FlagKubernetesPlaylists, // Required so that legacy calls are also written
},
}))
})
}
func doPlaylistTests(t *testing.T, helper *apis.K8sTestHelper) *apis.K8sTestHelper {

View File

@ -342,6 +342,13 @@ func CreateGrafDir(t *testing.T, opts ...GrafanaOpts) (string, string) {
require.NoError(t, err)
}
if o.APIServerStorageType != "" {
section, err := getOrCreateSection("grafana-apiserver")
require.NoError(t, err)
_, err = section.NewKey("storage_type", o.APIServerStorageType)
require.NoError(t, err)
}
if o.GRPCServerAddress != "" {
logSection, err := getOrCreateSection("grpc_server")
require.NoError(t, err)
@ -397,6 +404,7 @@ type GrafanaOpts struct {
EnableLog bool
GRPCServerAddress string
QueryRetries int64
APIServerStorageType string
}
func CreateUser(t *testing.T, store *sqlstore.SQLStore, cmd user.CreateUserCommand) *user.User {