Pass resource name into the dual writer initialization (#92654)

This commit is contained in:
Leonor Oliveira 2024-08-29 12:30:48 +01:00 committed by GitHub
parent 82417c916f
commit 66e0121dd5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 117 additions and 95 deletions

View File

@ -110,7 +110,7 @@ func NewDualWriter(
legacy LegacyStorage, legacy LegacyStorage,
storage Storage, storage Storage,
reg prometheus.Registerer, reg prometheus.Registerer,
kind string, resource string,
) DualWriter { ) DualWriter {
metrics := &dualWriterMetrics{} metrics := &dualWriterMetrics{}
metrics.init(reg) metrics.init(reg)
@ -119,18 +119,18 @@ func NewDualWriter(
// writing to legacy storage without `unifiedStorage` enabled. // writing to legacy storage without `unifiedStorage` enabled.
case Mode1: case Mode1:
// read and write only from legacy storage // read and write only from legacy storage
return newDualWriterMode1(legacy, storage, metrics, kind) return newDualWriterMode1(legacy, storage, metrics, resource)
case Mode2: case Mode2:
// write to both, read from storage but use legacy as backup // write to both, read from storage but use legacy as backup
return newDualWriterMode2(legacy, storage, metrics, kind) return newDualWriterMode2(legacy, storage, metrics, resource)
case Mode3: case Mode3:
// write to both, read from storage only // write to both, read from storage only
return newDualWriterMode3(legacy, storage, metrics, kind) return newDualWriterMode3(legacy, storage, metrics, resource)
case Mode4: case Mode4:
// read and write only from storage // read and write only from storage
return newDualWriterMode4(legacy, storage, metrics, kind) return newDualWriterMode4(legacy, storage, metrics, resource)
default: default:
return newDualWriterMode1(legacy, storage, metrics, kind) return newDualWriterMode1(legacy, storage, metrics, resource)
} }
} }

View File

@ -18,7 +18,7 @@ type DualWriterMode1 struct {
Legacy LegacyStorage Legacy LegacyStorage
Storage Storage Storage Storage
*dualWriterMetrics *dualWriterMetrics
kind string resource string
Log klog.Logger Log klog.Logger
} }
@ -26,8 +26,14 @@ const mode1Str = "1"
// NewDualWriterMode1 returns a new DualWriter in mode 1. // NewDualWriterMode1 returns a new DualWriter in mode 1.
// Mode 1 represents writing to and reading from LegacyStorage. // Mode 1 represents writing to and reading from LegacyStorage.
func newDualWriterMode1(legacy LegacyStorage, storage Storage, dwm *dualWriterMetrics, kind string) *DualWriterMode1 { func newDualWriterMode1(legacy LegacyStorage, storage Storage, dwm *dualWriterMetrics, resource string) *DualWriterMode1 {
return &DualWriterMode1{Legacy: legacy, Storage: storage, Log: klog.NewKlogr().WithName("DualWriterMode1").WithValues("mode", mode1Str, "kind", kind), dualWriterMetrics: dwm} return &DualWriterMode1{
Legacy: legacy,
Storage: storage,
Log: klog.NewKlogr().WithName("DualWriterMode1").WithValues("mode", mode1Str, "resource", resource),
dualWriterMetrics: dwm,
resource: resource,
}
} }
// Mode returns the mode of the dual writer. // Mode returns the mode of the dual writer.
@ -45,10 +51,10 @@ func (d *DualWriterMode1) Create(ctx context.Context, original runtime.Object, c
created, err := d.Legacy.Create(ctx, original, createValidation, options) created, err := d.Legacy.Create(ctx, original, createValidation, options)
if err != nil { if err != nil {
log.Error(err, "unable to create object in legacy storage") log.Error(err, "unable to create object in legacy storage")
d.recordLegacyDuration(true, mode1Str, d.kind, method, startLegacy) d.recordLegacyDuration(true, mode1Str, d.resource, method, startLegacy)
return created, err return created, err
} }
d.recordLegacyDuration(false, mode1Str, d.kind, method, startLegacy) d.recordLegacyDuration(false, mode1Str, d.resource, method, startLegacy)
createdCopy := created.DeepCopyObject() createdCopy := created.DeepCopyObject()
@ -62,7 +68,7 @@ func (d *DualWriterMode1) Create(ctx context.Context, original runtime.Object, c
startStorage := time.Now() startStorage := time.Now()
storageObj, errObjectSt := d.Storage.Create(ctx, createdCopy, createValidation, options) storageObj, errObjectSt := d.Storage.Create(ctx, createdCopy, createValidation, options)
d.recordStorageDuration(errObjectSt != nil, mode1Str, d.kind, method, startStorage) d.recordStorageDuration(errObjectSt != nil, mode1Str, d.resource, method, startStorage)
if err != nil { if err != nil {
cancel() cancel()
} }
@ -87,14 +93,14 @@ func (d *DualWriterMode1) Get(ctx context.Context, name string, options *metav1.
if errLegacy != nil { if errLegacy != nil {
log.Error(errLegacy, "unable to get object in legacy storage") log.Error(errLegacy, "unable to get object in legacy storage")
} }
d.recordLegacyDuration(errLegacy != nil, mode1Str, d.kind, method, startLegacy) d.recordLegacyDuration(errLegacy != nil, mode1Str, d.resource, method, startLegacy)
go func(res runtime.Object) { go func(res runtime.Object) {
startStorage := time.Now() startStorage := time.Now()
ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("storage get timeout")) ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("storage get timeout"))
defer cancel() defer cancel()
storageObj, err := d.Storage.Get(ctx, name, options) storageObj, err := d.Storage.Get(ctx, name, options)
d.recordStorageDuration(err != nil, mode1Str, d.kind, method, startStorage) d.recordStorageDuration(err != nil, mode1Str, d.resource, method, startStorage)
if err != nil { if err != nil {
log.Error(err, "unable to get object in storage") log.Error(err, "unable to get object in storage")
cancel() cancel()
@ -121,14 +127,14 @@ func (d *DualWriterMode1) List(ctx context.Context, options *metainternalversion
if errLegacy != nil { if errLegacy != nil {
log.Error(errLegacy, "unable to list object in legacy storage") log.Error(errLegacy, "unable to list object in legacy storage")
} }
d.recordLegacyDuration(errLegacy != nil, mode1Str, d.kind, method, startLegacy) d.recordLegacyDuration(errLegacy != nil, mode1Str, d.resource, method, startLegacy)
go func(res runtime.Object) { go func(res runtime.Object) {
startStorage := time.Now() startStorage := time.Now()
ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("storage list timeout")) ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("storage list timeout"))
defer cancel() defer cancel()
storageObj, err := d.Storage.List(ctx, options) storageObj, err := d.Storage.List(ctx, options)
d.recordStorageDuration(err != nil, mode1Str, d.kind, method, startStorage) d.recordStorageDuration(err != nil, mode1Str, d.resource, method, startStorage)
if err != nil { if err != nil {
cancel() cancel()
} }
@ -151,7 +157,7 @@ func (d *DualWriterMode1) Delete(ctx context.Context, name string, deleteValidat
res, async, err := d.Legacy.Delete(ctx, name, deleteValidation, options) res, async, err := d.Legacy.Delete(ctx, name, deleteValidation, options)
if err != nil { if err != nil {
log.Error(err, "unable to delete object in legacy storage") log.Error(err, "unable to delete object in legacy storage")
d.recordLegacyDuration(true, mode1Str, d.kind, method, startLegacy) d.recordLegacyDuration(true, mode1Str, d.resource, method, startLegacy)
return res, async, err return res, async, err
} }
d.recordLegacyDuration(false, mode1Str, name, method, startLegacy) d.recordLegacyDuration(false, mode1Str, name, method, startLegacy)
@ -161,7 +167,7 @@ func (d *DualWriterMode1) Delete(ctx context.Context, name string, deleteValidat
ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("storage delete timeout")) ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("storage delete timeout"))
defer cancel() defer cancel()
storageObj, _, err := d.Storage.Delete(ctx, name, deleteValidation, options) storageObj, _, err := d.Storage.Delete(ctx, name, deleteValidation, options)
d.recordStorageDuration(err != nil, mode1Str, d.kind, method, startStorage) d.recordStorageDuration(err != nil, mode1Str, d.resource, method, startStorage)
if err != nil { if err != nil {
cancel() cancel()
} }
@ -185,17 +191,17 @@ func (d *DualWriterMode1) DeleteCollection(ctx context.Context, deleteValidation
res, err := d.Legacy.DeleteCollection(ctx, deleteValidation, options, listOptions) res, err := d.Legacy.DeleteCollection(ctx, deleteValidation, options, listOptions)
if err != nil { if err != nil {
log.Error(err, "unable to delete collection in legacy storage") log.Error(err, "unable to delete collection in legacy storage")
d.recordLegacyDuration(true, mode1Str, d.kind, method, startLegacy) d.recordLegacyDuration(true, mode1Str, d.resource, method, startLegacy)
return res, err return res, err
} }
d.recordLegacyDuration(false, mode1Str, d.kind, method, startLegacy) d.recordLegacyDuration(false, mode1Str, d.resource, method, startLegacy)
go func(res runtime.Object) { go func(res runtime.Object) {
startStorage := time.Now() startStorage := time.Now()
ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("storage deletecollection timeout")) ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("storage deletecollection timeout"))
defer cancel() defer cancel()
storageObj, err := d.Storage.DeleteCollection(ctx, deleteValidation, options, listOptions) storageObj, err := d.Storage.DeleteCollection(ctx, deleteValidation, options, listOptions)
d.recordStorageDuration(err != nil, mode1Str, d.kind, method, startStorage) d.recordStorageDuration(err != nil, mode1Str, d.resource, method, startStorage)
if err != nil { if err != nil {
cancel() cancel()
} }
@ -218,10 +224,10 @@ func (d *DualWriterMode1) Update(ctx context.Context, name string, objInfo rest.
res, async, err := d.Legacy.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options) res, async, err := d.Legacy.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
if err != nil { if err != nil {
log.Error(err, "unable to update in legacy storage") log.Error(err, "unable to update in legacy storage")
d.recordLegacyDuration(true, mode1Str, d.kind, method, startLegacy) d.recordLegacyDuration(true, mode1Str, d.resource, method, startLegacy)
return res, async, err return res, async, err
} }
d.recordLegacyDuration(false, mode1Str, d.kind, method, startLegacy) d.recordLegacyDuration(false, mode1Str, d.resource, method, startLegacy)
go func(res runtime.Object) { go func(res runtime.Object) {
ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("storage update timeout")) ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("storage update timeout"))
@ -257,7 +263,7 @@ func (d *DualWriterMode1) Update(ctx context.Context, name string, objInfo rest.
startStorage := time.Now() startStorage := time.Now()
defer cancel() defer cancel()
storageObj, _, errObjectSt := d.Storage.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options) storageObj, _, errObjectSt := d.Storage.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
d.recordStorageDuration(errObjectSt != nil, mode1Str, d.kind, method, startStorage) d.recordStorageDuration(errObjectSt != nil, mode1Str, d.resource, method, startStorage)
if err != nil { if err != nil {
cancel() cancel()
} }

View File

@ -26,7 +26,7 @@ type DualWriterMode2 struct {
Storage Storage Storage Storage
Legacy LegacyStorage Legacy LegacyStorage
*dualWriterMetrics *dualWriterMetrics
kind string resource string
Log klog.Logger Log klog.Logger
} }
@ -34,9 +34,13 @@ const mode2Str = "2"
// NewDualWriterMode2 returns a new DualWriter in mode 2. // NewDualWriterMode2 returns a new DualWriter in mode 2.
// Mode 2 represents writing to LegacyStorage and Storage and reading from LegacyStorage. // Mode 2 represents writing to LegacyStorage and Storage and reading from LegacyStorage.
func newDualWriterMode2(legacy LegacyStorage, storage Storage, dwm *dualWriterMetrics, kind string) *DualWriterMode2 { func newDualWriterMode2(legacy LegacyStorage, storage Storage, dwm *dualWriterMetrics, resource string) *DualWriterMode2 {
return &DualWriterMode2{ return &DualWriterMode2{
Legacy: legacy, Storage: storage, Log: klog.NewKlogr().WithName("DualWriterMode2").WithValues("mode", mode2Str, "kind", kind), dualWriterMetrics: dwm, Legacy: legacy,
Storage: storage,
Log: klog.NewKlogr().WithName("DualWriterMode2").WithValues("mode", mode2Str, "resource", resource),
dualWriterMetrics: dwm,
resource: resource,
} }
} }
@ -55,10 +59,10 @@ func (d *DualWriterMode2) Create(ctx context.Context, original runtime.Object, c
created, err := d.Legacy.Create(ctx, original, createValidation, options) created, err := d.Legacy.Create(ctx, original, createValidation, options)
if err != nil { if err != nil {
log.Error(err, "unable to create object in legacy storage") log.Error(err, "unable to create object in legacy storage")
d.recordLegacyDuration(true, mode2Str, d.kind, method, startLegacy) d.recordLegacyDuration(true, mode2Str, d.resource, method, startLegacy)
return created, err return created, err
} }
d.recordLegacyDuration(false, mode2Str, d.kind, method, startLegacy) d.recordLegacyDuration(false, mode2Str, d.resource, method, startLegacy)
if err := enrichLegacyObject(original, created); err != nil { if err := enrichLegacyObject(original, created); err != nil {
return created, err return created, err
@ -68,10 +72,10 @@ func (d *DualWriterMode2) Create(ctx context.Context, original runtime.Object, c
rsp, err := d.Storage.Create(ctx, created, createValidation, options) rsp, err := d.Storage.Create(ctx, created, createValidation, options)
if err != nil { if err != nil {
log.WithValues("name").Error(err, "unable to create object in storage") log.WithValues("name").Error(err, "unable to create object in storage")
d.recordStorageDuration(true, mode2Str, d.kind, method, startStorage) d.recordStorageDuration(true, mode2Str, d.resource, method, startStorage)
return rsp, err return rsp, err
} }
d.recordStorageDuration(false, mode2Str, d.kind, method, startStorage) d.recordStorageDuration(false, mode2Str, d.resource, method, startStorage)
areEqual := Compare(rsp, created) areEqual := Compare(rsp, created)
d.recordOutcome(mode2Str, getName(rsp), areEqual, method) d.recordOutcome(mode2Str, getName(rsp), areEqual, method)
@ -89,7 +93,7 @@ func (d *DualWriterMode2) Get(ctx context.Context, name string, options *metav1.
startStorage := time.Now() startStorage := time.Now()
objStorage, err := d.Storage.Get(ctx, name, options) objStorage, err := d.Storage.Get(ctx, name, options)
d.recordStorageDuration(err != nil, mode2Str, d.kind, method, startStorage) d.recordStorageDuration(err != nil, mode2Str, d.resource, method, startStorage)
if err != nil { if err != nil {
// if it errors because it's not found, we try to fetch it from the legacy storage // if it errors because it's not found, we try to fetch it from the legacy storage
if !apierrors.IsNotFound(err) { if !apierrors.IsNotFound(err) {
@ -103,10 +107,10 @@ func (d *DualWriterMode2) Get(ctx context.Context, name string, options *metav1.
objLegacy, err := d.Legacy.Get(ctx, name, options) objLegacy, err := d.Legacy.Get(ctx, name, options)
if err != nil { if err != nil {
log.Error(err, "unable to fetch object from legacy") log.Error(err, "unable to fetch object from legacy")
d.recordLegacyDuration(true, mode2Str, d.kind, method, startLegacy) d.recordLegacyDuration(true, mode2Str, d.resource, method, startLegacy)
return objLegacy, err return objLegacy, err
} }
d.recordLegacyDuration(false, mode2Str, d.kind, method, startLegacy) d.recordLegacyDuration(false, mode2Str, d.resource, method, startLegacy)
areEqual := Compare(objStorage, objLegacy) areEqual := Compare(objStorage, objLegacy)
d.recordOutcome(mode2Str, name, areEqual, method) d.recordOutcome(mode2Str, name, areEqual, method)
@ -132,10 +136,10 @@ func (d *DualWriterMode2) List(ctx context.Context, options *metainternalversion
ll, err := d.Legacy.List(ctx, options) ll, err := d.Legacy.List(ctx, options)
if err != nil { if err != nil {
log.Error(err, "unable to list objects from legacy storage") log.Error(err, "unable to list objects from legacy storage")
d.recordLegacyDuration(true, mode2Str, d.kind, method, startLegacy) d.recordLegacyDuration(true, mode2Str, d.resource, method, startLegacy)
return ll, err return ll, err
} }
d.recordLegacyDuration(false, mode2Str, d.kind, method, startLegacy) d.recordLegacyDuration(false, mode2Str, d.resource, method, startLegacy)
legacyList, err := meta.ExtractList(ll) legacyList, err := meta.ExtractList(ll)
if err != nil { if err != nil {
@ -154,10 +158,10 @@ func (d *DualWriterMode2) List(ctx context.Context, options *metainternalversion
sl, err := d.Storage.List(ctx, options) sl, err := d.Storage.List(ctx, options)
if err != nil { if err != nil {
log.Error(err, "unable to list objects from storage") log.Error(err, "unable to list objects from storage")
d.recordStorageDuration(true, mode2Str, d.kind, method, startStorage) d.recordStorageDuration(true, mode2Str, d.resource, method, startStorage)
return sl, err return sl, err
} }
d.recordStorageDuration(false, mode2Str, d.kind, method, startStorage) d.recordStorageDuration(false, mode2Str, d.resource, method, startStorage)
storageList, err := meta.ExtractList(sl) storageList, err := meta.ExtractList(sl)
if err != nil { if err != nil {
@ -195,10 +199,10 @@ func (d *DualWriterMode2) DeleteCollection(ctx context.Context, deleteValidation
deleted, err := d.Legacy.DeleteCollection(ctx, deleteValidation, options, listOptions) deleted, err := d.Legacy.DeleteCollection(ctx, deleteValidation, options, listOptions)
if err != nil { if err != nil {
log.WithValues("deleted", deleted).Error(err, "failed to delete collection successfully from legacy storage") log.WithValues("deleted", deleted).Error(err, "failed to delete collection successfully from legacy storage")
d.recordLegacyDuration(true, mode2Str, d.kind, method, startLegacy) d.recordLegacyDuration(true, mode2Str, d.resource, method, startLegacy)
return deleted, err return deleted, err
} }
d.recordLegacyDuration(false, mode2Str, d.kind, method, startLegacy) d.recordLegacyDuration(false, mode2Str, d.resource, method, startLegacy)
legacyList, err := meta.ExtractList(deleted) legacyList, err := meta.ExtractList(deleted)
if err != nil { if err != nil {
@ -216,10 +220,10 @@ func (d *DualWriterMode2) DeleteCollection(ctx context.Context, deleteValidation
res, err := d.Storage.DeleteCollection(ctx, deleteValidation, options, listOptions) res, err := d.Storage.DeleteCollection(ctx, deleteValidation, options, listOptions)
if err != nil { if err != nil {
log.WithValues("deleted", res).Error(err, "failed to delete collection successfully from Storage") log.WithValues("deleted", res).Error(err, "failed to delete collection successfully from Storage")
d.recordStorageDuration(true, mode2Str, d.kind, method, startStorage) d.recordStorageDuration(true, mode2Str, d.resource, method, startStorage)
return res, err return res, err
} }
d.recordStorageDuration(false, mode2Str, d.kind, method, startStorage) d.recordStorageDuration(false, mode2Str, d.resource, method, startStorage)
areEqual := Compare(res, deleted) areEqual := Compare(res, deleted)
d.recordOutcome(mode2Str, getName(res), areEqual, method) d.recordOutcome(mode2Str, getName(res), areEqual, method)
@ -241,22 +245,22 @@ func (d *DualWriterMode2) Delete(ctx context.Context, name string, deleteValidat
if err != nil { if err != nil {
if !apierrors.IsNotFound(err) { if !apierrors.IsNotFound(err) {
log.WithValues("objectList", deletedLS).Error(err, "could not delete from legacy store") log.WithValues("objectList", deletedLS).Error(err, "could not delete from legacy store")
d.recordLegacyDuration(true, mode2Str, d.kind, method, startLegacy) d.recordLegacyDuration(true, mode2Str, d.resource, method, startLegacy)
return deletedLS, async, err return deletedLS, async, err
} }
} }
d.recordLegacyDuration(false, mode2Str, d.kind, method, startLegacy) d.recordLegacyDuration(false, mode2Str, d.resource, method, startLegacy)
startStorage := time.Now() startStorage := time.Now()
deletedS, _, err := d.Storage.Delete(ctx, name, deleteValidation, options) deletedS, _, err := d.Storage.Delete(ctx, name, deleteValidation, options)
if err != nil { if err != nil {
if !apierrors.IsNotFound(err) { if !apierrors.IsNotFound(err) {
log.WithValues("objectList", deletedS).Error(err, "could not delete from duplicate storage") log.WithValues("objectList", deletedS).Error(err, "could not delete from duplicate storage")
d.recordStorageDuration(true, mode2Str, d.kind, method, startStorage) d.recordStorageDuration(true, mode2Str, d.resource, method, startStorage)
} }
return deletedS, async, err return deletedS, async, err
} }
d.recordStorageDuration(false, mode2Str, d.kind, method, startStorage) d.recordStorageDuration(false, mode2Str, d.resource, method, startStorage)
areEqual := Compare(deletedS, deletedLS) areEqual := Compare(deletedS, deletedLS)
d.recordOutcome(mode2Str, name, areEqual, method) d.recordOutcome(mode2Str, name, areEqual, method)
@ -294,10 +298,10 @@ func (d *DualWriterMode2) Update(ctx context.Context, name string, objInfo rest.
obj, created, err := d.Legacy.Update(ctx, name, &updateWrapper{upstream: objInfo, updated: updated}, createValidation, updateValidation, forceAllowCreate, options) obj, created, err := d.Legacy.Update(ctx, name, &updateWrapper{upstream: objInfo, updated: updated}, createValidation, updateValidation, forceAllowCreate, options)
if err != nil { if err != nil {
log.WithValues("object", obj).Error(err, "could not update in legacy storage") log.WithValues("object", obj).Error(err, "could not update in legacy storage")
d.recordLegacyDuration(true, mode2Str, d.kind, "update", startLegacy) d.recordLegacyDuration(true, mode2Str, d.resource, "update", startLegacy)
return obj, created, err return obj, created, err
} }
d.recordLegacyDuration(false, mode2Str, d.kind, "update", startLegacy) d.recordLegacyDuration(false, mode2Str, d.resource, "update", startLegacy)
// if the object is found, create a new updateWrapper with the object found // if the object is found, create a new updateWrapper with the object found
if foundObj != nil { if foundObj != nil {
@ -316,7 +320,7 @@ func (d *DualWriterMode2) Update(ctx context.Context, name string, objInfo rest.
res, created, err := d.Storage.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options) res, created, err := d.Storage.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
if err != nil { if err != nil {
log.WithValues("object", res).Error(err, "could not update in storage") log.WithValues("object", res).Error(err, "could not update in storage")
d.recordStorageDuration(true, mode2Str, d.kind, "update", startStorage) d.recordStorageDuration(true, mode2Str, d.resource, "update", startStorage)
return res, created, err return res, created, err
} }
@ -430,7 +434,7 @@ func getList(ctx context.Context, obj rest.Lister, listOptions *metainternalvers
return meta.ExtractList(ll) return meta.ExtractList(ll)
} }
func mode2DataSyncer(ctx context.Context, legacy LegacyStorage, storage Storage, kind string, reg prometheus.Registerer, serverLockService ServerLockService, requestInfo *request.RequestInfo) (bool, error) { func mode2DataSyncer(ctx context.Context, legacy LegacyStorage, storage Storage, resource string, reg prometheus.Registerer, serverLockService ServerLockService, requestInfo *request.RequestInfo) (bool, error) {
metrics := &dualWriterMetrics{} metrics := &dualWriterMetrics{}
metrics.init(reg) metrics.init(reg)
@ -595,8 +599,8 @@ func mode2DataSyncer(ctx context.Context, legacy LegacyStorage, storage Storage,
everythingSynced = outOfSync == syncSuccess everythingSynced = outOfSync == syncSuccess
metrics.recordDataSyncerOutcome(mode2Str, kind, everythingSynced) metrics.recordDataSyncerOutcome(mode2Str, resource, everythingSynced)
metrics.recordDataSyncerDuration(err != nil, mode2Str, kind, startSync) metrics.recordDataSyncerDuration(err != nil, mode2Str, resource, startSync)
log.Info("finished syncing items", "items", len(itemsByName), "updated", syncSuccess, "failed", syncErr, "outcome", everythingSynced) log.Info("finished syncing items", "items", len(itemsByName), "updated", syncSuccess, "failed", syncErr, "outcome", everythingSynced)
}) })

View File

@ -18,14 +18,20 @@ type DualWriterMode3 struct {
Storage Storage Storage Storage
watchImp rest.Watcher // watch is only available in mode 3 and 4 watchImp rest.Watcher // watch is only available in mode 3 and 4
*dualWriterMetrics *dualWriterMetrics
kind string resource string
Log klog.Logger Log klog.Logger
} }
// newDualWriterMode3 returns a new DualWriter in mode 3. // newDualWriterMode3 returns a new DualWriter in mode 3.
// Mode 3 represents writing to LegacyStorage and Storage and reading from Storage. // Mode 3 represents writing to LegacyStorage and Storage and reading from Storage.
func newDualWriterMode3(legacy LegacyStorage, storage Storage, dwm *dualWriterMetrics, kind string) *DualWriterMode3 { func newDualWriterMode3(legacy LegacyStorage, storage Storage, dwm *dualWriterMetrics, resource string) *DualWriterMode3 {
return &DualWriterMode3{Legacy: legacy, Storage: storage, Log: klog.NewKlogr().WithName("DualWriterMode3").WithValues("mode", mode3Str, "kind", kind), dualWriterMetrics: dwm} return &DualWriterMode3{
Legacy: legacy,
Storage: storage,
Log: klog.NewKlogr().WithName("DualWriterMode3").WithValues("mode", mode3Str, "resource", resource),
dualWriterMetrics: dwm,
resource: resource,
}
} }
// Mode returns the mode of the dual writer. // Mode returns the mode of the dual writer.
@ -45,10 +51,10 @@ func (d *DualWriterMode3) Create(ctx context.Context, obj runtime.Object, create
created, err := d.Storage.Create(ctx, obj, createValidation, options) created, err := d.Storage.Create(ctx, obj, createValidation, options)
if err != nil { if err != nil {
log.Error(err, "unable to create object in storage") log.Error(err, "unable to create object in storage")
d.recordLegacyDuration(true, mode3Str, d.kind, method, startStorage) d.recordLegacyDuration(true, mode3Str, d.resource, method, startStorage)
return created, err return created, err
} }
d.recordStorageDuration(false, mode3Str, d.kind, method, startStorage) d.recordStorageDuration(false, mode3Str, d.resource, method, startStorage)
go func() { go func() {
ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("legacy create timeout")) ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("legacy create timeout"))
@ -56,7 +62,7 @@ func (d *DualWriterMode3) Create(ctx context.Context, obj runtime.Object, create
startLegacy := time.Now() startLegacy := time.Now()
_, errObjectSt := d.Legacy.Create(ctx, obj, createValidation, options) _, errObjectSt := d.Legacy.Create(ctx, obj, createValidation, options)
d.recordLegacyDuration(errObjectSt != nil, mode3Str, d.kind, method, startLegacy) d.recordLegacyDuration(errObjectSt != nil, mode3Str, d.resource, method, startLegacy)
}() }()
return created, err return created, err
@ -73,7 +79,7 @@ func (d *DualWriterMode3) Get(ctx context.Context, name string, options *metav1.
if err != nil { if err != nil {
log.Error(err, "unable to get object in storage") log.Error(err, "unable to get object in storage")
} }
d.recordStorageDuration(err != nil, mode3Str, d.kind, method, startStorage) d.recordStorageDuration(err != nil, mode3Str, d.resource, method, startStorage)
return res, err return res, err
} }
@ -89,7 +95,7 @@ func (d *DualWriterMode3) List(ctx context.Context, options *metainternalversion
if err != nil { if err != nil {
log.Error(err, "unable to list object in storage") log.Error(err, "unable to list object in storage")
} }
d.recordStorageDuration(err != nil, mode3Str, d.kind, method, startStorage) d.recordStorageDuration(err != nil, mode3Str, d.resource, method, startStorage)
return res, err return res, err
} }
@ -103,7 +109,7 @@ func (d *DualWriterMode3) Delete(ctx context.Context, name string, deleteValidat
res, async, err := d.Storage.Delete(ctx, name, deleteValidation, options) res, async, err := d.Storage.Delete(ctx, name, deleteValidation, options)
if err != nil { if err != nil {
log.Error(err, "unable to delete object in storage") log.Error(err, "unable to delete object in storage")
d.recordStorageDuration(true, mode3Str, d.kind, method, startStorage) d.recordStorageDuration(true, mode3Str, d.resource, method, startStorage)
return res, async, err return res, async, err
} }
d.recordStorageDuration(false, mode3Str, name, method, startStorage) d.recordStorageDuration(false, mode3Str, name, method, startStorage)
@ -113,7 +119,7 @@ func (d *DualWriterMode3) Delete(ctx context.Context, name string, deleteValidat
ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("legacy delete timeout")) ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("legacy delete timeout"))
defer cancel() defer cancel()
_, _, err := d.Legacy.Delete(ctx, name, deleteValidation, options) _, _, err := d.Legacy.Delete(ctx, name, deleteValidation, options)
d.recordLegacyDuration(err != nil, mode3Str, d.kind, method, startLegacy) d.recordLegacyDuration(err != nil, mode3Str, d.resource, method, startLegacy)
}() }()
return res, async, err return res, async, err
@ -129,10 +135,10 @@ func (d *DualWriterMode3) Update(ctx context.Context, name string, objInfo rest.
res, async, err := d.Storage.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options) res, async, err := d.Storage.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
if err != nil { if err != nil {
log.Error(err, "unable to update in storage") log.Error(err, "unable to update in storage")
d.recordLegacyDuration(true, mode3Str, d.kind, method, startStorage) d.recordLegacyDuration(true, mode3Str, d.resource, method, startStorage)
return res, async, err return res, async, err
} }
d.recordStorageDuration(false, mode3Str, d.kind, method, startStorage) d.recordStorageDuration(false, mode3Str, d.resource, method, startStorage)
go func() { go func() {
ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("legacy update timeout")) ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("legacy update timeout"))
@ -140,7 +146,7 @@ func (d *DualWriterMode3) Update(ctx context.Context, name string, objInfo rest.
startLegacy := time.Now() startLegacy := time.Now()
defer cancel() defer cancel()
_, _, errObjectSt := d.Legacy.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options) _, _, errObjectSt := d.Legacy.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
d.recordLegacyDuration(errObjectSt != nil, mode3Str, d.kind, method, startLegacy) d.recordLegacyDuration(errObjectSt != nil, mode3Str, d.resource, method, startLegacy)
}() }()
return res, async, err return res, async, err
@ -156,17 +162,17 @@ func (d *DualWriterMode3) DeleteCollection(ctx context.Context, deleteValidation
res, err := d.Storage.DeleteCollection(ctx, deleteValidation, options, listOptions) res, err := d.Storage.DeleteCollection(ctx, deleteValidation, options, listOptions)
if err != nil { if err != nil {
log.Error(err, "unable to delete collection in storage") log.Error(err, "unable to delete collection in storage")
d.recordStorageDuration(true, mode3Str, d.kind, method, startStorage) d.recordStorageDuration(true, mode3Str, d.resource, method, startStorage)
return res, err return res, err
} }
d.recordStorageDuration(false, mode3Str, d.kind, method, startStorage) d.recordStorageDuration(false, mode3Str, d.resource, method, startStorage)
go func() { go func() {
startLegacy := time.Now() startLegacy := time.Now()
ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("legacy deletecollection timeout")) ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("legacy deletecollection timeout"))
defer cancel() defer cancel()
_, err := d.Legacy.DeleteCollection(ctx, deleteValidation, options, listOptions) _, err := d.Legacy.DeleteCollection(ctx, deleteValidation, options, listOptions)
d.recordStorageDuration(err != nil, mode3Str, d.kind, method, startLegacy) d.recordStorageDuration(err != nil, mode3Str, d.resource, method, startLegacy)
}() }()
return res, err return res, err

View File

@ -17,7 +17,7 @@ type DualWriterMode4 struct {
Storage Storage Storage Storage
watchImp rest.Watcher // watch is only available in mode 3 and 4 watchImp rest.Watcher // watch is only available in mode 3 and 4
*dualWriterMetrics *dualWriterMetrics
kind string resource string
Log klog.Logger Log klog.Logger
} }
@ -25,8 +25,14 @@ const mode4Str = "4"
// newDualWriterMode4 returns a new DualWriter in mode 4. // newDualWriterMode4 returns a new DualWriter in mode 4.
// Mode 4 represents writing and reading from Storage. // Mode 4 represents writing and reading from Storage.
func newDualWriterMode4(legacy LegacyStorage, storage Storage, dwm *dualWriterMetrics, kind string) *DualWriterMode4 { func newDualWriterMode4(legacy LegacyStorage, storage Storage, dwm *dualWriterMetrics, resource string) *DualWriterMode4 {
return &DualWriterMode4{Legacy: legacy, Storage: storage, Log: klog.NewKlogr().WithName("DualWriterMode4").WithValues("mode", mode4Str, "kind", kind), dualWriterMetrics: dwm} return &DualWriterMode4{
Legacy: legacy,
Storage: storage,
Log: klog.NewKlogr().WithName("DualWriterMode4").WithValues("mode", mode4Str, "resource", resource),
dualWriterMetrics: dwm,
resource: resource,
}
} }
// Mode returns the mode of the dual writer. // Mode returns the mode of the dual writer.
@ -47,7 +53,7 @@ func (d *DualWriterMode4) Create(ctx context.Context, obj runtime.Object, create
if err != nil { if err != nil {
log.Error(err, "unable to create object in storage") log.Error(err, "unable to create object in storage")
} }
d.recordStorageDuration(err != nil, mode4Str, d.kind, method, startStorage) d.recordStorageDuration(err != nil, mode4Str, d.resource, method, startStorage)
return res, err return res, err
} }
@ -62,7 +68,7 @@ func (d *DualWriterMode4) Get(ctx context.Context, name string, options *metav1.
if err != nil { if err != nil {
log.Error(err, "unable to create object in storage") log.Error(err, "unable to create object in storage")
} }
d.recordStorageDuration(err != nil, mode4Str, d.kind, method, startStorage) d.recordStorageDuration(err != nil, mode4Str, d.resource, method, startStorage)
return res, err return res, err
} }
@ -76,7 +82,7 @@ func (d *DualWriterMode4) Delete(ctx context.Context, name string, deleteValidat
if err != nil { if err != nil {
log.Error(err, "unable to delete object in storage") log.Error(err, "unable to delete object in storage")
} }
d.recordStorageDuration(err != nil, mode4Str, d.kind, method, startStorage) d.recordStorageDuration(err != nil, mode4Str, d.resource, method, startStorage)
return res, async, err return res, async, err
} }
@ -91,14 +97,14 @@ func (d *DualWriterMode4) DeleteCollection(ctx context.Context, deleteValidation
if err != nil { if err != nil {
log.Error(err, "unable to delete collection in storage") log.Error(err, "unable to delete collection in storage")
} }
d.recordStorageDuration(err != nil, mode4Str, d.kind, method, startStorage) d.recordStorageDuration(err != nil, mode4Str, d.resource, method, startStorage)
return res, err return res, err
} }
// Update overrides the generic behavior of the Storage and writes only to US. // Update overrides the generic behavior of the Storage and writes only to US.
func (d *DualWriterMode4) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) { func (d *DualWriterMode4) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
var method = "update" var method = "update"
log := d.Log.WithValues("name", name, "kind", d.kind, "method", method) log := d.Log.WithValues("name", name, "resource", d.resource, "method", method)
ctx = klog.NewContext(ctx, log) ctx = klog.NewContext(ctx, log)
startStorage := time.Now() startStorage := time.Now()
@ -106,7 +112,7 @@ func (d *DualWriterMode4) Update(ctx context.Context, name string, objInfo rest.
if err != nil { if err != nil {
log.Error(err, "unable to update object in storage") log.Error(err, "unable to update object in storage")
} }
d.recordStorageDuration(err != nil, mode4Str, d.kind, method, startStorage) d.recordStorageDuration(err != nil, mode4Str, d.resource, method, startStorage)
return res, async, err return res, async, err
} }
@ -120,7 +126,7 @@ func (d *DualWriterMode4) List(ctx context.Context, options *metainternalversion
if err != nil { if err != nil {
log.Error(err, "unable to list objects in storage") log.Error(err, "unable to list objects in storage")
} }
d.recordStorageDuration(err != nil, mode4Str, d.kind, method, startStorage) d.recordStorageDuration(err != nil, mode4Str, d.resource, method, startStorage)
return res, err return res, err
} }

View File

@ -22,7 +22,7 @@ var DualWriterStorageDuration = prometheus.NewHistogramVec(prometheus.HistogramO
Help: "Histogram for the runtime of dual writer storage duration per mode", Help: "Histogram for the runtime of dual writer storage duration per mode",
Namespace: "grafana", Namespace: "grafana",
NativeHistogramBucketFactor: 1.1, NativeHistogramBucketFactor: 1.1,
}, []string{"is_error", "mode", "kind", "method"}) }, []string{"is_error", "mode", "resource", "method"})
// DualWriterLegacyDuration is a metric summary for dual writer legacy duration per mode // DualWriterLegacyDuration is a metric summary for dual writer legacy duration per mode
var DualWriterLegacyDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ var DualWriterLegacyDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
@ -30,7 +30,7 @@ var DualWriterLegacyDuration = prometheus.NewHistogramVec(prometheus.HistogramOp
Help: "Histogram for the runtime of dual writer legacy duration per mode", Help: "Histogram for the runtime of dual writer legacy duration per mode",
Namespace: "grafana", Namespace: "grafana",
NativeHistogramBucketFactor: 1.1, NativeHistogramBucketFactor: 1.1,
}, []string{"is_error", "mode", "kind", "method"}) }, []string{"is_error", "mode", "resource", "method"})
// DualWriterOutcome is a metric summary for dual writer outcome comparison between the 2 stores per mode // DualWriterOutcome is a metric summary for dual writer outcome comparison between the 2 stores per mode
var DualWriterOutcome = prometheus.NewHistogramVec(prometheus.HistogramOpts{ var DualWriterOutcome = prometheus.NewHistogramVec(prometheus.HistogramOpts{
@ -44,7 +44,7 @@ var DualWriterReadLegacyCounts = prometheus.NewCounterVec(prometheus.CounterOpts
Name: "dual_writer_read_legacy_count", Name: "dual_writer_read_legacy_count",
Help: "Histogram for the runtime of dual writer reads from legacy", Help: "Histogram for the runtime of dual writer reads from legacy",
Namespace: "grafana", Namespace: "grafana",
}, []string{"kind", "method"}) }, []string{"resource", "method"})
// DualWriterSyncerDuration is a metric summary for dual writer sync duration per mode // DualWriterSyncerDuration is a metric summary for dual writer sync duration per mode
var DualWriterSyncerDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ var DualWriterSyncerDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
@ -52,7 +52,7 @@ var DualWriterSyncerDuration = prometheus.NewHistogramVec(prometheus.HistogramOp
Help: "Histogram for the runtime of dual writer data syncer duration per mode", Help: "Histogram for the runtime of dual writer data syncer duration per mode",
Namespace: "grafana", Namespace: "grafana",
NativeHistogramBucketFactor: 1.1, NativeHistogramBucketFactor: 1.1,
}, []string{"is_error", "mode", "kind"}) }, []string{"is_error", "mode", "resource"})
// DualWriterDataSyncerOutcome is a metric summary for dual writer data syncer outcome comparison between the 2 stores per mode // DualWriterDataSyncerOutcome is a metric summary for dual writer data syncer outcome comparison between the 2 stores per mode
var DualWriterDataSyncerOutcome = prometheus.NewHistogramVec(prometheus.HistogramOpts{ var DualWriterDataSyncerOutcome = prometheus.NewHistogramVec(prometheus.HistogramOpts{
@ -60,7 +60,7 @@ var DualWriterDataSyncerOutcome = prometheus.NewHistogramVec(prometheus.Histogra
Help: "Histogram for the runtime of dual writer data syncer outcome comparison between the 2 stores per mode", Help: "Histogram for the runtime of dual writer data syncer outcome comparison between the 2 stores per mode",
Namespace: "grafana", Namespace: "grafana",
NativeHistogramBucketFactor: 1.1, NativeHistogramBucketFactor: 1.1,
}, []string{"mode", "kind"}) }, []string{"mode", "resource"})
func (m *dualWriterMetrics) init(reg prometheus.Registerer) { func (m *dualWriterMetrics) init(reg prometheus.Registerer) {
log := klog.NewKlogr() log := klog.NewKlogr()
@ -79,14 +79,14 @@ func (m *dualWriterMetrics) init(reg prometheus.Registerer) {
} }
} }
func (m *dualWriterMetrics) recordLegacyDuration(isError bool, mode string, kind string, method string, startFrom time.Time) { func (m *dualWriterMetrics) recordLegacyDuration(isError bool, mode string, resource string, method string, startFrom time.Time) {
duration := time.Since(startFrom).Seconds() duration := time.Since(startFrom).Seconds()
m.legacy.WithLabelValues(strconv.FormatBool(isError), mode, kind, method).Observe(duration) m.legacy.WithLabelValues(strconv.FormatBool(isError), mode, resource, method).Observe(duration)
} }
func (m *dualWriterMetrics) recordStorageDuration(isError bool, mode string, kind string, method string, startFrom time.Time) { func (m *dualWriterMetrics) recordStorageDuration(isError bool, mode string, resource string, method string, startFrom time.Time) {
duration := time.Since(startFrom).Seconds() duration := time.Since(startFrom).Seconds()
m.storage.WithLabelValues(strconv.FormatBool(isError), mode, kind, method).Observe(duration) m.storage.WithLabelValues(strconv.FormatBool(isError), mode, resource, method).Observe(duration)
} }
func (m *dualWriterMetrics) recordOutcome(mode string, name string, areEqual bool, method string) { func (m *dualWriterMetrics) recordOutcome(mode string, name string, areEqual bool, method string) {
@ -97,15 +97,15 @@ func (m *dualWriterMetrics) recordOutcome(mode string, name string, areEqual boo
m.outcome.WithLabelValues(mode, name, method).Observe(observeValue) m.outcome.WithLabelValues(mode, name, method).Observe(observeValue)
} }
func (m *dualWriterMetrics) recordDataSyncerDuration(isError bool, mode string, kind string, startFrom time.Time) { func (m *dualWriterMetrics) recordDataSyncerDuration(isError bool, mode string, resource string, startFrom time.Time) {
duration := time.Since(startFrom).Seconds() duration := time.Since(startFrom).Seconds()
m.syncer.WithLabelValues(strconv.FormatBool(isError), mode, kind).Observe(duration) m.syncer.WithLabelValues(strconv.FormatBool(isError), mode, resource).Observe(duration)
} }
func (m *dualWriterMetrics) recordDataSyncerOutcome(mode string, kind string, synced bool) { func (m *dualWriterMetrics) recordDataSyncerOutcome(mode string, resource string, synced bool) {
var observeValue float64 var observeValue float64
if !synced { if !synced {
observeValue = 1 observeValue = 1
} }
m.syncerOutcome.WithLabelValues(mode, kind).Observe(observeValue) m.syncerOutcome.WithLabelValues(mode, resource).Observe(observeValue)
} }