mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
UniStore: Enable DataSyncer in Mode1 + better logging (#94688)
* UniStore: Enable DataSyncer Mode1 + better logging Signed-off-by: Maicon Costa <maiconscosta@gmail.com> --------- Signed-off-by: Maicon Costa <maiconscosta@gmail.com> Co-authored-by: Diego Augusto Molina <diegoaugustomolina@gmail.com>
This commit is contained in:
parent
4d8d916434
commit
bfd3506549
@ -6,7 +6,6 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math/rand"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
@ -306,54 +305,3 @@ func getName(o runtime.Object) string {
|
|||||||
}
|
}
|
||||||
return accessor.GetName()
|
return accessor.GetName()
|
||||||
}
|
}
|
||||||
|
|
||||||
const dataSyncerInterval = 60 * time.Minute
|
|
||||||
|
|
||||||
// StartPeriodicDataSyncer starts a background job that will execute the DataSyncer every 60 minutes
|
|
||||||
func StartPeriodicDataSyncer(ctx context.Context, mode DualWriterMode, legacy LegacyStorage, storage Storage,
|
|
||||||
kind string, reg prometheus.Registerer, serverLockService ServerLockService, requestInfo *request.RequestInfo) {
|
|
||||||
klog.Info("Starting periodic data syncer for mode mode: ", mode)
|
|
||||||
|
|
||||||
// run in background
|
|
||||||
go func() {
|
|
||||||
r := rand.New(rand.NewSource(time.Now().UnixNano()))
|
|
||||||
timeWindow := 600 // 600 seconds (10 minutes)
|
|
||||||
jitterSeconds := r.Int63n(int64(timeWindow))
|
|
||||||
klog.Info("data syncer is going to start at: ", time.Now().Add(time.Second*time.Duration(jitterSeconds)))
|
|
||||||
time.Sleep(time.Second * time.Duration(jitterSeconds))
|
|
||||||
|
|
||||||
// run it immediately
|
|
||||||
syncOK, err := runDataSyncer(ctx, mode, legacy, storage, kind, reg, serverLockService, requestInfo)
|
|
||||||
klog.Info("data syncer finished, syncOK: ", syncOK, ", error: ", err)
|
|
||||||
|
|
||||||
ticker := time.NewTicker(dataSyncerInterval)
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-ticker.C:
|
|
||||||
syncOK, err = runDataSyncer(ctx, mode, legacy, storage, kind, reg, serverLockService, requestInfo)
|
|
||||||
klog.Info("data syncer finished, syncOK: ", syncOK, ", error: ", err)
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
|
|
||||||
// runDataSyncer will ensure that data between legacy storage and unified storage are in sync.
|
|
||||||
// The sync implementation depends on the DualWriter mode
|
|
||||||
func runDataSyncer(ctx context.Context, mode DualWriterMode, legacy LegacyStorage, storage Storage,
|
|
||||||
kind string, reg prometheus.Registerer, serverLockService ServerLockService, requestInfo *request.RequestInfo) (bool, error) {
|
|
||||||
// ensure that execution takes no longer than necessary
|
|
||||||
const timeout = dataSyncerInterval - time.Minute
|
|
||||||
ctx, cancelFn := context.WithTimeout(ctx, timeout)
|
|
||||||
defer cancelFn()
|
|
||||||
|
|
||||||
// implementation depends on the current DualWriter mode
|
|
||||||
switch mode {
|
|
||||||
case Mode2:
|
|
||||||
return mode2DataSyncer(ctx, legacy, storage, kind, reg, serverLockService, requestInfo)
|
|
||||||
default:
|
|
||||||
klog.Info("data syncer not implemented for mode mode:", mode)
|
|
||||||
return false, nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
@ -2,21 +2,16 @@ package rest
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
|
||||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||||
"k8s.io/apimachinery/pkg/api/meta"
|
"k8s.io/apimachinery/pkg/api/meta"
|
||||||
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
|
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
"k8s.io/apiserver/pkg/endpoints/request"
|
|
||||||
"k8s.io/apiserver/pkg/registry/rest"
|
"k8s.io/apiserver/pkg/registry/rest"
|
||||||
"k8s.io/klog/v2"
|
"k8s.io/klog/v2"
|
||||||
|
|
||||||
"github.com/grafana/authlib/claims"
|
|
||||||
"github.com/grafana/grafana/pkg/apimachinery/identity"
|
|
||||||
"github.com/grafana/grafana/pkg/apimachinery/utils"
|
"github.com/grafana/grafana/pkg/apimachinery/utils"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -392,213 +387,3 @@ func enrichLegacyObject(originalObj, returnedObj runtime.Object) error {
|
|||||||
accessorReturned.SetUID(accessorOriginal.GetUID())
|
accessorReturned.SetUID(accessorOriginal.GetUID())
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func getSyncRequester(orgId int64) *identity.StaticRequester {
|
|
||||||
return &identity.StaticRequester{
|
|
||||||
Type: claims.TypeServiceAccount, // system:apiserver
|
|
||||||
UserID: 1,
|
|
||||||
OrgID: orgId,
|
|
||||||
Name: "admin",
|
|
||||||
Login: "admin",
|
|
||||||
OrgRole: identity.RoleAdmin,
|
|
||||||
IsGrafanaAdmin: true,
|
|
||||||
Permissions: map[int64]map[string][]string{
|
|
||||||
orgId: {
|
|
||||||
"*": {"*"}, // all resources, all scopes
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type syncItem struct {
|
|
||||||
name string
|
|
||||||
objStorage runtime.Object
|
|
||||||
objLegacy runtime.Object
|
|
||||||
}
|
|
||||||
|
|
||||||
func getList(ctx context.Context, obj rest.Lister, listOptions *metainternalversion.ListOptions) ([]runtime.Object, error) {
|
|
||||||
ll, err := obj.List(ctx, listOptions)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return meta.ExtractList(ll)
|
|
||||||
}
|
|
||||||
|
|
||||||
func mode2DataSyncer(ctx context.Context, legacy LegacyStorage, storage Storage, resource string, reg prometheus.Registerer, serverLockService ServerLockService, requestInfo *request.RequestInfo) (bool, error) {
|
|
||||||
metrics := &dualWriterMetrics{}
|
|
||||||
metrics.init(reg)
|
|
||||||
|
|
||||||
log := klog.NewKlogr().WithName("DualWriterMode2Syncer")
|
|
||||||
|
|
||||||
everythingSynced := false
|
|
||||||
outOfSync := 0
|
|
||||||
syncSuccess := 0
|
|
||||||
syncErr := 0
|
|
||||||
|
|
||||||
maxInterval := dataSyncerInterval + 5*time.Minute
|
|
||||||
|
|
||||||
var errSync error
|
|
||||||
const maxRecordsSync = 1000
|
|
||||||
|
|
||||||
// LockExecuteAndRelease ensures that just a single Grafana server acquires a lock at a time
|
|
||||||
// The parameter 'maxInterval' is a timeout safeguard, if the LastExecution in the
|
|
||||||
// database is older than maxInterval, we will assume the lock as timeouted. The 'maxInterval' parameter should be so long
|
|
||||||
// that is impossible for 2 processes to run at the same time.
|
|
||||||
err := serverLockService.LockExecuteAndRelease(ctx, "dualwriter mode 2 sync", maxInterval, func(context.Context) {
|
|
||||||
log.Info("starting dualwriter mode 2 sync")
|
|
||||||
startSync := time.Now()
|
|
||||||
|
|
||||||
orgId := int64(1)
|
|
||||||
|
|
||||||
ctx = klog.NewContext(ctx, log)
|
|
||||||
ctx = identity.WithRequester(ctx, getSyncRequester(orgId))
|
|
||||||
ctx = request.WithNamespace(ctx, requestInfo.Namespace)
|
|
||||||
ctx = request.WithRequestInfo(ctx, requestInfo)
|
|
||||||
|
|
||||||
storageList, err := getList(ctx, storage, &metainternalversion.ListOptions{
|
|
||||||
Limit: maxRecordsSync,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
log.Error(err, "unable to extract list from storage")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(storageList) >= maxRecordsSync {
|
|
||||||
errSync = fmt.Errorf("unified storage has more than %d records. Aborting sync", maxRecordsSync)
|
|
||||||
log.Error(errSync, "Unified storage has more records to be synced than allowed")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Info("got items from unified storage", "items", len(storageList))
|
|
||||||
|
|
||||||
legacyList, err := getList(ctx, legacy, &metainternalversion.ListOptions{})
|
|
||||||
if err != nil {
|
|
||||||
log.Error(err, "unable to extract list from legacy storage")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
log.Info("got items from legacy storage", "items", len(legacyList))
|
|
||||||
|
|
||||||
itemsByName := map[string]syncItem{}
|
|
||||||
for _, obj := range legacyList {
|
|
||||||
accessor, err := utils.MetaAccessor(obj)
|
|
||||||
if err != nil {
|
|
||||||
log.Error(err, "error retrieving accessor data for object from legacy storage")
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
name := accessor.GetName()
|
|
||||||
|
|
||||||
item, ok := itemsByName[name]
|
|
||||||
if !ok {
|
|
||||||
item = syncItem{}
|
|
||||||
}
|
|
||||||
item.name = name
|
|
||||||
item.objLegacy = obj
|
|
||||||
itemsByName[name] = item
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, obj := range storageList {
|
|
||||||
accessor, err := utils.MetaAccessor(obj)
|
|
||||||
if err != nil {
|
|
||||||
log.Error(err, "error retrieving accessor data for object from storage")
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
name := accessor.GetName()
|
|
||||||
|
|
||||||
item, ok := itemsByName[name]
|
|
||||||
if !ok {
|
|
||||||
item = syncItem{}
|
|
||||||
}
|
|
||||||
item.name = name
|
|
||||||
item.objStorage = obj
|
|
||||||
itemsByName[name] = item
|
|
||||||
}
|
|
||||||
log.Info("got list of items to be synced", "items", len(itemsByName))
|
|
||||||
|
|
||||||
for name, item := range itemsByName {
|
|
||||||
// upsert if:
|
|
||||||
// - existing in both legacy and storage, but objects are different, or
|
|
||||||
// - if it's missing from storage
|
|
||||||
if item.objLegacy != nil &&
|
|
||||||
((item.objStorage != nil && !Compare(item.objLegacy, item.objStorage)) || (item.objStorage == nil)) {
|
|
||||||
outOfSync++
|
|
||||||
|
|
||||||
accessor, err := utils.MetaAccessor(item.objLegacy)
|
|
||||||
if err != nil {
|
|
||||||
log.Error(err, "error retrieving accessor data for object from storage")
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if item.objStorage != nil {
|
|
||||||
accessorStorage, err := utils.MetaAccessor(item.objStorage)
|
|
||||||
if err != nil {
|
|
||||||
log.Error(err, "error retrieving accessor data for object from storage")
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
accessor.SetResourceVersion(accessorStorage.GetResourceVersion())
|
|
||||||
accessor.SetUID(accessorStorage.GetUID())
|
|
||||||
|
|
||||||
log.Info("updating item on unified storage", "name", name)
|
|
||||||
} else {
|
|
||||||
accessor.SetResourceVersion("")
|
|
||||||
accessor.SetUID("")
|
|
||||||
|
|
||||||
log.Info("inserting item on unified storage", "name", name)
|
|
||||||
}
|
|
||||||
|
|
||||||
objInfo := rest.DefaultUpdatedObjectInfo(item.objLegacy, []rest.TransformFunc{}...)
|
|
||||||
res, _, err := storage.Update(ctx,
|
|
||||||
name,
|
|
||||||
objInfo,
|
|
||||||
func(ctx context.Context, obj runtime.Object) error { return nil },
|
|
||||||
func(ctx context.Context, obj, old runtime.Object) error { return nil },
|
|
||||||
true, // force creation
|
|
||||||
&metav1.UpdateOptions{},
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
log.WithValues("object", res).Error(err, "could not update in storage")
|
|
||||||
syncErr++
|
|
||||||
} else {
|
|
||||||
syncSuccess++
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// delete if object does not exists on legacy but exists on storage
|
|
||||||
if item.objLegacy == nil && item.objStorage != nil {
|
|
||||||
outOfSync++
|
|
||||||
|
|
||||||
ctx = request.WithRequestInfo(ctx, &request.RequestInfo{
|
|
||||||
APIGroup: requestInfo.APIGroup,
|
|
||||||
Resource: requestInfo.Resource,
|
|
||||||
Name: name,
|
|
||||||
Namespace: requestInfo.Namespace,
|
|
||||||
})
|
|
||||||
|
|
||||||
log.Info("deleting item from unified storage", "name", name)
|
|
||||||
|
|
||||||
deletedS, _, err := storage.Delete(ctx, name, func(ctx context.Context, obj runtime.Object) error { return nil }, &metav1.DeleteOptions{})
|
|
||||||
if err != nil {
|
|
||||||
if !apierrors.IsNotFound(err) {
|
|
||||||
log.WithValues("objectList", deletedS).Error(err, "could not delete from storage")
|
|
||||||
}
|
|
||||||
syncErr++
|
|
||||||
} else {
|
|
||||||
syncSuccess++
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
everythingSynced = outOfSync == syncSuccess
|
|
||||||
|
|
||||||
metrics.recordDataSyncerOutcome(mode2Str, resource, everythingSynced)
|
|
||||||
metrics.recordDataSyncerDuration(err != nil, mode2Str, resource, startSync)
|
|
||||||
|
|
||||||
log.Info("finished syncing items", "items", len(itemsByName), "updated", syncSuccess, "failed", syncErr, "outcome", everythingSynced)
|
|
||||||
})
|
|
||||||
|
|
||||||
if errSync != nil {
|
|
||||||
err = errSync
|
|
||||||
}
|
|
||||||
|
|
||||||
return everythingSynced, err
|
|
||||||
}
|
|
||||||
|
@ -4,7 +4,6 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/mock"
|
"github.com/stretchr/testify/mock"
|
||||||
@ -16,7 +15,6 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
"k8s.io/apimachinery/pkg/types"
|
"k8s.io/apimachinery/pkg/types"
|
||||||
"k8s.io/apiserver/pkg/apis/example"
|
"k8s.io/apiserver/pkg/apis/example"
|
||||||
"k8s.io/apiserver/pkg/endpoints/request"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var createFn = func(context.Context, runtime.Object) error { return nil }
|
var createFn = func(context.Context, runtime.Object) error { return nil }
|
||||||
@ -609,197 +607,3 @@ func TestEnrichReturnedObject(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var legacyObj1 = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo1", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}}
|
|
||||||
var legacyObj2 = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo2", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}}
|
|
||||||
var legacyObj3 = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo3", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}}
|
|
||||||
var legacyObj4 = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo4", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}}
|
|
||||||
|
|
||||||
var legacyObj2WithHostname = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo2", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{Hostname: "hostname"}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}}
|
|
||||||
|
|
||||||
var storageObj1 = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo1", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}}
|
|
||||||
var storageObj2 = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo2", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}}
|
|
||||||
var storageObj3 = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo3", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}}
|
|
||||||
var storageObj4 = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo4", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}}
|
|
||||||
|
|
||||||
var legacyListWith3items = &example.PodList{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ListMeta: metav1.ListMeta{},
|
|
||||||
Items: []example.Pod{
|
|
||||||
*legacyObj1,
|
|
||||||
*legacyObj2,
|
|
||||||
*legacyObj3,
|
|
||||||
}}
|
|
||||||
|
|
||||||
var legacyListWith4items = &example.PodList{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ListMeta: metav1.ListMeta{},
|
|
||||||
Items: []example.Pod{
|
|
||||||
*legacyObj1,
|
|
||||||
*legacyObj2,
|
|
||||||
*legacyObj3,
|
|
||||||
*legacyObj4,
|
|
||||||
}}
|
|
||||||
|
|
||||||
var legacyListWith3itemsObj2IsDifferent = &example.PodList{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ListMeta: metav1.ListMeta{},
|
|
||||||
Items: []example.Pod{
|
|
||||||
*legacyObj1,
|
|
||||||
*legacyObj2WithHostname,
|
|
||||||
*legacyObj3,
|
|
||||||
}}
|
|
||||||
|
|
||||||
var storageListWith3items = &example.PodList{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ListMeta: metav1.ListMeta{},
|
|
||||||
Items: []example.Pod{
|
|
||||||
*storageObj1,
|
|
||||||
*storageObj2,
|
|
||||||
*storageObj3,
|
|
||||||
}}
|
|
||||||
|
|
||||||
var storageListWith4items = &example.PodList{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ListMeta: metav1.ListMeta{},
|
|
||||||
Items: []example.Pod{
|
|
||||||
*storageObj1,
|
|
||||||
*storageObj2,
|
|
||||||
*storageObj3,
|
|
||||||
*storageObj4,
|
|
||||||
}}
|
|
||||||
|
|
||||||
var storageListWith3itemsMissingFoo2 = &example.PodList{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ListMeta: metav1.ListMeta{},
|
|
||||||
Items: []example.Pod{
|
|
||||||
*storageObj1,
|
|
||||||
*storageObj3,
|
|
||||||
*storageObj4,
|
|
||||||
}}
|
|
||||||
|
|
||||||
func TestMode2_DataSyncer(t *testing.T) {
|
|
||||||
type testCase struct {
|
|
||||||
setupLegacyFn func(m *mock.Mock)
|
|
||||||
setupStorageFn func(m *mock.Mock)
|
|
||||||
name string
|
|
||||||
expectedOutcome bool
|
|
||||||
wantErr bool
|
|
||||||
}
|
|
||||||
tests :=
|
|
||||||
[]testCase{
|
|
||||||
{
|
|
||||||
name: "both stores are in sync",
|
|
||||||
setupLegacyFn: func(m *mock.Mock) {
|
|
||||||
m.On("List", mock.Anything, mock.Anything).Return(legacyListWith3items, nil)
|
|
||||||
},
|
|
||||||
setupStorageFn: func(m *mock.Mock) {
|
|
||||||
m.On("List", mock.Anything, mock.Anything).Return(storageListWith3items, nil)
|
|
||||||
},
|
|
||||||
expectedOutcome: true,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "both stores are in sync - fail to list from legacy",
|
|
||||||
setupLegacyFn: func(m *mock.Mock) {
|
|
||||||
m.On("List", mock.Anything, mock.Anything).Return(legacyListWith3items, errors.New("error"))
|
|
||||||
},
|
|
||||||
setupStorageFn: func(m *mock.Mock) {
|
|
||||||
m.On("List", mock.Anything, mock.Anything).Return(storageListWith3items, nil)
|
|
||||||
},
|
|
||||||
expectedOutcome: false,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "both stores are in sync - fail to list from storage",
|
|
||||||
setupLegacyFn: func(m *mock.Mock) {
|
|
||||||
m.On("List", mock.Anything, mock.Anything).Return(legacyListWith3items, nil)
|
|
||||||
},
|
|
||||||
setupStorageFn: func(m *mock.Mock) {
|
|
||||||
m.On("List", mock.Anything, mock.Anything).Return(storageListWith3items, errors.New("error"))
|
|
||||||
},
|
|
||||||
expectedOutcome: false,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "storage is missing 1 entry (foo4)",
|
|
||||||
setupLegacyFn: func(m *mock.Mock) {
|
|
||||||
m.On("List", mock.Anything, mock.Anything).Return(legacyListWith4items, nil)
|
|
||||||
},
|
|
||||||
setupStorageFn: func(m *mock.Mock) {
|
|
||||||
m.On("List", mock.Anything, mock.Anything).Return(storageListWith3items, nil)
|
|
||||||
m.On("Update", mock.Anything, "foo4", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleObj, false, nil)
|
|
||||||
},
|
|
||||||
expectedOutcome: true,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "storage needs to be update (foo2 is different)",
|
|
||||||
setupLegacyFn: func(m *mock.Mock) {
|
|
||||||
m.On("List", mock.Anything, mock.Anything).Return(legacyListWith3itemsObj2IsDifferent, nil)
|
|
||||||
},
|
|
||||||
setupStorageFn: func(m *mock.Mock) {
|
|
||||||
m.On("List", mock.Anything, mock.Anything).Return(storageListWith3items, nil)
|
|
||||||
m.On("Update", mock.Anything, "foo2", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleObj, false, nil)
|
|
||||||
},
|
|
||||||
expectedOutcome: true,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "storage is missing 1 entry (foo4) - fail to upsert",
|
|
||||||
setupLegacyFn: func(m *mock.Mock) {
|
|
||||||
m.On("List", mock.Anything, mock.Anything).Return(legacyListWith4items, nil)
|
|
||||||
},
|
|
||||||
setupStorageFn: func(m *mock.Mock) {
|
|
||||||
m.On("List", mock.Anything, mock.Anything).Return(storageListWith3items, nil)
|
|
||||||
m.On("Update", mock.Anything, "foo4", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleObj, false, errors.New("error"))
|
|
||||||
},
|
|
||||||
expectedOutcome: false,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "storage has an extra 1 entry (foo4)",
|
|
||||||
setupLegacyFn: func(m *mock.Mock) {
|
|
||||||
m.On("List", mock.Anything, mock.Anything).Return(legacyListWith3items, nil)
|
|
||||||
},
|
|
||||||
setupStorageFn: func(m *mock.Mock) {
|
|
||||||
m.On("List", mock.Anything, mock.Anything).Return(storageListWith4items, nil)
|
|
||||||
m.On("Delete", mock.Anything, "foo4", mock.Anything, mock.Anything).Return(exampleObj, false, nil)
|
|
||||||
},
|
|
||||||
expectedOutcome: true,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "storage has an extra 1 entry (foo4) - fail to delete",
|
|
||||||
setupLegacyFn: func(m *mock.Mock) {
|
|
||||||
m.On("List", mock.Anything, mock.Anything).Return(legacyListWith3items, nil)
|
|
||||||
},
|
|
||||||
setupStorageFn: func(m *mock.Mock) {
|
|
||||||
m.On("List", mock.Anything, mock.Anything).Return(storageListWith4items, nil)
|
|
||||||
m.On("Delete", mock.Anything, "foo4", mock.Anything, mock.Anything).Return(exampleObj, false, errors.New("error"))
|
|
||||||
},
|
|
||||||
expectedOutcome: false,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "storage is missing 1 entry (foo3) and has an extra 1 entry (foo4)",
|
|
||||||
setupLegacyFn: func(m *mock.Mock) {
|
|
||||||
m.On("List", mock.Anything, mock.Anything).Return(legacyListWith3items, nil)
|
|
||||||
},
|
|
||||||
setupStorageFn: func(m *mock.Mock) {
|
|
||||||
m.On("List", mock.Anything, mock.Anything).Return(storageListWith3itemsMissingFoo2, nil)
|
|
||||||
m.On("Update", mock.Anything, "foo2", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleObj, false, nil)
|
|
||||||
m.On("Delete", mock.Anything, "foo4", mock.Anything, mock.Anything).Return(exampleObj, false, nil)
|
|
||||||
},
|
|
||||||
expectedOutcome: true,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, tt := range tests {
|
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
|
||||||
l := (LegacyStorage)(nil)
|
|
||||||
s := (Storage)(nil)
|
|
||||||
lm := &mock.Mock{}
|
|
||||||
um := &mock.Mock{}
|
|
||||||
|
|
||||||
ls := legacyStoreMock{lm, l}
|
|
||||||
us := storageMock{um, s}
|
|
||||||
|
|
||||||
if tt.setupLegacyFn != nil {
|
|
||||||
tt.setupLegacyFn(lm)
|
|
||||||
}
|
|
||||||
if tt.setupStorageFn != nil {
|
|
||||||
tt.setupStorageFn(um)
|
|
||||||
}
|
|
||||||
|
|
||||||
outcome, err := mode2DataSyncer(context.Background(), ls, us, "test.kind", p, &fakeServerLock{}, &request.RequestInfo{})
|
|
||||||
if tt.wantErr {
|
|
||||||
assert.Error(t, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.Equal(t, tt.expectedOutcome, outcome)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
271
pkg/apiserver/rest/dualwriter_syncer.go
Normal file
271
pkg/apiserver/rest/dualwriter_syncer.go
Normal file
@ -0,0 +1,271 @@
|
|||||||
|
package rest
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"math/rand"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||||
|
"k8s.io/apimachinery/pkg/api/meta"
|
||||||
|
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
|
||||||
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
|
"k8s.io/apiserver/pkg/endpoints/request"
|
||||||
|
"k8s.io/apiserver/pkg/registry/rest"
|
||||||
|
"k8s.io/klog/v2"
|
||||||
|
|
||||||
|
"github.com/grafana/authlib/claims"
|
||||||
|
"github.com/grafana/grafana/pkg/apimachinery/identity"
|
||||||
|
"github.com/grafana/grafana/pkg/apimachinery/utils"
|
||||||
|
)
|
||||||
|
|
||||||
|
type syncItem struct {
|
||||||
|
name string
|
||||||
|
objStorage runtime.Object
|
||||||
|
objLegacy runtime.Object
|
||||||
|
accessorStorage utils.GrafanaMetaAccessor
|
||||||
|
accessorLegacy utils.GrafanaMetaAccessor
|
||||||
|
}
|
||||||
|
|
||||||
|
const dataSyncerInterval = 60 * time.Minute
|
||||||
|
|
||||||
|
// StartPeriodicDataSyncer starts a background job that will execute the DataSyncer every 60 minutes
|
||||||
|
func StartPeriodicDataSyncer(ctx context.Context, mode DualWriterMode, legacy LegacyStorage, storage Storage,
|
||||||
|
kind string, reg prometheus.Registerer, serverLockService ServerLockService, requestInfo *request.RequestInfo) {
|
||||||
|
log := klog.NewKlogr().WithName("legacyToUnifiedStorageDataSyncer").WithValues("mode", mode, "resource", kind)
|
||||||
|
|
||||||
|
log.Info("Starting periodic data syncer")
|
||||||
|
|
||||||
|
// run in background
|
||||||
|
go func() {
|
||||||
|
r := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||||
|
timeWindow := 600 // 600 seconds (10 minutes)
|
||||||
|
jitterSeconds := r.Int63n(int64(timeWindow))
|
||||||
|
log.Info("data syncer scheduled", "starting time", time.Now().Add(time.Second*time.Duration(jitterSeconds)))
|
||||||
|
time.Sleep(time.Second * time.Duration(jitterSeconds))
|
||||||
|
|
||||||
|
// run it immediately
|
||||||
|
syncOK, err := runDataSyncer(ctx, mode, legacy, storage, kind, reg, serverLockService, requestInfo)
|
||||||
|
log.Info("data syncer finished", "syncOK", syncOK, "error", err)
|
||||||
|
|
||||||
|
ticker := time.NewTicker(dataSyncerInterval)
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ticker.C:
|
||||||
|
syncOK, err = runDataSyncer(ctx, mode, legacy, storage, kind, reg, serverLockService, requestInfo)
|
||||||
|
log.Info("data syncer finished", "syncOK", syncOK, ", error", err)
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
// runDataSyncer will ensure that data between legacy storage and unified storage are in sync.
|
||||||
|
// The sync implementation depends on the DualWriter mode
|
||||||
|
func runDataSyncer(ctx context.Context, mode DualWriterMode, legacy LegacyStorage, storage Storage,
|
||||||
|
kind string, reg prometheus.Registerer, serverLockService ServerLockService, requestInfo *request.RequestInfo) (bool, error) {
|
||||||
|
// ensure that execution takes no longer than necessary
|
||||||
|
const timeout = dataSyncerInterval - time.Minute
|
||||||
|
ctx, cancelFn := context.WithTimeout(ctx, timeout)
|
||||||
|
defer cancelFn()
|
||||||
|
|
||||||
|
// implementation depends on the current DualWriter mode
|
||||||
|
switch mode {
|
||||||
|
case Mode1, Mode2:
|
||||||
|
return legacyToUnifiedStorageDataSyncer(ctx, mode, legacy, storage, kind, reg, serverLockService, requestInfo)
|
||||||
|
default:
|
||||||
|
klog.Info("data syncer not implemented for mode mode:", mode)
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func legacyToUnifiedStorageDataSyncer(ctx context.Context, mode DualWriterMode, legacy LegacyStorage, storage Storage, resource string, reg prometheus.Registerer, serverLockService ServerLockService, requestInfo *request.RequestInfo) (bool, error) {
|
||||||
|
metrics := &dualWriterMetrics{}
|
||||||
|
metrics.init(reg)
|
||||||
|
|
||||||
|
log := klog.NewKlogr().WithName("legacyToUnifiedStorageDataSyncer").WithValues("mode", mode, "resource", resource)
|
||||||
|
|
||||||
|
everythingSynced := false
|
||||||
|
outOfSync := 0
|
||||||
|
syncSuccess := 0
|
||||||
|
syncErr := 0
|
||||||
|
|
||||||
|
maxInterval := dataSyncerInterval + 5*time.Minute
|
||||||
|
|
||||||
|
var errSync error
|
||||||
|
const maxRecordsSync = 1000
|
||||||
|
|
||||||
|
// LockExecuteAndRelease ensures that just a single Grafana server acquires a lock at a time
|
||||||
|
// The parameter 'maxInterval' is a timeout safeguard, if the LastExecution in the
|
||||||
|
// database is older than maxInterval, we will assume the lock as timeouted. The 'maxInterval' parameter should be so long
|
||||||
|
// that is impossible for 2 processes to run at the same time.
|
||||||
|
err := serverLockService.LockExecuteAndRelease(ctx, fmt.Sprintf("legacyToUnifiedStorageDataSyncer-%d-%s", mode, resource), maxInterval, func(context.Context) {
|
||||||
|
log.Info("starting legacyToUnifiedStorageDataSyncer")
|
||||||
|
startSync := time.Now()
|
||||||
|
|
||||||
|
orgId := int64(1)
|
||||||
|
|
||||||
|
ctx = klog.NewContext(ctx, log)
|
||||||
|
ctx = identity.WithRequester(ctx, getSyncRequester(orgId))
|
||||||
|
ctx = request.WithNamespace(ctx, requestInfo.Namespace)
|
||||||
|
ctx = request.WithRequestInfo(ctx, requestInfo)
|
||||||
|
|
||||||
|
storageList, err := getList(ctx, storage, &metainternalversion.ListOptions{
|
||||||
|
Limit: maxRecordsSync,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
log.Error(err, "unable to extract list from storage")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(storageList) >= maxRecordsSync {
|
||||||
|
errSync = fmt.Errorf("unified storage has more than %d records. Aborting sync", maxRecordsSync)
|
||||||
|
log.Error(errSync, "Unified storage has more records to be synced than allowed")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Info("got items from unified storage", "items", len(storageList))
|
||||||
|
|
||||||
|
legacyList, err := getList(ctx, legacy, &metainternalversion.ListOptions{})
|
||||||
|
if err != nil {
|
||||||
|
log.Error(err, "unable to extract list from legacy storage")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
log.Info("got items from legacy storage", "items", len(legacyList))
|
||||||
|
|
||||||
|
itemsByName := map[string]syncItem{}
|
||||||
|
for _, obj := range legacyList {
|
||||||
|
accessor, err := utils.MetaAccessor(obj)
|
||||||
|
if err != nil {
|
||||||
|
log.Error(err, "error retrieving accessor data for object from legacy storage")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
name := accessor.GetName()
|
||||||
|
|
||||||
|
item := itemsByName[name]
|
||||||
|
item.name = name
|
||||||
|
item.objLegacy = obj
|
||||||
|
item.accessorLegacy = accessor
|
||||||
|
itemsByName[name] = item
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, obj := range storageList {
|
||||||
|
accessor, err := utils.MetaAccessor(obj)
|
||||||
|
if err != nil {
|
||||||
|
log.Error(err, "error retrieving accessor data for object from storage")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
name := accessor.GetName()
|
||||||
|
|
||||||
|
item := itemsByName[name]
|
||||||
|
item.name = name
|
||||||
|
item.objStorage = obj
|
||||||
|
item.accessorStorage = accessor
|
||||||
|
itemsByName[name] = item
|
||||||
|
}
|
||||||
|
log.Info("got list of items to be synced", "items", len(itemsByName))
|
||||||
|
|
||||||
|
for name, item := range itemsByName {
|
||||||
|
// upsert if:
|
||||||
|
// - existing in both legacy and storage, but objects are different, or
|
||||||
|
// - if it's missing from storage
|
||||||
|
if item.objLegacy != nil &&
|
||||||
|
(item.objStorage == nil || !Compare(item.objLegacy, item.objStorage)) {
|
||||||
|
outOfSync++
|
||||||
|
|
||||||
|
if item.objStorage != nil {
|
||||||
|
item.accessorLegacy.SetResourceVersion(item.accessorStorage.GetResourceVersion())
|
||||||
|
item.accessorLegacy.SetUID(item.accessorStorage.GetUID())
|
||||||
|
|
||||||
|
log.Info("updating item on unified storage", "name", name)
|
||||||
|
} else {
|
||||||
|
item.accessorLegacy.SetResourceVersion("")
|
||||||
|
item.accessorLegacy.SetUID("")
|
||||||
|
|
||||||
|
log.Info("inserting item on unified storage", "name", name)
|
||||||
|
}
|
||||||
|
|
||||||
|
objInfo := rest.DefaultUpdatedObjectInfo(item.objLegacy, []rest.TransformFunc{}...)
|
||||||
|
res, _, err := storage.Update(ctx,
|
||||||
|
name,
|
||||||
|
objInfo,
|
||||||
|
func(ctx context.Context, obj runtime.Object) error { return nil },
|
||||||
|
func(ctx context.Context, obj, old runtime.Object) error { return nil },
|
||||||
|
true, // force creation
|
||||||
|
&metav1.UpdateOptions{},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
log.WithValues("object", res).Error(err, "could not update in storage")
|
||||||
|
syncErr++
|
||||||
|
} else {
|
||||||
|
syncSuccess++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// delete if object does not exists on legacy but exists on storage
|
||||||
|
if item.objLegacy == nil && item.objStorage != nil {
|
||||||
|
outOfSync++
|
||||||
|
|
||||||
|
ctx = request.WithRequestInfo(ctx, &request.RequestInfo{
|
||||||
|
APIGroup: requestInfo.APIGroup,
|
||||||
|
Resource: requestInfo.Resource,
|
||||||
|
Name: name,
|
||||||
|
Namespace: requestInfo.Namespace,
|
||||||
|
})
|
||||||
|
|
||||||
|
log.Info("deleting item from unified storage", "name", name)
|
||||||
|
|
||||||
|
deletedS, _, err := storage.Delete(ctx, name, func(ctx context.Context, obj runtime.Object) error { return nil }, &metav1.DeleteOptions{})
|
||||||
|
if err != nil && !apierrors.IsNotFound(err) {
|
||||||
|
log.WithValues("objectList", deletedS).Error(err, "could not delete from storage")
|
||||||
|
syncErr++
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
syncSuccess++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
everythingSynced = outOfSync == syncSuccess
|
||||||
|
|
||||||
|
metrics.recordDataSyncerOutcome(mode, resource, everythingSynced)
|
||||||
|
metrics.recordDataSyncerDuration(err != nil, mode, resource, startSync)
|
||||||
|
|
||||||
|
log.Info("finished syncing items", "items", len(itemsByName), "updated", syncSuccess, "failed", syncErr, "outcome", everythingSynced)
|
||||||
|
})
|
||||||
|
|
||||||
|
if errSync != nil {
|
||||||
|
err = errSync
|
||||||
|
}
|
||||||
|
|
||||||
|
return everythingSynced, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func getSyncRequester(orgId int64) *identity.StaticRequester {
|
||||||
|
return &identity.StaticRequester{
|
||||||
|
Type: claims.TypeServiceAccount, // system:apiserver
|
||||||
|
UserID: 1,
|
||||||
|
OrgID: orgId,
|
||||||
|
Name: "admin",
|
||||||
|
Login: "admin",
|
||||||
|
OrgRole: identity.RoleAdmin,
|
||||||
|
IsGrafanaAdmin: true,
|
||||||
|
Permissions: map[int64]map[string][]string{
|
||||||
|
orgId: {
|
||||||
|
"*": {"*"}, // all resources, all scopes
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func getList(ctx context.Context, obj rest.Lister, listOptions *metainternalversion.ListOptions) ([]runtime.Object, error) {
|
||||||
|
ll, err := obj.List(ctx, listOptions)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return meta.ExtractList(ll)
|
||||||
|
}
|
238
pkg/apiserver/rest/dualwriter_syncer_test.go
Normal file
238
pkg/apiserver/rest/dualwriter_syncer_test.go
Normal file
@ -0,0 +1,238 @@
|
|||||||
|
package rest
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/mock"
|
||||||
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apiserver/pkg/apis/example"
|
||||||
|
"k8s.io/apiserver/pkg/endpoints/request"
|
||||||
|
)
|
||||||
|
|
||||||
|
var legacyObj1 = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo1", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}}
|
||||||
|
var legacyObj2 = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo2", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}}
|
||||||
|
var legacyObj3 = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo3", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}}
|
||||||
|
var legacyObj4 = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo4", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}}
|
||||||
|
|
||||||
|
var legacyObj2WithHostname = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo2", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{Hostname: "hostname"}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}}
|
||||||
|
|
||||||
|
var storageObj1 = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo1", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}}
|
||||||
|
var storageObj2 = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo2", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}}
|
||||||
|
var storageObj3 = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo3", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}}
|
||||||
|
var storageObj4 = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo4", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}}
|
||||||
|
|
||||||
|
var legacyListWith3items = &example.PodList{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ListMeta: metav1.ListMeta{},
|
||||||
|
Items: []example.Pod{
|
||||||
|
*legacyObj1,
|
||||||
|
*legacyObj2,
|
||||||
|
*legacyObj3,
|
||||||
|
}}
|
||||||
|
|
||||||
|
var legacyListWith4items = &example.PodList{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ListMeta: metav1.ListMeta{},
|
||||||
|
Items: []example.Pod{
|
||||||
|
*legacyObj1,
|
||||||
|
*legacyObj2,
|
||||||
|
*legacyObj3,
|
||||||
|
*legacyObj4,
|
||||||
|
}}
|
||||||
|
|
||||||
|
var legacyListWith3itemsObj2IsDifferent = &example.PodList{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ListMeta: metav1.ListMeta{},
|
||||||
|
Items: []example.Pod{
|
||||||
|
*legacyObj1,
|
||||||
|
*legacyObj2WithHostname,
|
||||||
|
*legacyObj3,
|
||||||
|
}}
|
||||||
|
|
||||||
|
var storageListWith3items = &example.PodList{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ListMeta: metav1.ListMeta{},
|
||||||
|
Items: []example.Pod{
|
||||||
|
*storageObj1,
|
||||||
|
*storageObj2,
|
||||||
|
*storageObj3,
|
||||||
|
}}
|
||||||
|
|
||||||
|
var storageListWith4items = &example.PodList{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ListMeta: metav1.ListMeta{},
|
||||||
|
Items: []example.Pod{
|
||||||
|
*storageObj1,
|
||||||
|
*storageObj2,
|
||||||
|
*storageObj3,
|
||||||
|
*storageObj4,
|
||||||
|
}}
|
||||||
|
|
||||||
|
var storageListWith3itemsMissingFoo2 = &example.PodList{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ListMeta: metav1.ListMeta{},
|
||||||
|
Items: []example.Pod{
|
||||||
|
*storageObj1,
|
||||||
|
*storageObj3,
|
||||||
|
*storageObj4,
|
||||||
|
}}
|
||||||
|
|
||||||
|
func TestLegacyToUnifiedStorage_DataSyncer(t *testing.T) {
|
||||||
|
type testCase struct {
|
||||||
|
setupLegacyFn func(m *mock.Mock)
|
||||||
|
setupStorageFn func(m *mock.Mock)
|
||||||
|
name string
|
||||||
|
expectedOutcome bool
|
||||||
|
wantErr bool
|
||||||
|
}
|
||||||
|
tests :=
|
||||||
|
[]testCase{
|
||||||
|
{
|
||||||
|
name: "both stores are in sync",
|
||||||
|
setupLegacyFn: func(m *mock.Mock) {
|
||||||
|
m.On("List", mock.Anything, mock.Anything).Return(legacyListWith3items, nil)
|
||||||
|
},
|
||||||
|
setupStorageFn: func(m *mock.Mock) {
|
||||||
|
m.On("List", mock.Anything, mock.Anything).Return(storageListWith3items, nil)
|
||||||
|
},
|
||||||
|
expectedOutcome: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "both stores are in sync - fail to list from legacy",
|
||||||
|
setupLegacyFn: func(m *mock.Mock) {
|
||||||
|
m.On("List", mock.Anything, mock.Anything).Return(legacyListWith3items, errors.New("error"))
|
||||||
|
},
|
||||||
|
setupStorageFn: func(m *mock.Mock) {
|
||||||
|
m.On("List", mock.Anything, mock.Anything).Return(storageListWith3items, nil)
|
||||||
|
},
|
||||||
|
expectedOutcome: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "both stores are in sync - fail to list from storage",
|
||||||
|
setupLegacyFn: func(m *mock.Mock) {
|
||||||
|
m.On("List", mock.Anything, mock.Anything).Return(legacyListWith3items, nil)
|
||||||
|
},
|
||||||
|
setupStorageFn: func(m *mock.Mock) {
|
||||||
|
m.On("List", mock.Anything, mock.Anything).Return(storageListWith3items, errors.New("error"))
|
||||||
|
},
|
||||||
|
expectedOutcome: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "storage is missing 1 entry (foo4)",
|
||||||
|
setupLegacyFn: func(m *mock.Mock) {
|
||||||
|
m.On("List", mock.Anything, mock.Anything).Return(legacyListWith4items, nil)
|
||||||
|
},
|
||||||
|
setupStorageFn: func(m *mock.Mock) {
|
||||||
|
m.On("List", mock.Anything, mock.Anything).Return(storageListWith3items, nil)
|
||||||
|
m.On("Update", mock.Anything, "foo4", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleObj, false, nil)
|
||||||
|
},
|
||||||
|
expectedOutcome: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "storage needs to be update (foo2 is different)",
|
||||||
|
setupLegacyFn: func(m *mock.Mock) {
|
||||||
|
m.On("List", mock.Anything, mock.Anything).Return(legacyListWith3itemsObj2IsDifferent, nil)
|
||||||
|
},
|
||||||
|
setupStorageFn: func(m *mock.Mock) {
|
||||||
|
m.On("List", mock.Anything, mock.Anything).Return(storageListWith3items, nil)
|
||||||
|
m.On("Update", mock.Anything, "foo2", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleObj, false, nil)
|
||||||
|
},
|
||||||
|
expectedOutcome: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "storage is missing 1 entry (foo4) - fail to upsert",
|
||||||
|
setupLegacyFn: func(m *mock.Mock) {
|
||||||
|
m.On("List", mock.Anything, mock.Anything).Return(legacyListWith4items, nil)
|
||||||
|
},
|
||||||
|
setupStorageFn: func(m *mock.Mock) {
|
||||||
|
m.On("List", mock.Anything, mock.Anything).Return(storageListWith3items, nil)
|
||||||
|
m.On("Update", mock.Anything, "foo4", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleObj, false, errors.New("error"))
|
||||||
|
},
|
||||||
|
expectedOutcome: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "storage has an extra 1 entry (foo4)",
|
||||||
|
setupLegacyFn: func(m *mock.Mock) {
|
||||||
|
m.On("List", mock.Anything, mock.Anything).Return(legacyListWith3items, nil)
|
||||||
|
},
|
||||||
|
setupStorageFn: func(m *mock.Mock) {
|
||||||
|
m.On("List", mock.Anything, mock.Anything).Return(storageListWith4items, nil)
|
||||||
|
m.On("Delete", mock.Anything, "foo4", mock.Anything, mock.Anything).Return(exampleObj, false, nil)
|
||||||
|
},
|
||||||
|
expectedOutcome: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "storage has an extra 1 entry (foo4) - fail to delete",
|
||||||
|
setupLegacyFn: func(m *mock.Mock) {
|
||||||
|
m.On("List", mock.Anything, mock.Anything).Return(legacyListWith3items, nil)
|
||||||
|
},
|
||||||
|
setupStorageFn: func(m *mock.Mock) {
|
||||||
|
m.On("List", mock.Anything, mock.Anything).Return(storageListWith4items, nil)
|
||||||
|
m.On("Delete", mock.Anything, "foo4", mock.Anything, mock.Anything).Return(exampleObj, false, errors.New("error"))
|
||||||
|
},
|
||||||
|
expectedOutcome: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "storage is missing 1 entry (foo3) and has an extra 1 entry (foo4)",
|
||||||
|
setupLegacyFn: func(m *mock.Mock) {
|
||||||
|
m.On("List", mock.Anything, mock.Anything).Return(legacyListWith3items, nil)
|
||||||
|
},
|
||||||
|
setupStorageFn: func(m *mock.Mock) {
|
||||||
|
m.On("List", mock.Anything, mock.Anything).Return(storageListWith3itemsMissingFoo2, nil)
|
||||||
|
m.On("Update", mock.Anything, "foo2", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleObj, false, nil)
|
||||||
|
m.On("Delete", mock.Anything, "foo4", mock.Anything, mock.Anything).Return(exampleObj, false, nil)
|
||||||
|
},
|
||||||
|
expectedOutcome: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// mode 1
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run("Mode-1-"+tt.name, func(t *testing.T) {
|
||||||
|
l := (LegacyStorage)(nil)
|
||||||
|
s := (Storage)(nil)
|
||||||
|
lm := &mock.Mock{}
|
||||||
|
um := &mock.Mock{}
|
||||||
|
|
||||||
|
ls := legacyStoreMock{lm, l}
|
||||||
|
us := storageMock{um, s}
|
||||||
|
|
||||||
|
if tt.setupLegacyFn != nil {
|
||||||
|
tt.setupLegacyFn(lm)
|
||||||
|
}
|
||||||
|
if tt.setupStorageFn != nil {
|
||||||
|
tt.setupStorageFn(um)
|
||||||
|
}
|
||||||
|
|
||||||
|
outcome, err := legacyToUnifiedStorageDataSyncer(context.Background(), Mode1, ls, us, "test.kind", p, &fakeServerLock{}, &request.RequestInfo{})
|
||||||
|
if tt.wantErr {
|
||||||
|
assert.Error(t, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, tt.expectedOutcome, outcome)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// mode 2
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run("Mode-2-"+tt.name, func(t *testing.T) {
|
||||||
|
l := (LegacyStorage)(nil)
|
||||||
|
s := (Storage)(nil)
|
||||||
|
lm := &mock.Mock{}
|
||||||
|
um := &mock.Mock{}
|
||||||
|
|
||||||
|
ls := legacyStoreMock{lm, l}
|
||||||
|
us := storageMock{um, s}
|
||||||
|
|
||||||
|
if tt.setupLegacyFn != nil {
|
||||||
|
tt.setupLegacyFn(lm)
|
||||||
|
}
|
||||||
|
if tt.setupStorageFn != nil {
|
||||||
|
tt.setupStorageFn(um)
|
||||||
|
}
|
||||||
|
|
||||||
|
outcome, err := legacyToUnifiedStorageDataSyncer(context.Background(), Mode1, ls, us, "test.kind", p, &fakeServerLock{}, &request.RequestInfo{})
|
||||||
|
if tt.wantErr {
|
||||||
|
assert.Error(t, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, tt.expectedOutcome, outcome)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
@ -1,6 +1,7 @@
|
|||||||
package rest
|
package rest
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -97,15 +98,15 @@ func (m *dualWriterMetrics) recordOutcome(mode string, name string, areEqual boo
|
|||||||
m.outcome.WithLabelValues(mode, name, method).Observe(observeValue)
|
m.outcome.WithLabelValues(mode, name, method).Observe(observeValue)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *dualWriterMetrics) recordDataSyncerDuration(isError bool, mode string, resource string, startFrom time.Time) {
|
func (m *dualWriterMetrics) recordDataSyncerDuration(isError bool, mode DualWriterMode, resource string, startFrom time.Time) {
|
||||||
duration := time.Since(startFrom).Seconds()
|
duration := time.Since(startFrom).Seconds()
|
||||||
m.syncer.WithLabelValues(strconv.FormatBool(isError), mode, resource).Observe(duration)
|
m.syncer.WithLabelValues(strconv.FormatBool(isError), fmt.Sprintf("%d", mode), resource).Observe(duration)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *dualWriterMetrics) recordDataSyncerOutcome(mode string, resource string, synced bool) {
|
func (m *dualWriterMetrics) recordDataSyncerOutcome(mode DualWriterMode, resource string, synced bool) {
|
||||||
var observeValue float64
|
var observeValue float64
|
||||||
if !synced {
|
if !synced {
|
||||||
observeValue = 1
|
observeValue = 1
|
||||||
}
|
}
|
||||||
m.syncerOutcome.WithLabelValues(mode, resource).Observe(observeValue)
|
m.syncerOutcome.WithLabelValues(fmt.Sprintf("%d", mode), resource).Observe(observeValue)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user