refactor(unified-storage): make datasyncer configurable (#97832)

Jean-Philippe Quéméner 2024-12-12 21:41:01 +01:00 committed by GitHub
parent 6dc41f0b18
commit 1509fadd5b
9 changed files with 216 additions and 78 deletions

View File

@@ -12,7 +12,6 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/klog/v2"
)
@@ -149,16 +148,13 @@ type ServerLockService interface {
func SetDualWritingMode(
ctx context.Context,
kvs NamespacedKVStore,
legacy LegacyStorage,
storage Storage,
entity string,
desiredMode DualWriterMode,
reg prometheus.Registerer,
serverLockService ServerLockService,
requestInfo *request.RequestInfo,
cfg *SyncerConfig,
) (DualWriterMode, error) {
if cfg == nil {
return Mode0, errors.New("syncer config is nil")
}
// Mode0 means no DualWriter
if desiredMode == Mode0 {
if cfg.Mode == Mode0 {
return Mode0, nil
}
@@ -174,58 +170,66 @@ func SetDualWritingMode(
errDualWriterSetCurrentMode := errors.New("failed to set current dual writing mode")
// Use entity name as key
m, ok, err := kvs.Get(ctx, entity)
m, ok, err := kvs.Get(ctx, cfg.Kind)
if err != nil {
return Mode0, errors.New("failed to fetch current dual writing mode")
}
currentMode, valid := toMode[m]
currentMode, exists := toMode[m]
if !valid && ok {
// If the mode does not exist in our mapping, we log an error.
if !exists && ok {
// Only log if "ok" because initially all instances will have mode unset for playlists.
klog.Infof("invalid dual writing mode for %s mode: %v", entity, m)
klog.Infof("invalid dual writing mode for %s mode: %v", cfg.Kind, m)
}
if !valid || !ok {
// If the mode does not exist in our mapping, and we also didn't find an entry for this kind, fallback.
if !exists || !ok {
// Default to mode 1
currentMode = Mode1
err := kvs.Set(ctx, entity, fmt.Sprint(currentMode))
if err != nil {
if err := kvs.Set(ctx, cfg.Kind, fmt.Sprint(currentMode)); err != nil {
return Mode0, errDualWriterSetCurrentMode
}
}
// Handle transitions to the desired mode.
switch {
case desiredMode == Mode2 || desiredMode == Mode1:
currentMode = desiredMode
err := kvs.Set(ctx, entity, fmt.Sprint(currentMode))
if err != nil {
case cfg.Mode == Mode2 || cfg.Mode == Mode1:
// Directly set the mode for Mode1 and Mode2.
currentMode = cfg.Mode
if err := kvs.Set(ctx, cfg.Kind, fmt.Sprint(currentMode)); err != nil {
return Mode0, errDualWriterSetCurrentMode
}
case desiredMode >= Mode3 && currentMode < Mode3:
syncOk, err := runDataSyncer(ctx, currentMode, legacy, storage, entity, reg, serverLockService, requestInfo)
case cfg.Mode >= Mode3 && currentMode < Mode3:
// Transitioning to Mode3 or higher requires data synchronization.
cfgModeTmp := cfg.Mode
// Before running the sync, set the syncer config to the current mode, as we have to run the syncer
// once in the current active mode before we can upgrade.
cfg.Mode = currentMode
syncOk, err := runDataSyncer(ctx, cfg)
// Once we are done with running the syncer, we can change the mode back on the config to the desired one.
cfg.Mode = cfgModeTmp
if err != nil {
klog.Info("data syncer failed for mode:", m)
return currentMode, err
return Mode0, err
}
if !syncOk {
klog.Info("data syncer not ok for mode:", m)
return currentMode, nil
}
err = kvs.Set(ctx, entity, fmt.Sprint(desiredMode))
if err != nil {
return currentMode, errDualWriterSetCurrentMode
// If sync is successful, update the mode to the desired one.
if err := kvs.Set(ctx, cfg.Kind, fmt.Sprint(cfg.Mode)); err != nil {
return Mode0, errDualWriterSetCurrentMode
}
return desiredMode, nil
case desiredMode >= Mode3 && currentMode >= Mode3:
currentMode = desiredMode
err := kvs.Set(ctx, entity, fmt.Sprint(currentMode))
if err != nil {
return currentMode, errDualWriterSetCurrentMode
return cfg.Mode, nil
case cfg.Mode >= Mode3 && currentMode >= Mode3:
// If already in Mode3 or higher, simply update to the desired mode.
currentMode = cfg.Mode
if err := kvs.Set(ctx, cfg.Kind, fmt.Sprint(currentMode)); err != nil {
return Mode0, errDualWriterSetCurrentMode
}
default:
// Handle any unexpected cases (should not normally happen).
return Mode0, errDualWriterSetCurrentMode
}
return currentMode, nil

View File

@@ -17,6 +17,7 @@ import (
"k8s.io/klog/v2"
"github.com/grafana/authlib/claims"
"github.com/grafana/grafana/pkg/apimachinery/identity"
"github.com/grafana/grafana/pkg/apimachinery/utils"
)
@@ -29,12 +30,60 @@ type syncItem struct {
accessorLegacy utils.GrafanaMetaAccessor
}
const dataSyncerInterval = 60 * time.Minute
type SyncerConfig struct {
Kind string
RequestInfo *request.RequestInfo
// StartPeriodicDataSyncer starts a background job that will execute the DataSyncer every 60 minutes
func StartPeriodicDataSyncer(ctx context.Context, mode DualWriterMode, legacy LegacyStorage, storage Storage,
kind string, reg prometheus.Registerer, serverLockService ServerLockService, requestInfo *request.RequestInfo) {
log := klog.NewKlogr().WithName("legacyToUnifiedStorageDataSyncer").WithValues("mode", mode, "resource", kind)
Mode DualWriterMode
LegacyStorage LegacyStorage
Storage Storage
ServerLockService ServerLockService
DataSyncerInterval time.Duration
DataSyncerRecordsLimit int
Reg prometheus.Registerer
}
func (s *SyncerConfig) Validate() error {
if s == nil {
return fmt.Errorf("syncer config is nil")
}
if s.Kind == "" {
return fmt.Errorf("kind must be specified")
}
if s.RequestInfo == nil {
return fmt.Errorf("requestInfo must be specified")
}
if s.ServerLockService == nil {
return fmt.Errorf("serverLockService must be specified")
}
if s.Storage == nil {
return fmt.Errorf("storage must be specified")
}
if s.LegacyStorage == nil {
return fmt.Errorf("legacy storage must be specified")
}
if s.DataSyncerInterval == 0 {
s.DataSyncerInterval = time.Hour
}
if s.DataSyncerRecordsLimit == 0 {
s.DataSyncerRecordsLimit = 1000
}
if s.Reg == nil {
s.Reg = prometheus.DefaultRegisterer
}
return nil
}
// StartPeriodicDataSyncer starts a background job that will execute the DataSyncer, syncing the data
// from the hosted grafana backend into the unified storage backend. This is run in the grafana instance.
func StartPeriodicDataSyncer(ctx context.Context, cfg *SyncerConfig) error {
if err := cfg.Validate(); err != nil {
return fmt.Errorf("invalid syncer config: %w", err)
}
log := klog.NewKlogr().WithName("legacyToUnifiedStorageDataSyncer").WithValues("mode", cfg.Mode, "resource", cfg.Kind)
log.Info("Starting periodic data syncer")
@@ -47,62 +96,67 @@ func StartPeriodicDataSyncer(ctx context.Context, mode DualWriterMode, legacy Le
time.Sleep(time.Second * time.Duration(jitterSeconds))
// run it immediately
syncOK, err := runDataSyncer(ctx, mode, legacy, storage, kind, reg, serverLockService, requestInfo)
syncOK, err := runDataSyncer(ctx, cfg)
log.Info("data syncer finished", "syncOK", syncOK, "error", err)
ticker := time.NewTicker(dataSyncerInterval)
ticker := time.NewTicker(cfg.DataSyncerInterval)
for {
select {
case <-ticker.C:
syncOK, err = runDataSyncer(ctx, mode, legacy, storage, kind, reg, serverLockService, requestInfo)
syncOK, err = runDataSyncer(ctx, cfg)
log.Info("data syncer finished", "syncOK", syncOK, ", error", err)
case <-ctx.Done():
return
}
}
}()
return nil
}
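Taken together, the new entry points compose as in the following caller-side sketch (illustration only, mirroring the InstallAPIs change later in this commit; kvStore, legacyStore, unifiedStore, lockSvc and reqInfo are hypothetical placeholders for dependencies the caller already holds):
cfg := &SyncerConfig{
	Kind:              "playlist.grafana.app/playlists",
	RequestInfo:       reqInfo,
	Mode:              Mode2,
	LegacyStorage:     legacyStore,
	Storage:           unifiedStore,
	ServerLockService: lockSvc,
	// DataSyncerInterval, DataSyncerRecordsLimit and Reg can be left unset:
	// Validate() defaults them to one hour, 1000 records and prometheus.DefaultRegisterer.
}
// Resolve the effective mode first; it can be lower than cfg.Mode if a required sync has not completed yet.
currentMode, err := SetDualWritingMode(ctx, kvStore, cfg)
if err != nil {
	return err
}
// Re-apply the resolved mode before starting the periodic background job with the same config.
cfg.Mode = currentMode
if err := StartPeriodicDataSyncer(ctx, cfg); err != nil {
	return err
}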
// runDataSyncer will ensure that data between legacy storage and unified storage are in sync.
// The sync implementation depends on the DualWriter mode
func runDataSyncer(ctx context.Context, mode DualWriterMode, legacy LegacyStorage, storage Storage,
kind string, reg prometheus.Registerer, serverLockService ServerLockService, requestInfo *request.RequestInfo) (bool, error) {
func runDataSyncer(ctx context.Context, cfg *SyncerConfig) (bool, error) {
if err := cfg.Validate(); err != nil {
return false, fmt.Errorf("invalid syncer config: %w", err)
}
// ensure that execution takes no longer than necessary
const timeout = dataSyncerInterval - time.Minute
timeout := cfg.DataSyncerInterval - time.Minute
ctx, cancelFn := context.WithTimeout(ctx, timeout)
defer cancelFn()
// implementation depends on the current DualWriter mode
switch mode {
switch cfg.Mode {
case Mode1, Mode2:
return legacyToUnifiedStorageDataSyncer(ctx, mode, legacy, storage, kind, reg, serverLockService, requestInfo)
return legacyToUnifiedStorageDataSyncer(ctx, cfg)
default:
klog.Info("data syncer not implemented for mode mode:", mode)
klog.Info("data syncer not implemented for mode:", cfg.Mode)
return false, nil
}
}
func legacyToUnifiedStorageDataSyncer(ctx context.Context, mode DualWriterMode, legacy LegacyStorage, storage Storage, resource string, reg prometheus.Registerer, serverLockService ServerLockService, requestInfo *request.RequestInfo) (bool, error) {
func legacyToUnifiedStorageDataSyncer(ctx context.Context, cfg *SyncerConfig) (bool, error) {
if err := cfg.Validate(); err != nil {
return false, fmt.Errorf("invalid syncer config: %w", err)
}
metrics := &dualWriterMetrics{}
metrics.init(reg)
metrics.init(cfg.Reg)
log := klog.NewKlogr().WithName("legacyToUnifiedStorageDataSyncer").WithValues("mode", mode, "resource", resource)
log := klog.NewKlogr().WithName("legacyToUnifiedStorageDataSyncer").WithValues("mode", cfg.Mode, "resource", cfg.Kind)
everythingSynced := false
outOfSync := 0
syncSuccess := 0
syncErr := 0
maxInterval := dataSyncerInterval + 5*time.Minute
maxInterval := cfg.DataSyncerInterval + 5*time.Minute
var errSync error
const maxRecordsSync = 1000
// LockExecuteAndRelease ensures that just a single Grafana server acquires a lock at a time
// The parameter 'maxInterval' is a timeout safeguard, if the LastExecution in the
// database is older than maxInterval, we will assume the lock as timeouted. The 'maxInterval' parameter should be so long
// that is impossible for 2 processes to run at the same time.
err := serverLockService.LockExecuteAndRelease(ctx, fmt.Sprintf("legacyToUnifiedStorageDataSyncer-%d-%s", mode, resource), maxInterval, func(context.Context) {
err := cfg.ServerLockService.LockExecuteAndRelease(ctx, fmt.Sprintf("legacyToUnifiedStorageDataSyncer-%d-%s", cfg.Mode, cfg.Kind), maxInterval, func(context.Context) {
log.Info("starting legacyToUnifiedStorageDataSyncer")
startSync := time.Now()
@@ -110,26 +164,26 @@ func legacyToUnifiedStorageDataSyncer(ctx context.Context, mode DualWriterMode,
ctx = klog.NewContext(ctx, log)
ctx = identity.WithRequester(ctx, getSyncRequester(orgId))
ctx = request.WithNamespace(ctx, requestInfo.Namespace)
ctx = request.WithRequestInfo(ctx, requestInfo)
ctx = request.WithNamespace(ctx, cfg.RequestInfo.Namespace)
ctx = request.WithRequestInfo(ctx, cfg.RequestInfo)
storageList, err := getList(ctx, storage, &metainternalversion.ListOptions{
Limit: maxRecordsSync,
storageList, err := getList(ctx, cfg.Storage, &metainternalversion.ListOptions{
Limit: int64(cfg.DataSyncerRecordsLimit),
})
if err != nil {
log.Error(err, "unable to extract list from storage")
return
}
if len(storageList) >= maxRecordsSync {
errSync = fmt.Errorf("unified storage has more than %d records. Aborting sync", maxRecordsSync)
if len(storageList) >= cfg.DataSyncerRecordsLimit {
errSync = fmt.Errorf("unified storage has more than %d records. Aborting sync", cfg.DataSyncerRecordsLimit)
log.Error(errSync, "Unified storage has more records to be synced than allowed")
return
}
log.Info("got items from unified storage", "items", len(storageList))
legacyList, err := getList(ctx, legacy, &metainternalversion.ListOptions{})
legacyList, err := getList(ctx, cfg.LegacyStorage, &metainternalversion.ListOptions{})
if err != nil {
log.Error(err, "unable to extract list from legacy storage")
return
@@ -189,7 +243,7 @@ func legacyToUnifiedStorageDataSyncer(ctx context.Context, mode DualWriterMode,
}
objInfo := rest.DefaultUpdatedObjectInfo(item.objLegacy, []rest.TransformFunc{}...)
res, _, err := storage.Update(ctx,
res, _, err := cfg.Storage.Update(ctx,
name,
objInfo,
func(ctx context.Context, obj runtime.Object) error { return nil },
@@ -210,15 +264,15 @@ func legacyToUnifiedStorageDataSyncer(ctx context.Context, mode DualWriterMode,
outOfSync++
ctx = request.WithRequestInfo(ctx, &request.RequestInfo{
APIGroup: requestInfo.APIGroup,
Resource: requestInfo.Resource,
APIGroup: cfg.RequestInfo.APIGroup,
Resource: cfg.RequestInfo.Resource,
Name: name,
Namespace: requestInfo.Namespace,
Namespace: cfg.RequestInfo.Namespace,
})
log.Info("deleting item from unified storage", "name", name)
deletedS, _, err := storage.Delete(ctx, name, func(ctx context.Context, obj runtime.Object) error { return nil }, &metav1.DeleteOptions{})
deletedS, _, err := cfg.Storage.Delete(ctx, name, func(ctx context.Context, obj runtime.Object) error { return nil }, &metav1.DeleteOptions{})
if err != nil && !apierrors.IsNotFound(err) {
log.WithValues("objectList", deletedS).Error(err, "could not delete from storage")
syncErr++
@@ -231,8 +285,8 @@ func legacyToUnifiedStorageDataSyncer(ctx context.Context, mode DualWriterMode,
everythingSynced = outOfSync == syncSuccess
metrics.recordDataSyncerOutcome(mode, resource, everythingSynced)
metrics.recordDataSyncerDuration(err != nil, mode, resource, startSync)
metrics.recordDataSyncerOutcome(cfg.Mode, cfg.Kind, everythingSynced)
metrics.recordDataSyncerDuration(err != nil, cfg.Mode, cfg.Kind, startSync)
log.Info("finished syncing items", "items", len(itemsByName), "updated", syncSuccess, "failed", syncErr, "outcome", everythingSynced)
})

View File

@@ -196,7 +196,18 @@ func TestLegacyToUnifiedStorage_DataSyncer(t *testing.T) {
tt.setupStorageFn(um)
}
outcome, err := legacyToUnifiedStorageDataSyncer(context.Background(), Mode1, ls, us, "test.kind", p, &fakeServerLock{}, &request.RequestInfo{})
outcome, err := legacyToUnifiedStorageDataSyncer(context.Background(), &SyncerConfig{
Mode: Mode1,
LegacyStorage: ls,
Storage: us,
Kind: "test.kind",
Reg: p,
ServerLockService: &fakeServerLock{},
RequestInfo: &request.RequestInfo{},
DataSyncerRecordsLimit: 1000,
DataSyncerInterval: time.Hour,
})
if tt.wantErr {
assert.Error(t, err)
return
@@ -225,7 +236,18 @@ func TestLegacyToUnifiedStorage_DataSyncer(t *testing.T) {
tt.setupStorageFn(um)
}
outcome, err := legacyToUnifiedStorageDataSyncer(context.Background(), Mode2, ls, us, "test.kind", p, &fakeServerLock{}, &request.RequestInfo{})
outcome, err := legacyToUnifiedStorageDataSyncer(context.Background(), &SyncerConfig{
Mode: Mode2,
LegacyStorage: ls,
Storage: us,
Kind: "test.kind",
Reg: p,
ServerLockService: &fakeServerLock{},
RequestInfo: &request.RequestInfo{},
DataSyncerRecordsLimit: 1000,
DataSyncerInterval: time.Hour,
})
if tt.wantErr {
assert.Error(t, err)
return

View File

@@ -65,7 +65,18 @@ func TestSetDualWritingMode(t *testing.T) {
ls := legacyStoreMock{m, l}
us := storageMock{m, s}
dwMode, err := SetDualWritingMode(context.Background(), tt.kvStore, ls, us, "playlist.grafana.app/playlists", tt.desiredMode, p, &fakeServerLock{}, &request.RequestInfo{})
dwMode, err := SetDualWritingMode(context.Background(), tt.kvStore, &SyncerConfig{
LegacyStorage: ls,
Storage: us,
Kind: "playlist.grafana.app/playlists",
Mode: tt.desiredMode,
ServerLockService: &fakeServerLock{},
RequestInfo: &request.RequestInfo{},
Reg: p,
DataSyncerRecordsLimit: 1000,
DataSyncerInterval: time.Hour,
})
assert.NoError(t, err)
assert.Equal(t, tt.expectedMode, dwMode)
}

View File

@@ -186,12 +186,18 @@ func InstallAPIs(
// when missing this will default to mode zero (legacy only)
var mode = grafanarest.DualWriterMode(0)
var dualWriterPeriodicDataSyncJobEnabled bool
var (
dualWriterPeriodicDataSyncJobEnabled bool
dataSyncerInterval = time.Hour
dataSyncerRecordsLimit = 1000
)
resourceConfig, resourceExists := storageOpts.UnifiedStorageConfig[key]
if resourceExists {
mode = resourceConfig.DualWriterMode
dualWriterPeriodicDataSyncJobEnabled = resourceConfig.DualWriterPeriodicDataSyncJobEnabled
dataSyncerInterval = resourceConfig.DataSyncerInterval
dataSyncerRecordsLimit = resourceConfig.DataSyncerRecordsLimit
}
// Force using storage only -- regardless of internal synchronization state
@@ -205,7 +211,21 @@ func InstallAPIs(
// Moving from one version to the next can only happen after the previous step has
// successfully synchronized.
requestInfo := getRequestInfo(gr, namespaceMapper)
currentMode, err := grafanarest.SetDualWritingMode(ctx, kvStore, legacy, storage, key, mode, reg, serverLock, requestInfo)
syncerCfg := &grafanarest.SyncerConfig{
Kind: key,
RequestInfo: requestInfo,
Mode: mode,
LegacyStorage: legacy,
Storage: storage,
ServerLockService: serverLock,
DataSyncerInterval: dataSyncerInterval,
DataSyncerRecordsLimit: dataSyncerRecordsLimit,
Reg: reg,
}
// This also sets the currentMode on the syncer config.
currentMode, err := grafanarest.SetDualWritingMode(ctx, kvStore, syncerCfg)
if err != nil {
return nil, err
}
@@ -216,9 +236,12 @@ func InstallAPIs(
return storage, nil
default:
}
if dualWriterPeriodicDataSyncJobEnabled {
grafanarest.StartPeriodicDataSyncer(ctx, currentMode, legacy, storage, key, reg, serverLock, requestInfo)
// The mode might have changed in SetDualWritingMode, so apply current mode first.
syncerCfg.Mode = currentMode
if err := grafanarest.StartPeriodicDataSyncer(ctx, syncerCfg); err != nil {
return nil, err
}
}
// when unable to use

View File

@@ -4,11 +4,12 @@ import (
"fmt"
"net"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/setting"
"github.com/spf13/pflag"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/options"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/setting"
)
type StorageType string
@@ -21,7 +22,8 @@ const (
StorageTypeUnifiedGrpc StorageType = "unified-grpc"
)
type StorageOptions struct { // The desired storage type
type StorageOptions struct {
// The desired storage type
StorageType StorageType
// For unified-grpc, the address is required

View File

@@ -538,6 +538,10 @@ type Cfg struct {
type UnifiedStorageConfig struct {
DualWriterMode rest.DualWriterMode
DualWriterPeriodicDataSyncJobEnabled bool
// DataSyncerInterval defines how often the data syncer should run for a resource on the grafana instance.
DataSyncerInterval time.Duration
// DataSyncerRecordsLimit defines how many records will be processed at max during a sync invocation.
DataSyncerRecordsLimit int
}
type InstallPlugin struct {

View File

@@ -2,6 +2,7 @@ package setting
import (
"strings"
"time"
"github.com/grafana/grafana/pkg/apiserver/rest"
)
@@ -29,9 +30,17 @@ func (cfg *Cfg) setUnifiedStorageConfig() {
// parse dualWriter periodic data syncer config
dualWriterPeriodicDataSyncJobEnabled := section.Key("dualWriterPeriodicDataSyncJobEnabled").MustBool(false)
// parse dataSyncerRecordsLimit from resource section
dataSyncerRecordsLimit := section.Key("dataSyncerRecordsLimit").MustInt(1000)
// parse dataSyncerInterval from resource section
dataSyncerInterval := section.Key("dataSyncerInterval").MustDuration(time.Hour)
storageConfig[resourceName] = UnifiedStorageConfig{
DualWriterMode: rest.DualWriterMode(dualWriterMode),
DualWriterPeriodicDataSyncJobEnabled: dualWriterPeriodicDataSyncJobEnabled,
DataSyncerRecordsLimit: dataSyncerRecordsLimit,
DataSyncerInterval: dataSyncerInterval,
}
}
cfg.UnifiedStorage = storageConfig
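For operators, the new keys map to per-resource ini settings; a hypothetical sketch follows (the [unified_storage.<resource>] section prefix and the dualWriterMode key name are assumed from code outside this hunk; the remaining keys and values mirror the test below):
; section name and dualWriterMode key are assumptions, not shown in this diff
[unified_storage.playlists.playlist.grafana.app]
dualWriterMode = 2
dualWriterPeriodicDataSyncJobEnabled = true
dataSyncerRecordsLimit = 1001
dataSyncerInterval = 10m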

View File

@@ -2,6 +2,7 @@ package setting
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
)
@@ -21,6 +22,12 @@ func TestCfg_setUnifiedStorageConfig(t *testing.T) {
_, err = s.NewKey("dualWriterPeriodicDataSyncJobEnabled", "true")
assert.NoError(t, err)
_, err = s.NewKey("dataSyncerRecordsLimit", "1001")
assert.NoError(t, err)
_, err = s.NewKey("dataSyncerInterval", "10m")
assert.NoError(t, err)
cfg.setUnifiedStorageConfig()
value, exists := cfg.UnifiedStorage["playlists.playlist.grafana.app"]
@@ -29,6 +36,8 @@ func TestCfg_setUnifiedStorageConfig(t *testing.T) {
assert.Equal(t, value, UnifiedStorageConfig{
DualWriterMode: 2,
DualWriterPeriodicDataSyncJobEnabled: true,
DataSyncerRecordsLimit: 1001,
DataSyncerInterval: time.Minute * 10,
})
})
}