mirror of
https://github.com/grafana/grafana.git
synced 2024-11-28 19:54:10 -06:00
Unified Storage: First iteration Dual Write Syncer (#89809)
* Unified Storage: First iteration Dual Write Syncer Signed-off-by: Maicon Costa <maiconscosta@gmail.com> Co-authored-by: Leonor Oliveira <9090754+leonorfmartins@users.noreply.github.com> Co-authored-by: Dan Cech <dcech@grafana.com>
This commit is contained in:
parent
2235825ef9
commit
de2c9a06bf
@ -6,12 +6,15 @@ import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apiserver/pkg/endpoints/request"
|
||||
"k8s.io/apiserver/pkg/registry/rest"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
@ -102,7 +105,13 @@ const (
|
||||
|
||||
// TODO: make this function private as there should only be one public way of setting the dual writing mode
|
||||
// NewDualWriter returns a new DualWriter.
|
||||
func NewDualWriter(mode DualWriterMode, legacy LegacyStorage, storage Storage, reg prometheus.Registerer, kind string) DualWriter {
|
||||
func NewDualWriter(
|
||||
mode DualWriterMode,
|
||||
legacy LegacyStorage,
|
||||
storage Storage,
|
||||
reg prometheus.Registerer,
|
||||
kind string,
|
||||
) DualWriter {
|
||||
metrics := &dualWriterMetrics{}
|
||||
metrics.init(reg)
|
||||
switch mode {
|
||||
@ -148,6 +157,10 @@ type NamespacedKVStore interface {
|
||||
Set(ctx context.Context, key, value string) error
|
||||
}
|
||||
|
||||
type ServerLockService interface {
|
||||
LockExecuteAndRelease(ctx context.Context, actionName string, maxInterval time.Duration, fn func(ctx context.Context)) error
|
||||
}
|
||||
|
||||
func SetDualWritingMode(
|
||||
ctx context.Context,
|
||||
kvs NamespacedKVStore,
|
||||
@ -156,6 +169,8 @@ func SetDualWritingMode(
|
||||
entity string,
|
||||
desiredMode DualWriterMode,
|
||||
reg prometheus.Registerer,
|
||||
serverLockService ServerLockService,
|
||||
requestInfo *request.RequestInfo,
|
||||
) (DualWriterMode, error) {
|
||||
// Mode0 means no DualWriter
|
||||
if desiredMode == Mode0 {
|
||||
@ -206,6 +221,7 @@ func SetDualWritingMode(
|
||||
return Mode0, errDualWriterSetCurrentMode
|
||||
}
|
||||
}
|
||||
|
||||
if (desiredMode == Mode1) && (currentMode == Mode2) {
|
||||
// This is where we go through the different gates to allow the instance to migrate from mode 2 to mode 1.
|
||||
// There are none between mode 1 and mode 2
|
||||
@ -217,6 +233,28 @@ func SetDualWritingMode(
|
||||
}
|
||||
}
|
||||
|
||||
if (desiredMode == Mode3) && (currentMode == Mode2) {
|
||||
// This is where we go through the different gates to allow the instance to migrate from mode 2 to mode 3.
|
||||
|
||||
// gate #1: ensure the data is 100% in sync
|
||||
syncOk, err := runDataSyncer(ctx, currentMode, legacy, storage, entity, reg, serverLockService, requestInfo)
|
||||
if err != nil {
|
||||
klog.Info("data syncer failed for mode:", m)
|
||||
return currentMode, err
|
||||
}
|
||||
if !syncOk {
|
||||
klog.Info("data syncer not ok for mode:", m)
|
||||
return currentMode, nil
|
||||
}
|
||||
|
||||
err = kvs.Set(ctx, entity, fmt.Sprint(desiredMode))
|
||||
if err != nil {
|
||||
return currentMode, errDualWriterSetCurrentMode
|
||||
}
|
||||
|
||||
return desiredMode, nil
|
||||
}
|
||||
|
||||
// #TODO add support for other combinations of desired and current modes
|
||||
|
||||
return currentMode, nil
|
||||
@ -260,3 +298,54 @@ func getName(o runtime.Object) string {
|
||||
}
|
||||
return accessor.GetName()
|
||||
}
|
||||
|
||||
const dataSyncerInterval = 60 * time.Minute
|
||||
|
||||
// StartPeriodicDataSyncer starts a background job that will execute the DataSyncer every 60 minutes
|
||||
func StartPeriodicDataSyncer(ctx context.Context, mode DualWriterMode, legacy LegacyStorage, storage Storage,
|
||||
kind string, reg prometheus.Registerer, serverLockService ServerLockService, requestInfo *request.RequestInfo) {
|
||||
klog.Info("Starting periodic data syncer for mode mode: ", mode)
|
||||
|
||||
// run in background
|
||||
go func() {
|
||||
r := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
timeWindow := 600 // 600 seconds (10 minutes)
|
||||
jitterSeconds := r.Int63n(int64(timeWindow))
|
||||
klog.Info("data syncer is going to start at: ", time.Now().Add(time.Second*time.Duration(jitterSeconds)))
|
||||
time.Sleep(time.Second * time.Duration(jitterSeconds))
|
||||
|
||||
// run it immediately
|
||||
syncOK, err := runDataSyncer(ctx, mode, legacy, storage, kind, reg, serverLockService, requestInfo)
|
||||
klog.Info("data syncer finished, syncOK: ", syncOK, ", error: ", err)
|
||||
|
||||
ticker := time.NewTicker(dataSyncerInterval)
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
syncOK, err = runDataSyncer(ctx, mode, legacy, storage, kind, reg, serverLockService, requestInfo)
|
||||
klog.Info("data syncer finished, syncOK: ", syncOK, ", error: ", err)
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// runDataSyncer will ensure that data between legacy storage and unified storage are in sync.
|
||||
// The sync implementation depends on the DualWriter mode
|
||||
func runDataSyncer(ctx context.Context, mode DualWriterMode, legacy LegacyStorage, storage Storage,
|
||||
kind string, reg prometheus.Registerer, serverLockService ServerLockService, requestInfo *request.RequestInfo) (bool, error) {
|
||||
// ensure that execution takes no longer than necessary
|
||||
const timeout = dataSyncerInterval - time.Minute
|
||||
ctx, cancelFn := context.WithTimeout(ctx, timeout)
|
||||
defer cancelFn()
|
||||
|
||||
// implementation depends on the current DualWriter mode
|
||||
switch mode {
|
||||
case Mode2:
|
||||
return mode2DataSyncer(ctx, legacy, storage, kind, reg, serverLockService, requestInfo)
|
||||
default:
|
||||
klog.Info("data syncer not implemented for mode mode:", mode)
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
@ -3,17 +3,22 @@ package rest
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/apiserver/pkg/endpoints/request"
|
||||
"k8s.io/apiserver/pkg/registry/rest"
|
||||
"k8s.io/klog/v2"
|
||||
|
||||
"github.com/grafana/authlib/claims"
|
||||
"github.com/grafana/grafana/pkg/apimachinery/identity"
|
||||
"github.com/grafana/grafana/pkg/apimachinery/utils"
|
||||
)
|
||||
|
||||
@ -30,7 +35,9 @@ const mode2Str = "2"
|
||||
// NewDualWriterMode2 returns a new DualWriter in mode 2.
|
||||
// Mode 2 represents writing to LegacyStorage and Storage and reading from LegacyStorage.
|
||||
func newDualWriterMode2(legacy LegacyStorage, storage Storage, dwm *dualWriterMetrics, kind string) *DualWriterMode2 {
|
||||
return &DualWriterMode2{Legacy: legacy, Storage: storage, Log: klog.NewKlogr().WithName("DualWriterMode2").WithValues("mode", mode2Str, "kind", kind), dualWriterMetrics: dwm}
|
||||
return &DualWriterMode2{
|
||||
Legacy: legacy, Storage: storage, Log: klog.NewKlogr().WithName("DualWriterMode2").WithValues("mode", mode2Str, "kind", kind), dualWriterMetrics: dwm,
|
||||
}
|
||||
}
|
||||
|
||||
// Mode returns the mode of the dual writer.
|
||||
@ -394,3 +401,213 @@ func enrichLegacyObject(originalObj, returnedObj runtime.Object) error {
|
||||
accessorReturned.SetUID(accessorOriginal.GetUID())
|
||||
return nil
|
||||
}
|
||||
|
||||
func getSyncRequester(orgId int64) *identity.StaticRequester {
|
||||
return &identity.StaticRequester{
|
||||
Type: claims.TypeServiceAccount, // system:apiserver
|
||||
UserID: 1,
|
||||
OrgID: orgId,
|
||||
Name: "admin",
|
||||
Login: "admin",
|
||||
OrgRole: identity.RoleAdmin,
|
||||
IsGrafanaAdmin: true,
|
||||
Permissions: map[int64]map[string][]string{
|
||||
orgId: {
|
||||
"*": {"*"}, // all resources, all scopes
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
type syncItem struct {
|
||||
name string
|
||||
objStorage runtime.Object
|
||||
objLegacy runtime.Object
|
||||
}
|
||||
|
||||
func getList(ctx context.Context, obj rest.Lister, listOptions *metainternalversion.ListOptions) ([]runtime.Object, error) {
|
||||
ll, err := obj.List(ctx, listOptions)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return meta.ExtractList(ll)
|
||||
}
|
||||
|
||||
func mode2DataSyncer(ctx context.Context, legacy LegacyStorage, storage Storage, kind string, reg prometheus.Registerer, serverLockService ServerLockService, requestInfo *request.RequestInfo) (bool, error) {
|
||||
metrics := &dualWriterMetrics{}
|
||||
metrics.init(reg)
|
||||
|
||||
log := klog.NewKlogr().WithName("DualWriterMode2Syncer")
|
||||
|
||||
everythingSynced := false
|
||||
outOfSync := 0
|
||||
syncSuccess := 0
|
||||
syncErr := 0
|
||||
|
||||
maxInterval := dataSyncerInterval + 5*time.Minute
|
||||
|
||||
var errSync error
|
||||
const maxRecordsSync = 1000
|
||||
|
||||
// LockExecuteAndRelease ensures that just a single Grafana server acquires a lock at a time
|
||||
// The parameter 'maxInterval' is a timeout safeguard, if the LastExecution in the
|
||||
// database is older than maxInterval, we will assume the lock as timeouted. The 'maxInterval' parameter should be so long
|
||||
// that is impossible for 2 processes to run at the same time.
|
||||
err := serverLockService.LockExecuteAndRelease(ctx, "dualwriter mode 2 sync", maxInterval, func(context.Context) {
|
||||
log.Info("starting dualwriter mode 2 sync")
|
||||
startSync := time.Now()
|
||||
|
||||
orgId := int64(1)
|
||||
|
||||
ctx = klog.NewContext(ctx, log)
|
||||
ctx = identity.WithRequester(ctx, getSyncRequester(orgId))
|
||||
ctx = request.WithNamespace(ctx, requestInfo.Namespace)
|
||||
ctx = request.WithRequestInfo(ctx, requestInfo)
|
||||
|
||||
storageList, err := getList(ctx, storage, &metainternalversion.ListOptions{
|
||||
Limit: maxRecordsSync,
|
||||
})
|
||||
if err != nil {
|
||||
log.Error(err, "unable to extract list from storage")
|
||||
return
|
||||
}
|
||||
|
||||
if len(storageList) >= maxRecordsSync {
|
||||
errSync = fmt.Errorf("unified storage has more than %d records. Aborting sync", maxRecordsSync)
|
||||
log.Error(errSync, "Unified storage has more records to be synced than allowed")
|
||||
return
|
||||
}
|
||||
|
||||
log.Info("got items from unified storage", "items", len(storageList))
|
||||
|
||||
legacyList, err := getList(ctx, legacy, &metainternalversion.ListOptions{})
|
||||
if err != nil {
|
||||
log.Error(err, "unable to extract list from legacy storage")
|
||||
return
|
||||
}
|
||||
log.Info("got items from legacy storage", "items", len(legacyList))
|
||||
|
||||
itemsByName := map[string]syncItem{}
|
||||
for _, obj := range legacyList {
|
||||
accessor, err := utils.MetaAccessor(obj)
|
||||
if err != nil {
|
||||
log.Error(err, "error retrieving accessor data for object from legacy storage")
|
||||
continue
|
||||
}
|
||||
name := accessor.GetName()
|
||||
|
||||
item, ok := itemsByName[name]
|
||||
if !ok {
|
||||
item = syncItem{}
|
||||
}
|
||||
item.name = name
|
||||
item.objLegacy = obj
|
||||
itemsByName[name] = item
|
||||
}
|
||||
|
||||
for _, obj := range storageList {
|
||||
accessor, err := utils.MetaAccessor(obj)
|
||||
if err != nil {
|
||||
log.Error(err, "error retrieving accessor data for object from storage")
|
||||
continue
|
||||
}
|
||||
name := accessor.GetName()
|
||||
|
||||
item, ok := itemsByName[name]
|
||||
if !ok {
|
||||
item = syncItem{}
|
||||
}
|
||||
item.name = name
|
||||
item.objStorage = obj
|
||||
itemsByName[name] = item
|
||||
}
|
||||
log.Info("got list of items to be synced", "items", len(itemsByName))
|
||||
|
||||
for name, item := range itemsByName {
|
||||
// upsert if:
|
||||
// - existing in both legacy and storage, but objects are different, or
|
||||
// - if it's missing from storage
|
||||
if item.objLegacy != nil &&
|
||||
((item.objStorage != nil && !Compare(item.objLegacy, item.objStorage)) || (item.objStorage == nil)) {
|
||||
outOfSync++
|
||||
|
||||
accessor, err := utils.MetaAccessor(item.objLegacy)
|
||||
if err != nil {
|
||||
log.Error(err, "error retrieving accessor data for object from storage")
|
||||
continue
|
||||
}
|
||||
|
||||
if item.objStorage != nil {
|
||||
accessorStorage, err := utils.MetaAccessor(item.objStorage)
|
||||
if err != nil {
|
||||
log.Error(err, "error retrieving accessor data for object from storage")
|
||||
continue
|
||||
}
|
||||
accessor.SetResourceVersion(accessorStorage.GetResourceVersion())
|
||||
accessor.SetUID(accessorStorage.GetUID())
|
||||
|
||||
log.Info("updating item on unified storage", "name", name)
|
||||
} else {
|
||||
accessor.SetResourceVersion("")
|
||||
accessor.SetUID("")
|
||||
|
||||
log.Info("inserting item on unified storage", "name", name)
|
||||
}
|
||||
|
||||
objInfo := rest.DefaultUpdatedObjectInfo(item.objLegacy, []rest.TransformFunc{}...)
|
||||
res, _, err := storage.Update(ctx,
|
||||
name,
|
||||
objInfo,
|
||||
func(ctx context.Context, obj runtime.Object) error { return nil },
|
||||
func(ctx context.Context, obj, old runtime.Object) error { return nil },
|
||||
true, // force creation
|
||||
&metav1.UpdateOptions{},
|
||||
)
|
||||
if err != nil {
|
||||
log.WithValues("object", res).Error(err, "could not update in storage")
|
||||
syncErr++
|
||||
} else {
|
||||
syncSuccess++
|
||||
}
|
||||
}
|
||||
|
||||
// delete if object does not exists on legacy but exists on storage
|
||||
if item.objLegacy == nil && item.objStorage != nil {
|
||||
outOfSync++
|
||||
|
||||
ctx = request.WithRequestInfo(ctx, &request.RequestInfo{
|
||||
APIGroup: requestInfo.APIGroup,
|
||||
Resource: requestInfo.Resource,
|
||||
Name: name,
|
||||
Namespace: requestInfo.Namespace,
|
||||
})
|
||||
|
||||
log.Info("deleting item from unified storage", "name", name)
|
||||
|
||||
deletedS, _, err := storage.Delete(ctx, name, func(ctx context.Context, obj runtime.Object) error { return nil }, &metav1.DeleteOptions{})
|
||||
if err != nil {
|
||||
if !apierrors.IsNotFound(err) {
|
||||
log.WithValues("objectList", deletedS).Error(err, "could not delete from storage")
|
||||
}
|
||||
syncErr++
|
||||
} else {
|
||||
syncSuccess++
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
everythingSynced = outOfSync == syncSuccess
|
||||
|
||||
metrics.recordDataSyncerOutcome(mode2Str, kind, everythingSynced)
|
||||
metrics.recordDataSyncerDuration(err != nil, mode2Str, kind, startSync)
|
||||
|
||||
log.Info("finished syncing items", "items", len(itemsByName), "updated", syncSuccess, "failed", syncErr, "outcome", everythingSynced)
|
||||
})
|
||||
|
||||
if errSync != nil {
|
||||
err = errSync
|
||||
}
|
||||
|
||||
return everythingSynced, err
|
||||
}
|
||||
|
@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/mock"
|
||||
@ -15,6 +16,7 @@ import (
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apiserver/pkg/apis/example"
|
||||
"k8s.io/apiserver/pkg/endpoints/request"
|
||||
)
|
||||
|
||||
var createFn = func(context.Context, runtime.Object) error { return nil }
|
||||
@ -607,3 +609,197 @@ func TestEnrichReturnedObject(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
var legacyObj1 = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo1", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}}
|
||||
var legacyObj2 = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo2", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}}
|
||||
var legacyObj3 = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo3", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}}
|
||||
var legacyObj4 = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo4", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}}
|
||||
|
||||
var legacyObj2WithHostname = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo2", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{Hostname: "hostname"}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}}
|
||||
|
||||
var storageObj1 = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo1", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}}
|
||||
var storageObj2 = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo2", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}}
|
||||
var storageObj3 = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo3", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}}
|
||||
var storageObj4 = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo4", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}}
|
||||
|
||||
var legacyListWith3items = &example.PodList{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ListMeta: metav1.ListMeta{},
|
||||
Items: []example.Pod{
|
||||
*legacyObj1,
|
||||
*legacyObj2,
|
||||
*legacyObj3,
|
||||
}}
|
||||
|
||||
var legacyListWith4items = &example.PodList{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ListMeta: metav1.ListMeta{},
|
||||
Items: []example.Pod{
|
||||
*legacyObj1,
|
||||
*legacyObj2,
|
||||
*legacyObj3,
|
||||
*legacyObj4,
|
||||
}}
|
||||
|
||||
var legacyListWith3itemsObj2IsDifferent = &example.PodList{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ListMeta: metav1.ListMeta{},
|
||||
Items: []example.Pod{
|
||||
*legacyObj1,
|
||||
*legacyObj2WithHostname,
|
||||
*legacyObj3,
|
||||
}}
|
||||
|
||||
var storageListWith3items = &example.PodList{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ListMeta: metav1.ListMeta{},
|
||||
Items: []example.Pod{
|
||||
*storageObj1,
|
||||
*storageObj2,
|
||||
*storageObj3,
|
||||
}}
|
||||
|
||||
var storageListWith4items = &example.PodList{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ListMeta: metav1.ListMeta{},
|
||||
Items: []example.Pod{
|
||||
*storageObj1,
|
||||
*storageObj2,
|
||||
*storageObj3,
|
||||
*storageObj4,
|
||||
}}
|
||||
|
||||
var storageListWith3itemsMissingFoo2 = &example.PodList{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ListMeta: metav1.ListMeta{},
|
||||
Items: []example.Pod{
|
||||
*storageObj1,
|
||||
*storageObj3,
|
||||
*storageObj4,
|
||||
}}
|
||||
|
||||
func TestMode2_DataSyncer(t *testing.T) {
|
||||
type testCase struct {
|
||||
setupLegacyFn func(m *mock.Mock)
|
||||
setupStorageFn func(m *mock.Mock)
|
||||
name string
|
||||
expectedOutcome bool
|
||||
wantErr bool
|
||||
}
|
||||
tests :=
|
||||
[]testCase{
|
||||
{
|
||||
name: "both stores are in sync",
|
||||
setupLegacyFn: func(m *mock.Mock) {
|
||||
m.On("List", mock.Anything, mock.Anything).Return(legacyListWith3items, nil)
|
||||
},
|
||||
setupStorageFn: func(m *mock.Mock) {
|
||||
m.On("List", mock.Anything, mock.Anything).Return(storageListWith3items, nil)
|
||||
},
|
||||
expectedOutcome: true,
|
||||
},
|
||||
{
|
||||
name: "both stores are in sync - fail to list from legacy",
|
||||
setupLegacyFn: func(m *mock.Mock) {
|
||||
m.On("List", mock.Anything, mock.Anything).Return(legacyListWith3items, errors.New("error"))
|
||||
},
|
||||
setupStorageFn: func(m *mock.Mock) {
|
||||
m.On("List", mock.Anything, mock.Anything).Return(storageListWith3items, nil)
|
||||
},
|
||||
expectedOutcome: false,
|
||||
},
|
||||
{
|
||||
name: "both stores are in sync - fail to list from storage",
|
||||
setupLegacyFn: func(m *mock.Mock) {
|
||||
m.On("List", mock.Anything, mock.Anything).Return(legacyListWith3items, nil)
|
||||
},
|
||||
setupStorageFn: func(m *mock.Mock) {
|
||||
m.On("List", mock.Anything, mock.Anything).Return(storageListWith3items, errors.New("error"))
|
||||
},
|
||||
expectedOutcome: false,
|
||||
},
|
||||
{
|
||||
name: "storage is missing 1 entry (foo4)",
|
||||
setupLegacyFn: func(m *mock.Mock) {
|
||||
m.On("List", mock.Anything, mock.Anything).Return(legacyListWith4items, nil)
|
||||
},
|
||||
setupStorageFn: func(m *mock.Mock) {
|
||||
m.On("List", mock.Anything, mock.Anything).Return(storageListWith3items, nil)
|
||||
m.On("Update", mock.Anything, "foo4", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleObj, false, nil)
|
||||
},
|
||||
expectedOutcome: true,
|
||||
},
|
||||
{
|
||||
name: "storage needs to be update (foo2 is different)",
|
||||
setupLegacyFn: func(m *mock.Mock) {
|
||||
m.On("List", mock.Anything, mock.Anything).Return(legacyListWith3itemsObj2IsDifferent, nil)
|
||||
},
|
||||
setupStorageFn: func(m *mock.Mock) {
|
||||
m.On("List", mock.Anything, mock.Anything).Return(storageListWith3items, nil)
|
||||
m.On("Update", mock.Anything, "foo2", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleObj, false, nil)
|
||||
},
|
||||
expectedOutcome: true,
|
||||
},
|
||||
{
|
||||
name: "storage is missing 1 entry (foo4) - fail to upsert",
|
||||
setupLegacyFn: func(m *mock.Mock) {
|
||||
m.On("List", mock.Anything, mock.Anything).Return(legacyListWith4items, nil)
|
||||
},
|
||||
setupStorageFn: func(m *mock.Mock) {
|
||||
m.On("List", mock.Anything, mock.Anything).Return(storageListWith3items, nil)
|
||||
m.On("Update", mock.Anything, "foo4", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleObj, false, errors.New("error"))
|
||||
},
|
||||
expectedOutcome: false,
|
||||
},
|
||||
{
|
||||
name: "storage has an extra 1 entry (foo4)",
|
||||
setupLegacyFn: func(m *mock.Mock) {
|
||||
m.On("List", mock.Anything, mock.Anything).Return(legacyListWith3items, nil)
|
||||
},
|
||||
setupStorageFn: func(m *mock.Mock) {
|
||||
m.On("List", mock.Anything, mock.Anything).Return(storageListWith4items, nil)
|
||||
m.On("Delete", mock.Anything, "foo4", mock.Anything, mock.Anything).Return(exampleObj, false, nil)
|
||||
},
|
||||
expectedOutcome: true,
|
||||
},
|
||||
{
|
||||
name: "storage has an extra 1 entry (foo4) - fail to delete",
|
||||
setupLegacyFn: func(m *mock.Mock) {
|
||||
m.On("List", mock.Anything, mock.Anything).Return(legacyListWith3items, nil)
|
||||
},
|
||||
setupStorageFn: func(m *mock.Mock) {
|
||||
m.On("List", mock.Anything, mock.Anything).Return(storageListWith4items, nil)
|
||||
m.On("Delete", mock.Anything, "foo4", mock.Anything, mock.Anything).Return(exampleObj, false, errors.New("error"))
|
||||
},
|
||||
expectedOutcome: false,
|
||||
},
|
||||
{
|
||||
name: "storage is missing 1 entry (foo3) and has an extra 1 entry (foo4)",
|
||||
setupLegacyFn: func(m *mock.Mock) {
|
||||
m.On("List", mock.Anything, mock.Anything).Return(legacyListWith3items, nil)
|
||||
},
|
||||
setupStorageFn: func(m *mock.Mock) {
|
||||
m.On("List", mock.Anything, mock.Anything).Return(storageListWith3itemsMissingFoo2, nil)
|
||||
m.On("Update", mock.Anything, "foo2", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleObj, false, nil)
|
||||
m.On("Delete", mock.Anything, "foo4", mock.Anything, mock.Anything).Return(exampleObj, false, nil)
|
||||
},
|
||||
expectedOutcome: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
l := (LegacyStorage)(nil)
|
||||
s := (Storage)(nil)
|
||||
lm := &mock.Mock{}
|
||||
um := &mock.Mock{}
|
||||
|
||||
ls := legacyStoreMock{lm, l}
|
||||
us := storageMock{um, s}
|
||||
|
||||
if tt.setupLegacyFn != nil {
|
||||
tt.setupLegacyFn(lm)
|
||||
}
|
||||
if tt.setupStorageFn != nil {
|
||||
tt.setupStorageFn(um)
|
||||
}
|
||||
|
||||
outcome, err := mode2DataSyncer(context.Background(), ls, us, "test.kind", p, &fakeServerLock{}, &request.RequestInfo{})
|
||||
if tt.wantErr {
|
||||
assert.Error(t, err)
|
||||
return
|
||||
}
|
||||
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, tt.expectedOutcome, outcome)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -6,12 +6,12 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/mock"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apiserver/pkg/apis/example"
|
||||
"k8s.io/apiserver/pkg/endpoints/request"
|
||||
)
|
||||
|
||||
func TestSetDualWritingMode(t *testing.T) {
|
||||
@ -43,13 +43,15 @@ func TestSetDualWritingMode(t *testing.T) {
|
||||
s := (Storage)(nil)
|
||||
m := &mock.Mock{}
|
||||
|
||||
m.On("List", mock.Anything, mock.Anything).Return(exampleList, nil)
|
||||
m.On("List", mock.Anything, mock.Anything).Return(anotherList, nil)
|
||||
|
||||
ls := legacyStoreMock{m, l}
|
||||
us := storageMock{m, s}
|
||||
|
||||
kvStore := &fakeNamespacedKV{data: make(map[string]string), namespace: "storage.dualwriting." + tt.stackID}
|
||||
|
||||
p := prometheus.NewRegistry()
|
||||
dwMode, err := SetDualWritingMode(context.Background(), kvStore, ls, us, "playlist.grafana.app/v0alpha1", tt.desiredMode, p)
|
||||
dwMode, err := SetDualWritingMode(context.Background(), kvStore, ls, us, "playlist.grafana.app/v0alpha1", tt.desiredMode, p, &fakeServerLock{}, &request.RequestInfo{})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, tt.expectedMode, dwMode)
|
||||
|
||||
@ -112,3 +114,12 @@ func (f *fakeNamespacedKV) Set(ctx context.Context, key, value string) error {
|
||||
f.data[f.namespace+key] = value
|
||||
return nil
|
||||
}
|
||||
|
||||
// Never lock in tests
|
||||
type fakeServerLock struct {
|
||||
}
|
||||
|
||||
func (f *fakeServerLock) LockExecuteAndRelease(ctx context.Context, actionName string, duration time.Duration, fn func(ctx context.Context)) error {
|
||||
fn(ctx)
|
||||
return nil
|
||||
}
|
||||
|
@ -9,9 +9,11 @@ import (
|
||||
)
|
||||
|
||||
type dualWriterMetrics struct {
|
||||
legacy *prometheus.HistogramVec
|
||||
storage *prometheus.HistogramVec
|
||||
outcome *prometheus.HistogramVec
|
||||
legacy *prometheus.HistogramVec
|
||||
storage *prometheus.HistogramVec
|
||||
outcome *prometheus.HistogramVec
|
||||
syncer *prometheus.HistogramVec
|
||||
syncerOutcome *prometheus.HistogramVec
|
||||
}
|
||||
|
||||
// DualWriterStorageDuration is a metric summary for dual writer storage duration per mode
|
||||
@ -38,15 +40,41 @@ var DualWriterOutcome = prometheus.NewHistogramVec(prometheus.HistogramOpts{
|
||||
NativeHistogramBucketFactor: 1.1,
|
||||
}, []string{"mode", "name", "method"})
|
||||
|
||||
var DualWriterReadLegacyCounts = prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||
Name: "dual_writer_read_legacy_count",
|
||||
Help: "Histogram for the runtime of dual writer reads from legacy",
|
||||
Namespace: "grafana",
|
||||
}, []string{"kind", "method"})
|
||||
|
||||
// DualWriterSyncerDuration is a metric summary for dual writer sync duration per mode
|
||||
var DualWriterSyncerDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
|
||||
Name: "dual_writer_data_syncer_duration_seconds",
|
||||
Help: "Histogram for the runtime of dual writer data syncer duration per mode",
|
||||
Namespace: "grafana",
|
||||
NativeHistogramBucketFactor: 1.1,
|
||||
}, []string{"is_error", "mode", "kind"})
|
||||
|
||||
// DualWriterDataSyncerOutcome is a metric summary for dual writer data syncer outcome comparison between the 2 stores per mode
|
||||
var DualWriterDataSyncerOutcome = prometheus.NewHistogramVec(prometheus.HistogramOpts{
|
||||
Name: "dual_writer_data_syncer_outcome",
|
||||
Help: "Histogram for the runtime of dual writer data syncer outcome comparison between the 2 stores per mode",
|
||||
Namespace: "grafana",
|
||||
NativeHistogramBucketFactor: 1.1,
|
||||
}, []string{"mode", "kind"})
|
||||
|
||||
func (m *dualWriterMetrics) init(reg prometheus.Registerer) {
|
||||
log := klog.NewKlogr()
|
||||
m.legacy = DualWriterLegacyDuration
|
||||
m.storage = DualWriterStorageDuration
|
||||
m.outcome = DualWriterOutcome
|
||||
m.syncer = DualWriterSyncerDuration
|
||||
m.syncerOutcome = DualWriterDataSyncerOutcome
|
||||
errLegacy := reg.Register(m.legacy)
|
||||
errStorage := reg.Register(m.storage)
|
||||
errOutcome := reg.Register(m.outcome)
|
||||
if errLegacy != nil || errStorage != nil || errOutcome != nil {
|
||||
errSyncer := reg.Register(m.syncer)
|
||||
errSyncerOutcome := reg.Register(m.syncer)
|
||||
if errLegacy != nil || errStorage != nil || errOutcome != nil || errSyncer != nil || errSyncerOutcome != nil {
|
||||
log.Info("cloud migration metrics already registered")
|
||||
}
|
||||
}
|
||||
@ -68,3 +96,16 @@ func (m *dualWriterMetrics) recordOutcome(mode string, name string, areEqual boo
|
||||
}
|
||||
m.outcome.WithLabelValues(mode, name, method).Observe(observeValue)
|
||||
}
|
||||
|
||||
func (m *dualWriterMetrics) recordDataSyncerDuration(isError bool, mode string, kind string, startFrom time.Time) {
|
||||
duration := time.Since(startFrom).Seconds()
|
||||
m.syncer.WithLabelValues(strconv.FormatBool(isError), mode, kind).Observe(duration)
|
||||
}
|
||||
|
||||
func (m *dualWriterMetrics) recordDataSyncerOutcome(mode string, kind string, synced bool) {
|
||||
var observeValue float64
|
||||
if !synced {
|
||||
observeValue = 1
|
||||
}
|
||||
m.syncerOutcome.WithLabelValues(mode, kind).Observe(observeValue)
|
||||
}
|
||||
|
@ -167,7 +167,7 @@ func (o *APIServerOptions) RunAPIServer(ctx context.Context, config *genericapis
|
||||
// Install the API Group+version
|
||||
// #TODO figure out how to configure storage type in o.Options.StorageOptions
|
||||
err = builder.InstallAPIs(grafanaAPIServer.Scheme, grafanaAPIServer.Codecs, server, config.RESTOptionsGetter, o.builders, o.Options.StorageOptions,
|
||||
o.Options.MetricsOptions.MetricsRegisterer, nil, nil, // no need for server lock in standalone
|
||||
o.Options.MetricsOptions.MetricsRegisterer, nil, nil, nil, // no need for server lock in standalone
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -14,6 +14,7 @@ import (
|
||||
"k8s.io/apimachinery/pkg/runtime/serializer"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
openapinamer "k8s.io/apiserver/pkg/endpoints/openapi"
|
||||
k8srequest "k8s.io/apiserver/pkg/endpoints/request"
|
||||
"k8s.io/apiserver/pkg/registry/generic"
|
||||
genericapiserver "k8s.io/apiserver/pkg/server"
|
||||
"k8s.io/apiserver/pkg/util/openapi"
|
||||
@ -26,6 +27,7 @@ import (
|
||||
|
||||
"github.com/grafana/grafana/pkg/apiserver/endpoints/filters"
|
||||
grafanarest "github.com/grafana/grafana/pkg/apiserver/rest"
|
||||
"github.com/grafana/grafana/pkg/services/apiserver/endpoints/request"
|
||||
"github.com/grafana/grafana/pkg/services/apiserver/options"
|
||||
)
|
||||
|
||||
@ -132,6 +134,15 @@ type ServerLockService interface {
|
||||
LockExecuteAndRelease(ctx context.Context, actionName string, maxInterval time.Duration, fn func(ctx context.Context)) error
|
||||
}
|
||||
|
||||
func getRequestInfo(gr schema.GroupResource, namespaceMapper request.NamespaceMapper) *k8srequest.RequestInfo {
|
||||
return &k8srequest.RequestInfo{
|
||||
APIGroup: gr.Group,
|
||||
Resource: gr.Resource,
|
||||
Name: "",
|
||||
Namespace: namespaceMapper(int64(1)),
|
||||
}
|
||||
}
|
||||
|
||||
func InstallAPIs(
|
||||
scheme *runtime.Scheme,
|
||||
codecs serializer.CodecFactory,
|
||||
@ -140,6 +151,7 @@ func InstallAPIs(
|
||||
builders []APIGroupBuilder,
|
||||
storageOpts *options.StorageOptions,
|
||||
reg prometheus.Registerer,
|
||||
namespaceMapper request.NamespaceMapper,
|
||||
kvStore grafanarest.NamespacedKVStore,
|
||||
serverLock ServerLockService,
|
||||
) error {
|
||||
@ -155,9 +167,12 @@ func InstallAPIs(
|
||||
// when missing this will default to mode zero (legacy only)
|
||||
mode := storageOpts.DualWriterDesiredModes[key]
|
||||
|
||||
// TODO: inherited context from main Grafana process
|
||||
ctx := context.Background()
|
||||
|
||||
// Moving from one version to the next can only happen after the previous step has
|
||||
// successfully synchronized.
|
||||
currentMode, err := grafanarest.SetDualWritingMode(context.Background(), kvStore, legacy, storage, key, mode, reg)
|
||||
currentMode, err := grafanarest.SetDualWritingMode(ctx, kvStore, legacy, storage, key, mode, reg, serverLock, getRequestInfo(gr, namespaceMapper))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -168,6 +183,10 @@ func InstallAPIs(
|
||||
return storage, nil
|
||||
default:
|
||||
}
|
||||
|
||||
if storageOpts.DualWriterDataSyncJobEnabled[key] {
|
||||
grafanarest.StartPeriodicDataSyncer(ctx, currentMode, legacy, storage, key, reg, serverLock, getRequestInfo(gr, namespaceMapper))
|
||||
}
|
||||
return grafanarest.NewDualWriter(currentMode, legacy, storage, reg, key), nil
|
||||
}
|
||||
}
|
||||
|
@ -60,6 +60,11 @@ func applyGrafanaConfig(cfg *setting.Cfg, features featuremgmt.FeatureToggles, o
|
||||
playlist.GROUPRESOURCE: 2,
|
||||
}
|
||||
|
||||
o.StorageOptions.DualWriterDataSyncJobEnabled = map[string]bool{
|
||||
// TODO: This will be enabled later, when we get a dedicated config section for unified_storage
|
||||
// playlist.RESOURCE + "." + playlist.GROUP: true,
|
||||
}
|
||||
|
||||
// TODO: ensure backwards compatibility with production
|
||||
// remove this after changing the unified_storage_mode key format in HGAPI
|
||||
o.StorageOptions.DualWriterDesiredModes[playlist.RESOURCE+"."+playlist.GROUP] = o.StorageOptions.DualWriterDesiredModes[playlist.GROUPRESOURCE]
|
||||
|
@ -22,10 +22,11 @@ const (
|
||||
)
|
||||
|
||||
type StorageOptions struct {
|
||||
StorageType StorageType
|
||||
DataPath string
|
||||
Address string
|
||||
DualWriterDesiredModes map[string]grafanarest.DualWriterMode
|
||||
StorageType StorageType
|
||||
DataPath string
|
||||
Address string
|
||||
DualWriterDesiredModes map[string]grafanarest.DualWriterMode
|
||||
DualWriterDataSyncJobEnabled map[string]bool
|
||||
}
|
||||
|
||||
func NewStorageOptions() *StorageOptions {
|
||||
|
@ -145,21 +145,22 @@ func ProvideService(
|
||||
pluginStore pluginstore.Store,
|
||||
) (*service, error) {
|
||||
s := &service{
|
||||
cfg: cfg,
|
||||
features: features,
|
||||
rr: rr,
|
||||
startedCh: make(chan struct{}),
|
||||
stopCh: make(chan struct{}),
|
||||
builders: []builder.APIGroupBuilder{},
|
||||
authorizer: authorizer.NewGrafanaAuthorizer(cfg, orgService),
|
||||
tracing: tracing,
|
||||
db: db, // For Unified storage
|
||||
metrics: metrics.ProvideRegisterer(),
|
||||
kvStore: kvStore,
|
||||
pluginClient: pluginClient,
|
||||
datasources: datasources,
|
||||
contextProvider: contextProvider,
|
||||
pluginStore: pluginStore,
|
||||
cfg: cfg,
|
||||
features: features,
|
||||
rr: rr,
|
||||
startedCh: make(chan struct{}),
|
||||
stopCh: make(chan struct{}),
|
||||
builders: []builder.APIGroupBuilder{},
|
||||
authorizer: authorizer.NewGrafanaAuthorizer(cfg, orgService),
|
||||
tracing: tracing,
|
||||
db: db, // For Unified storage
|
||||
metrics: metrics.ProvideRegisterer(),
|
||||
kvStore: kvStore,
|
||||
pluginClient: pluginClient,
|
||||
datasources: datasources,
|
||||
contextProvider: contextProvider,
|
||||
pluginStore: pluginStore,
|
||||
serverLockService: serverLockService,
|
||||
}
|
||||
|
||||
// This will be used when running as a dskit service
|
||||
@ -341,7 +342,7 @@ func (s *service) start(ctx context.Context) error {
|
||||
// Install the API group+version
|
||||
err = builder.InstallAPIs(Scheme, Codecs, server, serverConfig.RESTOptionsGetter, builders, o.StorageOptions,
|
||||
// Required for the dual writer initialization
|
||||
s.metrics, kvstore.WithNamespace(s.kvStore, 0, "storage.dualwriting"), s.serverLockService,
|
||||
s.metrics, request.GetNamespaceMapper(s.cfg), kvstore.WithNamespace(s.kvStore, 0, "storage.dualwriting"), s.serverLockService,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
|
Loading…
Reference in New Issue
Block a user