diff --git a/pkg/apiserver/rest/dualwriter.go b/pkg/apiserver/rest/dualwriter.go index 09eafb93da6..1d42944d3e9 100644 --- a/pkg/apiserver/rest/dualwriter.go +++ b/pkg/apiserver/rest/dualwriter.go @@ -6,7 +6,6 @@ import ( "encoding/json" "errors" "fmt" - "math/rand" "time" "github.com/prometheus/client_golang/prometheus" @@ -306,54 +305,3 @@ func getName(o runtime.Object) string { } return accessor.GetName() } - -const dataSyncerInterval = 60 * time.Minute - -// StartPeriodicDataSyncer starts a background job that will execute the DataSyncer every 60 minutes -func StartPeriodicDataSyncer(ctx context.Context, mode DualWriterMode, legacy LegacyStorage, storage Storage, - kind string, reg prometheus.Registerer, serverLockService ServerLockService, requestInfo *request.RequestInfo) { - klog.Info("Starting periodic data syncer for mode mode: ", mode) - - // run in background - go func() { - r := rand.New(rand.NewSource(time.Now().UnixNano())) - timeWindow := 600 // 600 seconds (10 minutes) - jitterSeconds := r.Int63n(int64(timeWindow)) - klog.Info("data syncer is going to start at: ", time.Now().Add(time.Second*time.Duration(jitterSeconds))) - time.Sleep(time.Second * time.Duration(jitterSeconds)) - - // run it immediately - syncOK, err := runDataSyncer(ctx, mode, legacy, storage, kind, reg, serverLockService, requestInfo) - klog.Info("data syncer finished, syncOK: ", syncOK, ", error: ", err) - - ticker := time.NewTicker(dataSyncerInterval) - for { - select { - case <-ticker.C: - syncOK, err = runDataSyncer(ctx, mode, legacy, storage, kind, reg, serverLockService, requestInfo) - klog.Info("data syncer finished, syncOK: ", syncOK, ", error: ", err) - case <-ctx.Done(): - return - } - } - }() -} - -// runDataSyncer will ensure that data between legacy storage and unified storage are in sync. 
-// The sync implementation depends on the DualWriter mode -func runDataSyncer(ctx context.Context, mode DualWriterMode, legacy LegacyStorage, storage Storage, - kind string, reg prometheus.Registerer, serverLockService ServerLockService, requestInfo *request.RequestInfo) (bool, error) { - // ensure that execution takes no longer than necessary - const timeout = dataSyncerInterval - time.Minute - ctx, cancelFn := context.WithTimeout(ctx, timeout) - defer cancelFn() - - // implementation depends on the current DualWriter mode - switch mode { - case Mode2: - return mode2DataSyncer(ctx, legacy, storage, kind, reg, serverLockService, requestInfo) - default: - klog.Info("data syncer not implemented for mode mode:", mode) - return false, nil - } -} diff --git a/pkg/apiserver/rest/dualwriter_mode2.go b/pkg/apiserver/rest/dualwriter_mode2.go index c1356c5de16..8a2a989cdb7 100644 --- a/pkg/apiserver/rest/dualwriter_mode2.go +++ b/pkg/apiserver/rest/dualwriter_mode2.go @@ -2,21 +2,16 @@ package rest import ( "context" - "fmt" "time" - "github.com/prometheus/client_golang/prometheus" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/registry/rest" "k8s.io/klog/v2" - "github.com/grafana/authlib/claims" - "github.com/grafana/grafana/pkg/apimachinery/identity" "github.com/grafana/grafana/pkg/apimachinery/utils" ) @@ -392,213 +387,3 @@ func enrichLegacyObject(originalObj, returnedObj runtime.Object) error { accessorReturned.SetUID(accessorOriginal.GetUID()) return nil } - -func getSyncRequester(orgId int64) *identity.StaticRequester { - return &identity.StaticRequester{ - Type: claims.TypeServiceAccount, // system:apiserver - UserID: 1, - OrgID: orgId, - Name: "admin", - Login: "admin", - OrgRole: identity.RoleAdmin, - IsGrafanaAdmin: 
true, - Permissions: map[int64]map[string][]string{ - orgId: { - "*": {"*"}, // all resources, all scopes - }, - }, - } -} - -type syncItem struct { - name string - objStorage runtime.Object - objLegacy runtime.Object -} - -func getList(ctx context.Context, obj rest.Lister, listOptions *metainternalversion.ListOptions) ([]runtime.Object, error) { - ll, err := obj.List(ctx, listOptions) - if err != nil { - return nil, err - } - - return meta.ExtractList(ll) -} - -func mode2DataSyncer(ctx context.Context, legacy LegacyStorage, storage Storage, resource string, reg prometheus.Registerer, serverLockService ServerLockService, requestInfo *request.RequestInfo) (bool, error) { - metrics := &dualWriterMetrics{} - metrics.init(reg) - - log := klog.NewKlogr().WithName("DualWriterMode2Syncer") - - everythingSynced := false - outOfSync := 0 - syncSuccess := 0 - syncErr := 0 - - maxInterval := dataSyncerInterval + 5*time.Minute - - var errSync error - const maxRecordsSync = 1000 - - // LockExecuteAndRelease ensures that just a single Grafana server acquires a lock at a time - // The parameter 'maxInterval' is a timeout safeguard, if the LastExecution in the - // database is older than maxInterval, we will assume the lock as timeouted. The 'maxInterval' parameter should be so long - // that is impossible for 2 processes to run at the same time. 
- err := serverLockService.LockExecuteAndRelease(ctx, "dualwriter mode 2 sync", maxInterval, func(context.Context) { - log.Info("starting dualwriter mode 2 sync") - startSync := time.Now() - - orgId := int64(1) - - ctx = klog.NewContext(ctx, log) - ctx = identity.WithRequester(ctx, getSyncRequester(orgId)) - ctx = request.WithNamespace(ctx, requestInfo.Namespace) - ctx = request.WithRequestInfo(ctx, requestInfo) - - storageList, err := getList(ctx, storage, &metainternalversion.ListOptions{ - Limit: maxRecordsSync, - }) - if err != nil { - log.Error(err, "unable to extract list from storage") - return - } - - if len(storageList) >= maxRecordsSync { - errSync = fmt.Errorf("unified storage has more than %d records. Aborting sync", maxRecordsSync) - log.Error(errSync, "Unified storage has more records to be synced than allowed") - return - } - - log.Info("got items from unified storage", "items", len(storageList)) - - legacyList, err := getList(ctx, legacy, &metainternalversion.ListOptions{}) - if err != nil { - log.Error(err, "unable to extract list from legacy storage") - return - } - log.Info("got items from legacy storage", "items", len(legacyList)) - - itemsByName := map[string]syncItem{} - for _, obj := range legacyList { - accessor, err := utils.MetaAccessor(obj) - if err != nil { - log.Error(err, "error retrieving accessor data for object from legacy storage") - continue - } - name := accessor.GetName() - - item, ok := itemsByName[name] - if !ok { - item = syncItem{} - } - item.name = name - item.objLegacy = obj - itemsByName[name] = item - } - - for _, obj := range storageList { - accessor, err := utils.MetaAccessor(obj) - if err != nil { - log.Error(err, "error retrieving accessor data for object from storage") - continue - } - name := accessor.GetName() - - item, ok := itemsByName[name] - if !ok { - item = syncItem{} - } - item.name = name - item.objStorage = obj - itemsByName[name] = item - } - log.Info("got list of items to be synced", "items", 
len(itemsByName)) - - for name, item := range itemsByName { - // upsert if: - // - existing in both legacy and storage, but objects are different, or - // - if it's missing from storage - if item.objLegacy != nil && - ((item.objStorage != nil && !Compare(item.objLegacy, item.objStorage)) || (item.objStorage == nil)) { - outOfSync++ - - accessor, err := utils.MetaAccessor(item.objLegacy) - if err != nil { - log.Error(err, "error retrieving accessor data for object from storage") - continue - } - - if item.objStorage != nil { - accessorStorage, err := utils.MetaAccessor(item.objStorage) - if err != nil { - log.Error(err, "error retrieving accessor data for object from storage") - continue - } - accessor.SetResourceVersion(accessorStorage.GetResourceVersion()) - accessor.SetUID(accessorStorage.GetUID()) - - log.Info("updating item on unified storage", "name", name) - } else { - accessor.SetResourceVersion("") - accessor.SetUID("") - - log.Info("inserting item on unified storage", "name", name) - } - - objInfo := rest.DefaultUpdatedObjectInfo(item.objLegacy, []rest.TransformFunc{}...) 
- res, _, err := storage.Update(ctx, - name, - objInfo, - func(ctx context.Context, obj runtime.Object) error { return nil }, - func(ctx context.Context, obj, old runtime.Object) error { return nil }, - true, // force creation - &metav1.UpdateOptions{}, - ) - if err != nil { - log.WithValues("object", res).Error(err, "could not update in storage") - syncErr++ - } else { - syncSuccess++ - } - } - - // delete if object does not exists on legacy but exists on storage - if item.objLegacy == nil && item.objStorage != nil { - outOfSync++ - - ctx = request.WithRequestInfo(ctx, &request.RequestInfo{ - APIGroup: requestInfo.APIGroup, - Resource: requestInfo.Resource, - Name: name, - Namespace: requestInfo.Namespace, - }) - - log.Info("deleting item from unified storage", "name", name) - - deletedS, _, err := storage.Delete(ctx, name, func(ctx context.Context, obj runtime.Object) error { return nil }, &metav1.DeleteOptions{}) - if err != nil { - if !apierrors.IsNotFound(err) { - log.WithValues("objectList", deletedS).Error(err, "could not delete from storage") - } - syncErr++ - } else { - syncSuccess++ - } - } - } - - everythingSynced = outOfSync == syncSuccess - - metrics.recordDataSyncerOutcome(mode2Str, resource, everythingSynced) - metrics.recordDataSyncerDuration(err != nil, mode2Str, resource, startSync) - - log.Info("finished syncing items", "items", len(itemsByName), "updated", syncSuccess, "failed", syncErr, "outcome", everythingSynced) - }) - - if errSync != nil { - err = errSync - } - - return everythingSynced, err -} diff --git a/pkg/apiserver/rest/dualwriter_mode2_test.go b/pkg/apiserver/rest/dualwriter_mode2_test.go index 809e10fe645..fd7e168b84a 100644 --- a/pkg/apiserver/rest/dualwriter_mode2_test.go +++ b/pkg/apiserver/rest/dualwriter_mode2_test.go @@ -4,7 +4,6 @@ import ( "context" "errors" "testing" - "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -16,7 +15,6 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" 
"k8s.io/apimachinery/pkg/types" "k8s.io/apiserver/pkg/apis/example" - "k8s.io/apiserver/pkg/endpoints/request" ) var createFn = func(context.Context, runtime.Object) error { return nil } @@ -609,197 +607,3 @@ func TestEnrichReturnedObject(t *testing.T) { }) } } - -var legacyObj1 = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo1", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}} -var legacyObj2 = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo2", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}} -var legacyObj3 = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo3", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}} -var legacyObj4 = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo4", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}} - -var legacyObj2WithHostname = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo2", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{Hostname: "hostname"}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}} - -var storageObj1 = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo1", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}} -var storageObj2 = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: 
metav1.ObjectMeta{Name: "foo2", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}} -var storageObj3 = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo3", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}} -var storageObj4 = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo4", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}} - -var legacyListWith3items = &example.PodList{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ListMeta: metav1.ListMeta{}, - Items: []example.Pod{ - *legacyObj1, - *legacyObj2, - *legacyObj3, - }} - -var legacyListWith4items = &example.PodList{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ListMeta: metav1.ListMeta{}, - Items: []example.Pod{ - *legacyObj1, - *legacyObj2, - *legacyObj3, - *legacyObj4, - }} - -var legacyListWith3itemsObj2IsDifferent = &example.PodList{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ListMeta: metav1.ListMeta{}, - Items: []example.Pod{ - *legacyObj1, - *legacyObj2WithHostname, - *legacyObj3, - }} - -var storageListWith3items = &example.PodList{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ListMeta: metav1.ListMeta{}, - Items: []example.Pod{ - *storageObj1, - *storageObj2, - *storageObj3, - }} - -var storageListWith4items = &example.PodList{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ListMeta: metav1.ListMeta{}, - Items: []example.Pod{ - *storageObj1, - *storageObj2, - *storageObj3, - *storageObj4, - }} - -var storageListWith3itemsMissingFoo2 = &example.PodList{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ListMeta: metav1.ListMeta{}, - Items: []example.Pod{ - *storageObj1, - *storageObj3, - *storageObj4, - }} - -func TestMode2_DataSyncer(t *testing.T) { - type 
testCase struct { - setupLegacyFn func(m *mock.Mock) - setupStorageFn func(m *mock.Mock) - name string - expectedOutcome bool - wantErr bool - } - tests := - []testCase{ - { - name: "both stores are in sync", - setupLegacyFn: func(m *mock.Mock) { - m.On("List", mock.Anything, mock.Anything).Return(legacyListWith3items, nil) - }, - setupStorageFn: func(m *mock.Mock) { - m.On("List", mock.Anything, mock.Anything).Return(storageListWith3items, nil) - }, - expectedOutcome: true, - }, - { - name: "both stores are in sync - fail to list from legacy", - setupLegacyFn: func(m *mock.Mock) { - m.On("List", mock.Anything, mock.Anything).Return(legacyListWith3items, errors.New("error")) - }, - setupStorageFn: func(m *mock.Mock) { - m.On("List", mock.Anything, mock.Anything).Return(storageListWith3items, nil) - }, - expectedOutcome: false, - }, - { - name: "both stores are in sync - fail to list from storage", - setupLegacyFn: func(m *mock.Mock) { - m.On("List", mock.Anything, mock.Anything).Return(legacyListWith3items, nil) - }, - setupStorageFn: func(m *mock.Mock) { - m.On("List", mock.Anything, mock.Anything).Return(storageListWith3items, errors.New("error")) - }, - expectedOutcome: false, - }, - { - name: "storage is missing 1 entry (foo4)", - setupLegacyFn: func(m *mock.Mock) { - m.On("List", mock.Anything, mock.Anything).Return(legacyListWith4items, nil) - }, - setupStorageFn: func(m *mock.Mock) { - m.On("List", mock.Anything, mock.Anything).Return(storageListWith3items, nil) - m.On("Update", mock.Anything, "foo4", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleObj, false, nil) - }, - expectedOutcome: true, - }, - { - name: "storage needs to be update (foo2 is different)", - setupLegacyFn: func(m *mock.Mock) { - m.On("List", mock.Anything, mock.Anything).Return(legacyListWith3itemsObj2IsDifferent, nil) - }, - setupStorageFn: func(m *mock.Mock) { - m.On("List", mock.Anything, mock.Anything).Return(storageListWith3items, nil) - 
m.On("Update", mock.Anything, "foo2", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleObj, false, nil) - }, - expectedOutcome: true, - }, - { - name: "storage is missing 1 entry (foo4) - fail to upsert", - setupLegacyFn: func(m *mock.Mock) { - m.On("List", mock.Anything, mock.Anything).Return(legacyListWith4items, nil) - }, - setupStorageFn: func(m *mock.Mock) { - m.On("List", mock.Anything, mock.Anything).Return(storageListWith3items, nil) - m.On("Update", mock.Anything, "foo4", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleObj, false, errors.New("error")) - }, - expectedOutcome: false, - }, - { - name: "storage has an extra 1 entry (foo4)", - setupLegacyFn: func(m *mock.Mock) { - m.On("List", mock.Anything, mock.Anything).Return(legacyListWith3items, nil) - }, - setupStorageFn: func(m *mock.Mock) { - m.On("List", mock.Anything, mock.Anything).Return(storageListWith4items, nil) - m.On("Delete", mock.Anything, "foo4", mock.Anything, mock.Anything).Return(exampleObj, false, nil) - }, - expectedOutcome: true, - }, - { - name: "storage has an extra 1 entry (foo4) - fail to delete", - setupLegacyFn: func(m *mock.Mock) { - m.On("List", mock.Anything, mock.Anything).Return(legacyListWith3items, nil) - }, - setupStorageFn: func(m *mock.Mock) { - m.On("List", mock.Anything, mock.Anything).Return(storageListWith4items, nil) - m.On("Delete", mock.Anything, "foo4", mock.Anything, mock.Anything).Return(exampleObj, false, errors.New("error")) - }, - expectedOutcome: false, - }, - { - name: "storage is missing 1 entry (foo3) and has an extra 1 entry (foo4)", - setupLegacyFn: func(m *mock.Mock) { - m.On("List", mock.Anything, mock.Anything).Return(legacyListWith3items, nil) - }, - setupStorageFn: func(m *mock.Mock) { - m.On("List", mock.Anything, mock.Anything).Return(storageListWith3itemsMissingFoo2, nil) - m.On("Update", mock.Anything, "foo2", mock.Anything, mock.Anything, mock.Anything, 
mock.Anything, mock.Anything).Return(exampleObj, false, nil) - m.On("Delete", mock.Anything, "foo4", mock.Anything, mock.Anything).Return(exampleObj, false, nil) - }, - expectedOutcome: true, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - l := (LegacyStorage)(nil) - s := (Storage)(nil) - lm := &mock.Mock{} - um := &mock.Mock{} - - ls := legacyStoreMock{lm, l} - us := storageMock{um, s} - - if tt.setupLegacyFn != nil { - tt.setupLegacyFn(lm) - } - if tt.setupStorageFn != nil { - tt.setupStorageFn(um) - } - - outcome, err := mode2DataSyncer(context.Background(), ls, us, "test.kind", p, &fakeServerLock{}, &request.RequestInfo{}) - if tt.wantErr { - assert.Error(t, err) - return - } - - assert.NoError(t, err) - assert.Equal(t, tt.expectedOutcome, outcome) - }) - } -} diff --git a/pkg/apiserver/rest/dualwriter_syncer.go b/pkg/apiserver/rest/dualwriter_syncer.go new file mode 100644 index 00000000000..34bfef41a19 --- /dev/null +++ b/pkg/apiserver/rest/dualwriter_syncer.go @@ -0,0 +1,271 @@ +package rest + +import ( + "context" + "fmt" + "math/rand" + "time" + + "github.com/prometheus/client_golang/prometheus" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/apiserver/pkg/registry/rest" + "k8s.io/klog/v2" + + "github.com/grafana/authlib/claims" + "github.com/grafana/grafana/pkg/apimachinery/identity" + "github.com/grafana/grafana/pkg/apimachinery/utils" +) + +type syncItem struct { + name string + objStorage runtime.Object + objLegacy runtime.Object + accessorStorage utils.GrafanaMetaAccessor + accessorLegacy utils.GrafanaMetaAccessor +} + +const dataSyncerInterval = 60 * time.Minute + +// StartPeriodicDataSyncer starts a background job that will execute the DataSyncer every 60 minutes +func 
StartPeriodicDataSyncer(ctx context.Context, mode DualWriterMode, legacy LegacyStorage, storage Storage, + kind string, reg prometheus.Registerer, serverLockService ServerLockService, requestInfo *request.RequestInfo) { + log := klog.NewKlogr().WithName("legacyToUnifiedStorageDataSyncer").WithValues("mode", mode, "resource", kind) + + log.Info("Starting periodic data syncer") + + // run in background + go func() { + r := rand.New(rand.NewSource(time.Now().UnixNano())) + timeWindow := 600 // 600 seconds (10 minutes) + jitterSeconds := r.Int63n(int64(timeWindow)) + log.Info("data syncer scheduled", "starting time", time.Now().Add(time.Second*time.Duration(jitterSeconds))) + time.Sleep(time.Second * time.Duration(jitterSeconds)) + + // run it immediately + syncOK, err := runDataSyncer(ctx, mode, legacy, storage, kind, reg, serverLockService, requestInfo) + log.Info("data syncer finished", "syncOK", syncOK, "error", err) + + ticker := time.NewTicker(dataSyncerInterval) + for { + select { + case <-ticker.C: + syncOK, err = runDataSyncer(ctx, mode, legacy, storage, kind, reg, serverLockService, requestInfo) + log.Info("data syncer finished", "syncOK", syncOK, "error", err) + case <-ctx.Done(): + return + } + } + }() +} + +// runDataSyncer will ensure that data between legacy storage and unified storage are in sync. 
+// The sync implementation depends on the DualWriter mode +func runDataSyncer(ctx context.Context, mode DualWriterMode, legacy LegacyStorage, storage Storage, + kind string, reg prometheus.Registerer, serverLockService ServerLockService, requestInfo *request.RequestInfo) (bool, error) { + // ensure that execution takes no longer than necessary + const timeout = dataSyncerInterval - time.Minute + ctx, cancelFn := context.WithTimeout(ctx, timeout) + defer cancelFn() + + // implementation depends on the current DualWriter mode + switch mode { + case Mode1, Mode2: + return legacyToUnifiedStorageDataSyncer(ctx, mode, legacy, storage, kind, reg, serverLockService, requestInfo) + default: + klog.Info("data syncer not implemented for mode mode:", mode) + return false, nil + } +} + +func legacyToUnifiedStorageDataSyncer(ctx context.Context, mode DualWriterMode, legacy LegacyStorage, storage Storage, resource string, reg prometheus.Registerer, serverLockService ServerLockService, requestInfo *request.RequestInfo) (bool, error) { + metrics := &dualWriterMetrics{} + metrics.init(reg) + + log := klog.NewKlogr().WithName("legacyToUnifiedStorageDataSyncer").WithValues("mode", mode, "resource", resource) + + everythingSynced := false + outOfSync := 0 + syncSuccess := 0 + syncErr := 0 + + maxInterval := dataSyncerInterval + 5*time.Minute + + var errSync error + const maxRecordsSync = 1000 + + // LockExecuteAndRelease ensures that just a single Grafana server acquires a lock at a time + // The parameter 'maxInterval' is a timeout safeguard, if the LastExecution in the + // database is older than maxInterval, we will assume the lock as timeouted. The 'maxInterval' parameter should be so long + // that is impossible for 2 processes to run at the same time. 
+ err := serverLockService.LockExecuteAndRelease(ctx, fmt.Sprintf("legacyToUnifiedStorageDataSyncer-%d-%s", mode, resource), maxInterval, func(context.Context) { + log.Info("starting legacyToUnifiedStorageDataSyncer") + startSync := time.Now() + + orgId := int64(1) + + ctx = klog.NewContext(ctx, log) + ctx = identity.WithRequester(ctx, getSyncRequester(orgId)) + ctx = request.WithNamespace(ctx, requestInfo.Namespace) + ctx = request.WithRequestInfo(ctx, requestInfo) + + storageList, err := getList(ctx, storage, &metainternalversion.ListOptions{ + Limit: maxRecordsSync, + }) + if err != nil { + log.Error(err, "unable to extract list from storage") + return + } + + if len(storageList) >= maxRecordsSync { + errSync = fmt.Errorf("unified storage has more than %d records. Aborting sync", maxRecordsSync) + log.Error(errSync, "Unified storage has more records to be synced than allowed") + return + } + + log.Info("got items from unified storage", "items", len(storageList)) + + legacyList, err := getList(ctx, legacy, &metainternalversion.ListOptions{}) + if err != nil { + log.Error(err, "unable to extract list from legacy storage") + return + } + log.Info("got items from legacy storage", "items", len(legacyList)) + + itemsByName := map[string]syncItem{} + for _, obj := range legacyList { + accessor, err := utils.MetaAccessor(obj) + if err != nil { + log.Error(err, "error retrieving accessor data for object from legacy storage") + continue + } + name := accessor.GetName() + + item := itemsByName[name] + item.name = name + item.objLegacy = obj + item.accessorLegacy = accessor + itemsByName[name] = item + } + + for _, obj := range storageList { + accessor, err := utils.MetaAccessor(obj) + if err != nil { + log.Error(err, "error retrieving accessor data for object from storage") + continue + } + name := accessor.GetName() + + item := itemsByName[name] + item.name = name + item.objStorage = obj + item.accessorStorage = accessor + itemsByName[name] = item + } + log.Info("got list 
of items to be synced", "items", len(itemsByName)) + + for name, item := range itemsByName { + // upsert if: + // - existing in both legacy and storage, but objects are different, or + // - if it's missing from storage + if item.objLegacy != nil && + (item.objStorage == nil || !Compare(item.objLegacy, item.objStorage)) { + outOfSync++ + + if item.objStorage != nil { + item.accessorLegacy.SetResourceVersion(item.accessorStorage.GetResourceVersion()) + item.accessorLegacy.SetUID(item.accessorStorage.GetUID()) + + log.Info("updating item on unified storage", "name", name) + } else { + item.accessorLegacy.SetResourceVersion("") + item.accessorLegacy.SetUID("") + + log.Info("inserting item on unified storage", "name", name) + } + + objInfo := rest.DefaultUpdatedObjectInfo(item.objLegacy, []rest.TransformFunc{}...) + res, _, err := storage.Update(ctx, + name, + objInfo, + func(ctx context.Context, obj runtime.Object) error { return nil }, + func(ctx context.Context, obj, old runtime.Object) error { return nil }, + true, // force creation + &metav1.UpdateOptions{}, + ) + if err != nil { + log.WithValues("object", res).Error(err, "could not update in storage") + syncErr++ + } else { + syncSuccess++ + } + } + + // delete if object does not exist on legacy but exists on storage + if item.objLegacy == nil && item.objStorage != nil { + outOfSync++ + + deleteCtx := request.WithRequestInfo(ctx, &request.RequestInfo{ + APIGroup: requestInfo.APIGroup, + Resource: requestInfo.Resource, + Name: name, + Namespace: requestInfo.Namespace, + }) + + log.Info("deleting item from unified storage", "name", name) + + deletedS, _, err := storage.Delete(deleteCtx, name, func(ctx context.Context, obj runtime.Object) error { return nil }, &metav1.DeleteOptions{}) + if err != nil && !apierrors.IsNotFound(err) { + log.WithValues("objectList", deletedS).Error(err, "could not delete from storage") + syncErr++ + continue + } + + syncSuccess++ + } + } + + everythingSynced = outOfSync == syncSuccess + + 
metrics.recordDataSyncerOutcome(mode, resource, everythingSynced) + metrics.recordDataSyncerDuration(syncErr > 0, mode, resource, startSync) + + log.Info("finished syncing items", "items", len(itemsByName), "updated", syncSuccess, "failed", syncErr, "outcome", everythingSynced) + }) + + if errSync != nil { + err = errSync + } + + return everythingSynced, err +} + +func getSyncRequester(orgId int64) *identity.StaticRequester { + return &identity.StaticRequester{ + Type: claims.TypeServiceAccount, // system:apiserver + UserID: 1, + OrgID: orgId, + Name: "admin", + Login: "admin", + OrgRole: identity.RoleAdmin, + IsGrafanaAdmin: true, + Permissions: map[int64]map[string][]string{ + orgId: { + "*": {"*"}, // all resources, all scopes + }, + }, + } +} + +func getList(ctx context.Context, obj rest.Lister, listOptions *metainternalversion.ListOptions) ([]runtime.Object, error) { + ll, err := obj.List(ctx, listOptions) + if err != nil { + return nil, err + } + + return meta.ExtractList(ll) +} diff --git a/pkg/apiserver/rest/dualwriter_syncer_test.go b/pkg/apiserver/rest/dualwriter_syncer_test.go new file mode 100644 index 00000000000..868cad182a4 --- /dev/null +++ b/pkg/apiserver/rest/dualwriter_syncer_test.go @@ -0,0 +1,238 @@ +package rest + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apiserver/pkg/apis/example" + "k8s.io/apiserver/pkg/endpoints/request" +) + +var legacyObj1 = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo1", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}} +var legacyObj2 = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo2", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: 
example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}} +var legacyObj3 = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo3", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}} +var legacyObj4 = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo4", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}} + +var legacyObj2WithHostname = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo2", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{Hostname: "hostname"}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}} + +var storageObj1 = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo1", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}} +var storageObj2 = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo2", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}} +var storageObj3 = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo3", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}} +var storageObj4 = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo4", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}} + +var legacyListWith3items 
= &example.PodList{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ListMeta: metav1.ListMeta{}, + Items: []example.Pod{ + *legacyObj1, + *legacyObj2, + *legacyObj3, + }} + +var legacyListWith4items = &example.PodList{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ListMeta: metav1.ListMeta{}, + Items: []example.Pod{ + *legacyObj1, + *legacyObj2, + *legacyObj3, + *legacyObj4, + }} + +var legacyListWith3itemsObj2IsDifferent = &example.PodList{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ListMeta: metav1.ListMeta{}, + Items: []example.Pod{ + *legacyObj1, + *legacyObj2WithHostname, + *legacyObj3, + }} + +var storageListWith3items = &example.PodList{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ListMeta: metav1.ListMeta{}, + Items: []example.Pod{ + *storageObj1, + *storageObj2, + *storageObj3, + }} + +var storageListWith4items = &example.PodList{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ListMeta: metav1.ListMeta{}, + Items: []example.Pod{ + *storageObj1, + *storageObj2, + *storageObj3, + *storageObj4, + }} + +var storageListWith3itemsMissingFoo2 = &example.PodList{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ListMeta: metav1.ListMeta{}, + Items: []example.Pod{ + *storageObj1, + *storageObj3, + *storageObj4, + }} + +func TestLegacyToUnifiedStorage_DataSyncer(t *testing.T) { + type testCase struct { + setupLegacyFn func(m *mock.Mock) + setupStorageFn func(m *mock.Mock) + name string + expectedOutcome bool + wantErr bool + } + tests := + []testCase{ + { + name: "both stores are in sync", + setupLegacyFn: func(m *mock.Mock) { + m.On("List", mock.Anything, mock.Anything).Return(legacyListWith3items, nil) + }, + setupStorageFn: func(m *mock.Mock) { + m.On("List", mock.Anything, mock.Anything).Return(storageListWith3items, nil) + }, + expectedOutcome: true, + }, + { + name: "both stores are in sync - fail to list from legacy", + setupLegacyFn: func(m *mock.Mock) { + m.On("List", mock.Anything, mock.Anything).Return(legacyListWith3items, errors.New("error")) + }, + setupStorageFn: func(m *mock.Mock) { + 
m.On("List", mock.Anything, mock.Anything).Return(storageListWith3items, nil) + }, + expectedOutcome: false, + }, + { + name: "both stores are in sync - fail to list from storage", + setupLegacyFn: func(m *mock.Mock) { + m.On("List", mock.Anything, mock.Anything).Return(legacyListWith3items, nil) + }, + setupStorageFn: func(m *mock.Mock) { + m.On("List", mock.Anything, mock.Anything).Return(storageListWith3items, errors.New("error")) + }, + expectedOutcome: false, + }, + { + name: "storage is missing 1 entry (foo4)", + setupLegacyFn: func(m *mock.Mock) { + m.On("List", mock.Anything, mock.Anything).Return(legacyListWith4items, nil) + }, + setupStorageFn: func(m *mock.Mock) { + m.On("List", mock.Anything, mock.Anything).Return(storageListWith3items, nil) + m.On("Update", mock.Anything, "foo4", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleObj, false, nil) + }, + expectedOutcome: true, + }, + { + name: "storage needs to be update (foo2 is different)", + setupLegacyFn: func(m *mock.Mock) { + m.On("List", mock.Anything, mock.Anything).Return(legacyListWith3itemsObj2IsDifferent, nil) + }, + setupStorageFn: func(m *mock.Mock) { + m.On("List", mock.Anything, mock.Anything).Return(storageListWith3items, nil) + m.On("Update", mock.Anything, "foo2", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleObj, false, nil) + }, + expectedOutcome: true, + }, + { + name: "storage is missing 1 entry (foo4) - fail to upsert", + setupLegacyFn: func(m *mock.Mock) { + m.On("List", mock.Anything, mock.Anything).Return(legacyListWith4items, nil) + }, + setupStorageFn: func(m *mock.Mock) { + m.On("List", mock.Anything, mock.Anything).Return(storageListWith3items, nil) + m.On("Update", mock.Anything, "foo4", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleObj, false, errors.New("error")) + }, + expectedOutcome: false, + }, + { + name: "storage has an extra 1 entry (foo4)", 
+ setupLegacyFn: func(m *mock.Mock) { + m.On("List", mock.Anything, mock.Anything).Return(legacyListWith3items, nil) + }, + setupStorageFn: func(m *mock.Mock) { + m.On("List", mock.Anything, mock.Anything).Return(storageListWith4items, nil) + m.On("Delete", mock.Anything, "foo4", mock.Anything, mock.Anything).Return(exampleObj, false, nil) + }, + expectedOutcome: true, + }, + { + name: "storage has an extra 1 entry (foo4) - fail to delete", + setupLegacyFn: func(m *mock.Mock) { + m.On("List", mock.Anything, mock.Anything).Return(legacyListWith3items, nil) + }, + setupStorageFn: func(m *mock.Mock) { + m.On("List", mock.Anything, mock.Anything).Return(storageListWith4items, nil) + m.On("Delete", mock.Anything, "foo4", mock.Anything, mock.Anything).Return(exampleObj, false, errors.New("error")) + }, + expectedOutcome: false, + }, + { + name: "storage is missing 1 entry (foo2) and has an extra 1 entry (foo4)", + setupLegacyFn: func(m *mock.Mock) { + m.On("List", mock.Anything, mock.Anything).Return(legacyListWith3items, nil) + }, + setupStorageFn: func(m *mock.Mock) { + m.On("List", mock.Anything, mock.Anything).Return(storageListWith3itemsMissingFoo2, nil) + m.On("Update", mock.Anything, "foo2", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleObj, false, nil) + m.On("Delete", mock.Anything, "foo4", mock.Anything, mock.Anything).Return(exampleObj, false, nil) + }, + expectedOutcome: true, + }, + } + + // mode 1 + for _, tt := range tests { + t.Run("Mode-1-"+tt.name, func(t *testing.T) { + l := (LegacyStorage)(nil) + s := (Storage)(nil) + lm := &mock.Mock{} + um := &mock.Mock{} + + ls := legacyStoreMock{lm, l} + us := storageMock{um, s} + + if tt.setupLegacyFn != nil { + tt.setupLegacyFn(lm) + } + if tt.setupStorageFn != nil { + tt.setupStorageFn(um) + } + + outcome, err := legacyToUnifiedStorageDataSyncer(context.Background(), Mode1, ls, us, "test.kind", p, &fakeServerLock{}, &request.RequestInfo{}) + if tt.wantErr { +
assert.Error(t, err) + return + } + + assert.NoError(t, err) + assert.Equal(t, tt.expectedOutcome, outcome) + }) + } + + // mode 2 + for _, tt := range tests { + t.Run("Mode-2-"+tt.name, func(t *testing.T) { + l := (LegacyStorage)(nil) + s := (Storage)(nil) + lm := &mock.Mock{} + um := &mock.Mock{} + + ls := legacyStoreMock{lm, l} + us := storageMock{um, s} + + if tt.setupLegacyFn != nil { + tt.setupLegacyFn(lm) + } + if tt.setupStorageFn != nil { + tt.setupStorageFn(um) + } + + outcome, err := legacyToUnifiedStorageDataSyncer(context.Background(), Mode2, ls, us, "test.kind", p, &fakeServerLock{}, &request.RequestInfo{}) + if tt.wantErr { + assert.Error(t, err) + return + } + + assert.NoError(t, err) + assert.Equal(t, tt.expectedOutcome, outcome) + }) + } +} diff --git a/pkg/apiserver/rest/metrics.go b/pkg/apiserver/rest/metrics.go index 780b9b8a4ea..436f6a7dabe 100644 --- a/pkg/apiserver/rest/metrics.go +++ b/pkg/apiserver/rest/metrics.go @@ -1,6 +1,7 @@ package rest import ( + "fmt" "strconv" "time" @@ -97,15 +98,15 @@ func (m *dualWriterMetrics) recordOutcome(mode string, name string, areEqual boo m.outcome.WithLabelValues(mode, name, method).Observe(observeValue) } -func (m *dualWriterMetrics) recordDataSyncerDuration(isError bool, mode string, resource string, startFrom time.Time) { +func (m *dualWriterMetrics) recordDataSyncerDuration(isError bool, mode DualWriterMode, resource string, startFrom time.Time) { duration := time.Since(startFrom).Seconds() - m.syncer.WithLabelValues(strconv.FormatBool(isError), mode, resource).Observe(duration) + m.syncer.WithLabelValues(strconv.FormatBool(isError), fmt.Sprintf("%d", mode), resource).Observe(duration) } -func (m *dualWriterMetrics) recordDataSyncerOutcome(mode string, resource string, synced bool) { +func (m *dualWriterMetrics) recordDataSyncerOutcome(mode DualWriterMode, resource string, synced bool) { var observeValue float64 if !synced { observeValue = 1 } - m.syncerOutcome.WithLabelValues(mode,
resource).Observe(observeValue) + m.syncerOutcome.WithLabelValues(fmt.Sprintf("%d", mode), resource).Observe(observeValue) }