Storage: Add mode 2 dual writing improvements (#87204)

* Fix mode 2 List test
* Set origin timestamp during conversion to k8s resource
* Add instructions for updating a playlist
* Handle partial deletions of a collection in mode 2
This commit is contained in:
Arati R 2024-05-02 16:06:51 +02:00 committed by GitHub
parent b1d98939e1
commit 9e6de035c0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 173 additions and 85 deletions

View File

@ -154,7 +154,7 @@ func TestMode1_List(t *testing.T) {
{
name: "error when listing an object in the legacy store is not implemented",
setupLegacyFn: func(m *mock.Mock) {
m.On("List", context.Background(), mock.Anything).Return(nil, errors.New("error"))
m.On("List", context.Background(), mock.Anything).Return(&example.PodList{}, errors.New("error"))
},
},
// TODO: legacy list is missing
@ -183,8 +183,6 @@ func TestMode1_List(t *testing.T) {
assert.Error(t, err)
continue
}
us.AssertNotCalled(t, "List", context.Background(), &metainternalversion.ListOptions{})
}
}

View File

@ -93,34 +93,15 @@ func (d *DualWriterMode2) List(ctx context.Context, options *metainternalversion
return nil, err
}
originKeys := []string{}
indexMap := map[string]int{}
for i, obj := range legacyList {
metaAccessor, err := utils.MetaAccessor(obj)
if err != nil {
return nil, err
}
originKeys = append(originKeys, metaAccessor.GetOriginKey())
accessor, err := meta.Accessor(obj)
if err != nil {
return nil, err
}
// Record the index of each LegacyStorage object so it can later be replaced by
// an equivalent Storage object if it exists.
indexMap[accessor.GetName()] = i
}
if len(originKeys) == 0 {
return ll, nil
}
r, err := labels.NewRequirement(utils.AnnoKeyOriginKey, selection.In, originKeys)
// Record the index of each LegacyStorage object so it can later be replaced by
// an equivalent Storage object if it exists.
optionsStorage, indexMap, err := parseList(legacyList)
if err != nil {
return nil, err
}
optionsStorage := metainternalversion.ListOptions{
LabelSelector: labels.NewSelector().Add(*r),
if optionsStorage.LabelSelector == nil {
return ll, nil
}
sl, err := d.Storage.List(ctx, &optionsStorage)
@ -155,30 +136,32 @@ func (d *DualWriterMode2) DeleteCollection(ctx context.Context, deleteValidation
return nil, errDualWriterCollectionDeleterMissing
}
// #TODO: figure out how to handle partial deletions
deleted, err := legacy.DeleteCollection(ctx, deleteValidation, options, listOptions)
if err != nil {
klog.FromContext(ctx).Error(err, "failed to delete collection successfully from legacy storage", "deletedObjects", deleted)
}
res, err := d.Storage.DeleteCollection(ctx, deleteValidation, options, listOptions)
legacyList, err := meta.ExtractList(deleted)
if err != nil {
klog.FromContext(ctx).Error(err, "failed to delete collection successfully from Storage", "deletedObjects", deleted)
return nil, err
}
// Only the items deleted by the legacy DeleteCollection call are selected for deletion by Storage.
optionsStorage, _, err := parseList(legacyList)
if err != nil {
return nil, err
}
if optionsStorage.LabelSelector == nil {
return deleted, nil
}
res, err := d.Storage.DeleteCollection(ctx, deleteValidation, options, &optionsStorage)
if err != nil {
klog.FromContext(ctx).Error(err, "failed to delete collection successfully from Storage", "deletedObjects", res)
}
return res, err
}
func enrichObject(accessorO, accessorC metav1.Object) {
accessorC.SetLabels(accessorO.GetLabels())
ac := accessorC.GetAnnotations()
for k, v := range accessorO.GetAnnotations() {
ac[k] = v
}
accessorC.SetAnnotations(ac)
}
func (d *DualWriterMode2) Delete(ctx context.Context, name string, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions) (runtime.Object, bool, error) {
legacy, ok := d.Legacy.(rest.GracefulDeleter)
if !ok {
@ -264,3 +247,45 @@ func (d *DualWriterMode2) Update(ctx context.Context, name string, objInfo rest.
// it doesn't exist: https://github.com/grafana/grafana/pull/85206
return d.Storage.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
}
func parseList(legacyList []runtime.Object) (metainternalversion.ListOptions, map[string]int, error) {
options := metainternalversion.ListOptions{}
originKeys := []string{}
indexMap := map[string]int{}
for i, obj := range legacyList {
metaAccessor, err := utils.MetaAccessor(obj)
if err != nil {
return options, nil, err
}
originKeys = append(originKeys, metaAccessor.GetOriginKey())
accessor, err := meta.Accessor(obj)
if err != nil {
return options, nil, err
}
indexMap[accessor.GetName()] = i
}
if len(originKeys) == 0 {
return options, nil, nil
}
r, err := labels.NewRequirement(utils.AnnoKeyOriginKey, selection.In, originKeys)
if err != nil {
return options, nil, err
}
options.LabelSelector = labels.NewSelector().Add(*r)
return options, indexMap, nil
}
func enrichObject(accessorO, accessorC metav1.Object) {
accessorC.SetLabels(accessorO.GetLabels())
ac := accessorC.GetAnnotations()
for k, v := range accessorO.GetAnnotations() {
ac[k] = v
}
accessorC.SetAnnotations(ac)
}

View File

@ -5,22 +5,35 @@ import (
"errors"
"testing"
"github.com/grafana/grafana/pkg/services/apiserver/utils"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apiserver/pkg/apis/example"
)
var createFn = func(context.Context, runtime.Object) error { return nil }
var exampleOption = &metainternalversion.ListOptions{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "foo",
var exampleOption = &metainternalversion.ListOptions{}
var legacyItem = example.Pod{
TypeMeta: metav1.TypeMeta{},
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
ResourceVersion: "1",
Annotations: map[string]string{
"grafana.app/originKey": "1",
},
},
Spec: example.PodSpec{},
Status: example.PodStatus{},
}
func TestMode2_Create(t *testing.T) {
@ -157,9 +170,24 @@ func TestMode2_Get(t *testing.T) {
}
func TestMode2_List(t *testing.T) {
storageItem := legacyItem.DeepCopy()
storageItem.Labels = map[string]string{"exampleLabel": "value"}
legacyList := example.PodList{Items: []example.Pod{legacyItem}}
storageList := example.PodList{Items: []example.Pod{*storageItem}}
expectedList := storageList.DeepCopy()
r, err := labels.NewRequirement(utils.AnnoKeyOriginKey, selection.In, []string{"1"})
assert.NoError(t, err)
storageOptions := &metainternalversion.ListOptions{
LabelSelector: labels.NewSelector().Add(*r),
TypeMeta: metav1.TypeMeta{},
}
type testCase struct {
name string
input *metainternalversion.ListOptions
inputLegacy *metainternalversion.ListOptions
inputStorage *metainternalversion.ListOptions
setupLegacyFn func(m *mock.Mock, input *metainternalversion.ListOptions)
setupStorageFn func(m *mock.Mock, input *metainternalversion.ListOptions)
wantErr bool
@ -167,15 +195,15 @@ func TestMode2_List(t *testing.T) {
tests :=
[]testCase{
{
name: "error when legacy list is not implmented",
input: exampleOption,
name: "object present in both Storage and LegacyStorage",
inputLegacy: exampleOption,
inputStorage: storageOptions,
setupLegacyFn: func(m *mock.Mock, input *metainternalversion.ListOptions) {
m.On("List", context.Background(), input).Return(exampleObj, nil)
m.On("List", context.Background(), input).Return(&legacyList, nil)
},
setupStorageFn: func(m *mock.Mock, input *metainternalversion.ListOptions) {
m.On("List", context.Background(), input).Return(exampleObj, nil)
m.On("List", context.Background(), input).Return(&storageList, nil)
},
wantErr: true,
},
}
@ -188,10 +216,10 @@ func TestMode2_List(t *testing.T) {
us := storageMock{m, s}
if tt.setupLegacyFn != nil {
tt.setupLegacyFn(m, tt.input)
tt.setupLegacyFn(m, tt.inputLegacy)
}
if tt.setupStorageFn != nil {
tt.setupStorageFn(m, tt.input)
tt.setupStorageFn(m, tt.inputStorage)
}
dw := SelectDualWriter(Mode2, ls, us)
@ -203,7 +231,7 @@ func TestMode2_List(t *testing.T) {
continue
}
assert.Equal(t, obj, exampleObj)
assert.Equal(t, expectedList, obj)
}
}
@ -301,46 +329,63 @@ func TestMode2_Delete(t *testing.T) {
}
func TestMode2_DeleteCollection(t *testing.T) {
storageItem := legacyItem.DeepCopy()
storageItem.Labels = map[string]string{"exampleLabel": "value"}
legacyList := example.PodList{Items: []example.Pod{legacyItem}}
storageList := example.PodList{Items: []example.Pod{*storageItem}}
expectedList := storageList.DeepCopy()
r, err := labels.NewRequirement(utils.AnnoKeyOriginKey, selection.In, []string{"1"})
assert.NoError(t, err)
storageOptions := &metainternalversion.ListOptions{
LabelSelector: labels.NewSelector().Add(*r),
TypeMeta: metav1.TypeMeta{},
}
type testCase struct {
name string
input *metav1.DeleteOptions
setupLegacyFn func(m *mock.Mock, input *metav1.DeleteOptions)
setupStorageFn func(m *mock.Mock, input *metav1.DeleteOptions)
legacyInput *metainternalversion.ListOptions
storageInput *metainternalversion.ListOptions
setupLegacyFn func(m *mock.Mock, input *metainternalversion.ListOptions)
setupStorageFn func(m *mock.Mock, input *metainternalversion.ListOptions)
wantErr bool
expectedList *example.PodList
}
tests :=
[]testCase{
{
name: "deleting a collection in both stores",
input: &metav1.DeleteOptions{TypeMeta: metav1.TypeMeta{Kind: "foo"}},
setupLegacyFn: func(m *mock.Mock, input *metav1.DeleteOptions) {
m.On("DeleteCollection", context.Background(), mock.Anything, input, mock.Anything).Return(exampleObj, nil)
name: "deleting a collection in both stores",
legacyInput: exampleOption,
storageInput: storageOptions,
setupLegacyFn: func(m *mock.Mock, input *metainternalversion.ListOptions) {
m.On("DeleteCollection", context.Background(), mock.Anything, mock.Anything, input).Return(&legacyList, nil)
},
setupStorageFn: func(m *mock.Mock, input *metav1.DeleteOptions) {
m.On("DeleteCollection", context.Background(), mock.Anything, input, mock.Anything).Return(exampleObj, nil)
setupStorageFn: func(m *mock.Mock, input *metainternalversion.ListOptions) {
m.On("DeleteCollection", context.Background(), mock.Anything, mock.Anything, input).Return(&storageList, nil)
},
expectedList: expectedList,
},
{
name: "error deleting a collection in the storage when legacy store is successful",
input: &metav1.DeleteOptions{TypeMeta: metav1.TypeMeta{Kind: "fail"}},
setupLegacyFn: func(m *mock.Mock, input *metav1.DeleteOptions) {
m.On("DeleteCollection", context.Background(), mock.Anything, &metav1.DeleteOptions{TypeMeta: metav1.TypeMeta{Kind: "foo"}}, mock.Anything).Return(exampleObj, nil)
name: "error deleting a collection in the storage when legacy store is successful",
setupLegacyFn: func(m *mock.Mock, input *metainternalversion.ListOptions) {
m.On("DeleteCollection", context.Background(), mock.Anything, mock.Anything, input).Return(exampleObj, nil)
},
setupStorageFn: func(m *mock.Mock, input *metav1.DeleteOptions) {
m.On("DeleteCollection", context.Background(), mock.Anything, input, mock.Anything).Return(nil, errors.New("error"))
setupStorageFn: func(m *mock.Mock, input *metainternalversion.ListOptions) {
m.On("DeleteCollection", context.Background(), mock.Anything, mock.Anything, input).Return(nil, errors.New("error"))
},
wantErr: true,
wantErr: true,
expectedList: &example.PodList{},
},
{
name: "error deleting a collection when error in both stores",
input: &metav1.DeleteOptions{TypeMeta: metav1.TypeMeta{Kind: "fail"}},
setupLegacyFn: func(m *mock.Mock, input *metav1.DeleteOptions) {
m.On("DeleteCollection", context.Background(), mock.Anything, input, mock.Anything).Return(nil, errors.New("error"))
name: "deleting a collection when error in both stores",
setupLegacyFn: func(m *mock.Mock, input *metainternalversion.ListOptions) {
m.On("DeleteCollection", context.Background(), mock.Anything, mock.Anything, input).Return(&example.PodList{}, errors.New("error"))
},
setupStorageFn: func(m *mock.Mock, input *metav1.DeleteOptions) {
m.On("DeleteCollection", context.Background(), mock.Anything, input, mock.Anything).Return(nil, errors.New("error"))
setupStorageFn: func(m *mock.Mock, input *metainternalversion.ListOptions) {
m.On("DeleteCollection", context.Background(), mock.Anything, mock.Anything, input).Return(&example.PodList{}, errors.New("error"))
},
wantErr: true,
expectedList: &example.PodList{},
},
}
@ -353,23 +398,22 @@ func TestMode2_DeleteCollection(t *testing.T) {
us := storageMock{m, s}
if tt.setupLegacyFn != nil {
tt.setupLegacyFn(m, tt.input)
tt.setupLegacyFn(m, tt.legacyInput)
}
if tt.setupStorageFn != nil {
tt.setupStorageFn(m, tt.input)
tt.setupStorageFn(m, tt.storageInput)
}
dw := SelectDualWriter(Mode2, ls, us)
obj, err := dw.DeleteCollection(context.Background(), func(ctx context.Context, obj runtime.Object) error { return nil }, tt.input, &metainternalversion.ListOptions{})
obj, err := dw.DeleteCollection(context.Background(), func(ctx context.Context, obj runtime.Object) error { return nil }, &metav1.DeleteOptions{}, tt.legacyInput)
if tt.wantErr {
assert.Error(t, err)
continue
}
assert.Equal(t, obj, exampleObj)
assert.NotEqual(t, obj, anotherObj)
assert.Equal(t, tt.expectedList, obj)
}
}

View File

@ -50,6 +50,10 @@ func (m legacyStoreMock) List(ctx context.Context, options *metainternalversion.
return args.Get(0).(runtime.Object), args.Error(1)
}
func (m legacyStoreMock) NewList() runtime.Object {
return nil
}
func (m legacyStoreMock) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
args := m.Called(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
if name == "object-fail" {
@ -110,6 +114,10 @@ func (m storageMock) List(ctx context.Context, options *metainternalversion.List
return args.Get(0).(runtime.Object), args.Error(1)
}
func (m storageMock) NewList() runtime.Object {
return nil
}
func (m storageMock) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
args := m.Called(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
if name == "object-fail" {

View File

@ -91,9 +91,11 @@ func convertToK8sResource(v *playlistsvc.PlaylistDTO, namespacer request.Namespa
if err == nil {
meta.SetUpdatedTimestampMillis(v.UpdatedAt)
if v.Id > 0 {
createdAt := time.UnixMilli(v.CreatedAt)
meta.SetOriginInfo(&utils.ResourceOriginInfo{
Name: "SQL",
Key: fmt.Sprintf("%d", v.Id),
Name: "SQL",
Key: fmt.Sprintf("%d", v.Id),
Timestamp: &createdAt,
})
}
}

View File

@ -44,6 +44,7 @@ func TestPlaylistConversion(t *testing.T) {
"annotations": {
"grafana.app/originKey": "123",
"grafana.app/originName": "SQL",
"grafana.app/originTimestamp":"1970-01-01T00:00:12Z",
"grafana.app/updatedTimestamp": "1970-01-01T00:00:54Z"
}
},

View File

@ -100,6 +100,9 @@ metadata:
generateName: x # anything is ok here... except yes or true -- they become boolean!
labels:
foo: bar
annotations:
grafana.app/slug: "slugger"
grafana.app/updatedBy: "updater"
spec:
title: Playlist with auto generated UID
interval: 5m
@ -129,6 +132,13 @@ NAME TITLE INTERV
u394j4d3-s63j-2d74-g8hf-958773jtybf2 Playlist with auto generated UID 5m 2023-12-14T13:53:35Z
```
To update the playlist, update the `playlist-generate.yaml` file then run:
```sh
kubectl --kubeconfig=./grafana.kubeconfig patch playlist <NAME> --patch-file playlist-generate.yaml
```
In the example, `<NAME>` would be `u394j4d3-s63j-2d74-g8hf-958773jtybf2`.
### Use a separate database
By default Unified Storage uses the Grafana database. To run against a separate database, update `custom.ini` by adding the following section to it: