Compare results when reading/writing between unified_storage and legacy (#89773)

* Compare results when reading/writing between unified_storage and legacy

* Always use name when comparing objects

* Compare on get method

* Update pkg/apiserver/rest/dualwriter.go

Co-authored-by: Dan Cech <dcech@grafana.com>

* Add new metric to count how many times we read from legacy in mode 2

* Move counter

* Add name in mode1

---------

Co-authored-by: Dan Cech <dcech@grafana.com>
This commit is contained in:
Leonor Oliveira
2024-07-05 13:01:05 +01:00
committed by GitHub
parent 1b3597d795
commit ec6a939815
4 changed files with 197 additions and 68 deletions

View File

@@ -8,6 +8,7 @@ import (
"fmt"
"github.com/prometheus/client_golang/prometheus"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/registry/rest"
@@ -216,6 +217,9 @@ var defaultConverter = runtime.UnstructuredConverter(runtime.DefaultUnstructured
// Compare reports whether the objects returned by the two stores (object
// storage and legacy storage) are equal once their metadata has been stripped.
// If either object is nil, the result is true only when both are nil.
func Compare(storageObj, legacyObj runtime.Object) bool {
	storageMissing, legacyMissing := storageObj == nil, legacyObj == nil
	if storageMissing || legacyMissing {
		return storageMissing && legacyMissing
	}

	// Compare the serialized forms produced by removeMeta.
	storageBytes := removeMeta(storageObj)
	legacyBytes := removeMeta(legacyObj)
	return bytes.Equal(storageBytes, legacyBytes)
}
@@ -226,10 +230,23 @@ func removeMeta(obj runtime.Object) []byte {
return nil
}
// we don't want to compare meta fields
delete(unstObj, "meta")
delete(unstObj, "metadata")
jsonObj, err := json.Marshal(cpy)
if err != nil {
return nil
}
return jsonObj
}
// getName returns the name of the given object as reported by its metadata
// accessor. It returns an empty string when the object is nil or when no
// accessor can be obtained for it; the latter case is logged.
func getName(o runtime.Object) string {
	if o == nil {
		return ""
	}

	objMeta, accessErr := meta.Accessor(o)
	if accessErr == nil {
		return objMeta.GetName()
	}

	klog.Error("failed to get object name: ", accessErr)
	return ""
}

View File

@@ -35,9 +35,9 @@ func (d *DualWriterMode1) Mode() DualWriterMode {
// Create overrides the behavior of the generic DualWriter and writes only to LegacyStorage.
func (d *DualWriterMode1) Create(ctx context.Context, original runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) {
log := d.Log.WithValues("kind", options.Kind)
ctx = klog.NewContext(ctx, log)
var method = "create"
log := d.Log.WithValues("kind", options.Kind, "method", method)
ctx = klog.NewContext(ctx, log)
startLegacy := time.Now()
created, err := d.Legacy.Create(ctx, original, createValidation, options)
@@ -50,7 +50,7 @@ func (d *DualWriterMode1) Create(ctx context.Context, original runtime.Object, c
createdCopy := created.DeepCopyObject()
go func() {
go func(createdCopy runtime.Object) {
ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("storage create timeout"))
defer cancel()
@@ -59,18 +59,26 @@ func (d *DualWriterMode1) Create(ctx context.Context, original runtime.Object, c
}
startStorage := time.Now()
_, errObjectSt := d.Storage.Create(ctx, createdCopy, createValidation, options)
storageObj, errObjectSt := d.Storage.Create(ctx, createdCopy, createValidation, options)
d.recordStorageDuration(errObjectSt != nil, mode1Str, options.Kind, method, startStorage)
}()
if err != nil {
cancel()
}
areEqual := Compare(storageObj, createdCopy)
d.recordOutcome(mode1Str, getName(createdCopy), areEqual, method)
if !areEqual {
log.Info("object from legacy and storage are not equal")
}
}(createdCopy)
return created, err
}
// Get overrides the behavior of the generic DualWriter and reads only from LegacyStorage.
func (d *DualWriterMode1) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
log := d.Log.WithValues("kind", options.Kind)
ctx = klog.NewContext(ctx, log)
var method = "get"
log := d.Log.WithValues("kind", options.Kind, "method", method, "name", name)
ctx = klog.NewContext(ctx, log)
startLegacy := time.Now()
res, errLegacy := d.Legacy.Get(ctx, name, options)
@@ -79,22 +87,32 @@ func (d *DualWriterMode1) Get(ctx context.Context, name string, options *metav1.
}
d.recordLegacyDuration(errLegacy != nil, mode1Str, options.Kind, method, startLegacy)
go func() {
go func(res runtime.Object) {
startStorage := time.Now()
ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("storage get timeout"))
defer cancel()
_, err := d.Storage.Get(ctx, name, options)
storageObj, err := d.Storage.Get(ctx, name, options)
d.recordStorageDuration(err != nil, mode1Str, options.Kind, method, startStorage)
}()
if err != nil {
log.Error(err, "unable to get object in storage")
cancel()
}
areEqual := Compare(storageObj, res)
d.recordOutcome(mode1Str, name, areEqual, method)
if !areEqual {
log.WithValues("name", name).Info("object from legacy and storage are not equal")
}
}(res)
return res, errLegacy
}
// List overrides the behavior of the generic DualWriter and reads only from LegacyStorage.
func (d *DualWriterMode1) List(ctx context.Context, options *metainternalversion.ListOptions) (runtime.Object, error) {
log := d.Log.WithValues("kind", options.Kind, "resourceVersion", options.ResourceVersion, "kind", options.Kind)
ctx = klog.NewContext(ctx, log)
var method = "list"
log := d.Log.WithValues("kind", options.Kind, "resourceVersion", options.ResourceVersion, "kind", options.Kind, "method", method)
ctx = klog.NewContext(ctx, log)
startLegacy := time.Now()
res, errLegacy := d.Legacy.List(ctx, options)
@@ -103,21 +121,29 @@ func (d *DualWriterMode1) List(ctx context.Context, options *metainternalversion
}
d.recordLegacyDuration(errLegacy != nil, mode1Str, options.Kind, method, startLegacy)
go func() {
go func(res runtime.Object) {
startStorage := time.Now()
ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("storage list timeout"))
defer cancel()
_, err := d.Storage.List(ctx, options)
storageObj, err := d.Storage.List(ctx, options)
d.recordStorageDuration(err != nil, mode1Str, options.Kind, method, startStorage)
}()
if err != nil {
cancel()
}
areEqual := Compare(storageObj, res)
d.recordOutcome(mode1Str, getName(res), areEqual, method)
if !areEqual {
log.Info("object from legacy and storage are not equal")
}
}(res)
return res, errLegacy
}
func (d *DualWriterMode1) Delete(ctx context.Context, name string, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions) (runtime.Object, bool, error) {
log := d.Log.WithValues("name", name, "kind", options.Kind)
ctx = klog.NewContext(ctx, d.Log)
var method = "delete"
log := d.Log.WithValues("name", name, "kind", options.Kind, "method", method, "name", name)
ctx = klog.NewContext(ctx, d.Log)
startLegacy := time.Now()
res, async, err := d.Legacy.Delete(ctx, name, deleteValidation, options)
@@ -128,22 +154,30 @@ func (d *DualWriterMode1) Delete(ctx context.Context, name string, deleteValidat
}
d.recordLegacyDuration(false, mode1Str, name, method, startLegacy)
go func() {
go func(res runtime.Object) {
startStorage := time.Now()
ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("storage delete timeout"))
defer cancel()
_, _, err := d.Storage.Delete(ctx, name, deleteValidation, options)
storageObj, _, err := d.Storage.Delete(ctx, name, deleteValidation, options)
d.recordStorageDuration(err != nil, mode1Str, options.Kind, method, startStorage)
}()
if err != nil {
cancel()
}
areEqual := Compare(storageObj, res)
d.recordOutcome(mode1Str, name, areEqual, method)
if !areEqual {
log.Info("object from legacy and storage are not equal")
}
}(res)
return res, async, err
}
// DeleteCollection overrides the behavior of the generic DualWriter and deletes only from LegacyStorage.
func (d *DualWriterMode1) DeleteCollection(ctx context.Context, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions, listOptions *metainternalversion.ListOptions) (runtime.Object, error) {
log := d.Log.WithValues("kind", options.Kind, "resourceVersion", listOptions.ResourceVersion)
ctx = klog.NewContext(ctx, log)
var method = "delete-collection"
log := d.Log.WithValues("kind", options.Kind, "resourceVersion", listOptions.ResourceVersion, "method", method)
ctx = klog.NewContext(ctx, log)
startLegacy := time.Now()
res, err := d.Legacy.DeleteCollection(ctx, deleteValidation, options, listOptions)
@@ -154,21 +188,29 @@ func (d *DualWriterMode1) DeleteCollection(ctx context.Context, deleteValidation
}
d.recordLegacyDuration(false, mode1Str, options.Kind, method, startLegacy)
go func() {
go func(res runtime.Object) {
startStorage := time.Now()
ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("storage deletecollection timeout"))
defer cancel()
_, err := d.Storage.DeleteCollection(ctx, deleteValidation, options, listOptions)
storageObj, err := d.Storage.DeleteCollection(ctx, deleteValidation, options, listOptions)
d.recordStorageDuration(err != nil, mode1Str, options.Kind, method, startStorage)
}()
if err != nil {
cancel()
}
areEqual := Compare(storageObj, res)
d.recordOutcome(mode1Str, getName(res), areEqual, method)
if !areEqual {
log.Info("object from legacy and storage are not equal")
}
}(res)
return res, err
}
func (d *DualWriterMode1) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
log := d.Log.WithValues("name", name, "kind", options.Kind)
ctx = klog.NewContext(ctx, log)
var method = "update"
log := d.Log.WithValues("name", name, "kind", options.Kind, "method", method, "name", name)
ctx = klog.NewContext(ctx, log)
startLegacy := time.Now()
res, async, err := d.Legacy.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
@@ -179,7 +221,7 @@ func (d *DualWriterMode1) Update(ctx context.Context, name string, objInfo rest.
}
d.recordLegacyDuration(false, mode1Str, options.Kind, method, startLegacy)
go func() {
go func(res runtime.Object) {
ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("storage update timeout"))
resCopy := res.DeepCopyObject()
@@ -212,9 +254,17 @@ func (d *DualWriterMode1) Update(ctx context.Context, name string, objInfo rest.
}
startStorage := time.Now()
defer cancel()
_, _, errObjectSt := d.Storage.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
storageObj, _, errObjectSt := d.Storage.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
d.recordStorageDuration(errObjectSt != nil, mode1Str, options.Kind, method, startStorage)
}()
if err != nil {
cancel()
}
areEqual := Compare(storageObj, res)
d.recordOutcome(mode1Str, name, areEqual, method)
if !areEqual {
log.WithValues("name", name).Info("object from legacy and storage are not equal")
}
}(res)
return res, async, err
}

View File

@@ -37,9 +37,9 @@ func (d *DualWriterMode2) Mode() DualWriterMode {
// Create overrides the behavior of the generic DualWriter and writes to LegacyStorage and Storage.
func (d *DualWriterMode2) Create(ctx context.Context, original runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) {
log := d.Log.WithValues("kind", options.Kind)
ctx = klog.NewContext(ctx, log)
var method = "create"
log := d.Log.WithValues("kind", options.Kind, "method", method)
ctx = klog.NewContext(ctx, log)
startLegacy := time.Now()
created, err := d.Legacy.Create(ctx, original, createValidation, options)
@@ -59,47 +59,65 @@ func (d *DualWriterMode2) Create(ctx context.Context, original runtime.Object, c
if err != nil {
log.WithValues("name").Error(err, "unable to create object in storage")
d.recordStorageDuration(true, mode2Str, options.Kind, method, startStorage)
return rsp, err
}
d.recordStorageDuration(false, mode2Str, options.Kind, method, startStorage)
areEqual := Compare(rsp, created)
d.recordOutcome(mode2Str, getName(rsp), areEqual, method)
if !areEqual {
log.Info("object from legacy and storage are not equal")
}
return rsp, err
}
// It retrieves an object from Storage if possible, and if not it falls back to LegacyStorage.
func (d *DualWriterMode2) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
log := d.Log.WithValues("name", name, "resourceVersion", options.ResourceVersion, "kind", options.Kind)
ctx = klog.NewContext(ctx, log)
var method = "get"
log := d.Log.WithValues("name", name, "resourceVersion", options.ResourceVersion, "kind", options.Kind, "method", method)
ctx = klog.NewContext(ctx, log)
startStorage := time.Now()
res, err := d.Storage.Get(ctx, name, options)
objStorage, err := d.Storage.Get(ctx, name, options)
d.recordStorageDuration(err != nil, mode2Str, options.Kind, method, startStorage)
if err != nil {
// if it errors because it's not found, we try to fetch it from the legacy storage
if apierrors.IsNotFound(err) {
d.recordStorageDuration(false, mode2Str, options.Kind, method, startStorage)
log.Info("object not found in storage, fetching from legacy")
startLegacy := time.Now()
res, err = d.Legacy.Get(ctx, name, options)
if err != nil {
log.Error(err, "unable to fetch object from legacy")
d.recordLegacyDuration(true, mode2Str, options.Kind, method, startLegacy)
}
d.recordLegacyDuration(false, mode2Str, options.Kind, method, startLegacy)
return res, err
if !apierrors.IsNotFound(err) {
log.Error(err, "unable to fetch object from storage")
return objStorage, err
}
d.recordStorageDuration(true, mode2Str, options.Kind, method, startStorage)
log.Error(err, "unable to fetch object from storage")
return res, err
log.Info("object not found in storage, fetching from legacy")
}
return res, err
startLegacy := time.Now()
objLegacy, err := d.Legacy.Get(ctx, name, options)
if err != nil {
log.Error(err, "unable to fetch object from legacy")
d.recordLegacyDuration(true, mode2Str, options.Kind, method, startLegacy)
return objLegacy, err
}
d.recordLegacyDuration(false, mode2Str, options.Kind, method, startLegacy)
areEqual := Compare(objStorage, objLegacy)
d.recordOutcome(mode2Str, name, areEqual, method)
if !areEqual {
log.Info("object from legacy and storage are not equal")
}
// if there is no object in storage, we return the object from legacy
if objStorage == nil {
d.recordReadLegacyCount(options.Kind, method)
return objLegacy, nil
}
return objStorage, err
}
// List overrides the behavior of the generic DualWriter.
// It returns Storage entries if possible and falls back to LegacyStorage entries if not.
func (d *DualWriterMode2) List(ctx context.Context, options *metainternalversion.ListOptions) (runtime.Object, error) {
log := d.Log.WithValues("kind", options.Kind, "resourceVersion", options.ResourceVersion, "kind", options.Kind)
ctx = klog.NewContext(ctx, log)
var method = "list"
log := d.Log.WithValues("kind", options.Kind, "resourceVersion", options.ResourceVersion, "kind", options.Kind, "method", method)
ctx = klog.NewContext(ctx, log)
startLegacy := time.Now()
ll, err := d.Legacy.List(ctx, options)
@@ -123,7 +141,6 @@ func (d *DualWriterMode2) List(ctx context.Context, options *metainternalversion
return nil, err
}
// TODO: why do we need this?
if optionsStorage.LabelSelector == nil {
return ll, nil
}
@@ -148,22 +165,35 @@ func (d *DualWriterMode2) List(ctx context.Context, options *metainternalversion
if err != nil {
return nil, err
}
if legacyIndex, ok := indexMap[accessor.GetName()]; ok {
name := accessor.GetName()
if legacyIndex, ok := indexMap[name]; ok {
legacyList[legacyIndex] = obj
areEqual := Compare(obj, legacyList[legacyIndex])
d.recordOutcome(mode2Str, name, areEqual, method)
if !areEqual {
log.WithValues("name", name).Info("object from legacy and storage are not equal")
}
}
}
if err = meta.SetList(ll, legacyList); err != nil {
return nil, err
}
// if the number of items in the legacy list and the storage list are the same, we can return the storage list
if len(storageList) == len(legacyList) {
return sl, nil
}
log.Info("lists from legacy and storage are not the same size")
d.recordReadLegacyCount(options.Kind, method)
return ll, nil
}
// DeleteCollection overrides the behavior of the generic DualWriter and deletes from both LegacyStorage and Storage.
func (d *DualWriterMode2) DeleteCollection(ctx context.Context, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions, listOptions *metainternalversion.ListOptions) (runtime.Object, error) {
log := d.Log.WithValues("kind", options.Kind, "resourceVersion", listOptions.ResourceVersion)
ctx = klog.NewContext(ctx, log)
var method = "delete-collection"
log := d.Log.WithValues("kind", options.Kind, "resourceVersion", listOptions.ResourceVersion, "method", method)
ctx = klog.NewContext(ctx, log)
startLegacy := time.Now()
deleted, err := d.Legacy.DeleteCollection(ctx, deleteValidation, options, listOptions)
@@ -194,16 +224,23 @@ func (d *DualWriterMode2) DeleteCollection(ctx context.Context, deleteValidation
if err != nil {
log.WithValues("deleted", res).Error(err, "failed to delete collection successfully from Storage")
d.recordStorageDuration(true, mode2Str, options.Kind, method, startStorage)
return res, err
}
d.recordStorageDuration(false, mode2Str, options.Kind, method, startStorage)
areEqual := Compare(res, deleted)
d.recordOutcome(mode2Str, getName(res), areEqual, method)
if !areEqual {
log.Info("object from legacy and storage are not equal")
}
return res, err
}
func (d *DualWriterMode2) Delete(ctx context.Context, name string, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions) (runtime.Object, bool, error) {
log := d.Log.WithValues("name", name, "kind", options.Kind)
ctx = klog.NewContext(ctx, log)
var method = "delete"
log := d.Log.WithValues("name", name, "kind", options.Kind, "method", method)
ctx = klog.NewContext(ctx, log)
startLegacy := time.Now()
deletedLS, async, err := d.Legacy.Delete(ctx, name, deleteValidation, options)
@@ -223,14 +260,23 @@ func (d *DualWriterMode2) Delete(ctx context.Context, name string, deleteValidat
log.WithValues("objectList", deletedS).Error(err, "could not delete from duplicate storage")
d.recordStorageDuration(true, mode2Str, options.Kind, method, startStorage)
}
return deletedS, async, err
}
d.recordStorageDuration(false, mode2Str, options.Kind, method, startStorage)
return deletedLS, async, err
areEqual := Compare(deletedS, deletedLS)
d.recordOutcome(mode2Str, name, areEqual, method)
if !areEqual {
log.WithValues("name", name).Info("object from legacy and storage are not equal")
}
return deletedS, async, err
}
// Update overrides the generic behavior of the Storage and writes first to the legacy storage and then to storage.
func (d *DualWriterMode2) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
log := d.Log.WithValues("name", name, "kind", options.Kind)
var method = "update"
log := d.Log.WithValues("name", name, "kind", options.Kind, "method", method)
ctx = klog.NewContext(ctx, log)
// get foundObj and (updated) object so they can be stored in legacy store
@@ -276,8 +322,15 @@ func (d *DualWriterMode2) Update(ctx context.Context, name string, objInfo rest.
res, created, err := d.Storage.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
if err != nil {
log.WithValues("object", res).Error(err, "could not update in storage")
d.recordStorageDuration(true, mode2Str, options.Kind, "update", startStorage)
return res, created, err
}
areEqual := Compare(res, obj)
d.recordOutcome(mode2Str, name, areEqual, method)
if !areEqual {
log.WithValues("name", name).Info("object from legacy and storage are not equal")
}
d.recordStorageDuration(err != nil, mode2Str, options.Kind, "update", startStorage)
return res, created, err
}
@@ -346,7 +399,6 @@ func enrichLegacyObject(originalObj, returnedObj runtime.Object) error {
}
accessorReturned.SetAnnotations(ac)
// otherwise, we propagate the original RV and UID
accessorReturned.SetResourceVersion(accessorOriginal.GetResourceVersion())
accessorReturned.SetUID(accessorOriginal.GetUID())
return nil

View File

@@ -9,9 +9,10 @@ import (
)
type dualWriterMetrics struct {
legacy *prometheus.HistogramVec
storage *prometheus.HistogramVec
outcome *prometheus.HistogramVec
legacy *prometheus.HistogramVec
storage *prometheus.HistogramVec
outcome *prometheus.HistogramVec
legacyReads *prometheus.CounterVec
}
// DualWriterStorageDuration is a metric summary for dual writer storage duration per mode
@@ -38,6 +39,12 @@ var DualWriterOutcome = prometheus.NewHistogramVec(prometheus.HistogramOpts{
NativeHistogramBucketFactor: 1.1,
}, []string{"mode", "name", "method"})
// DualWriterReadLegacyCounts is a metric counter for the number of times the
// dual writer reads from legacy, labelled by kind and method.
var DualWriterReadLegacyCounts = prometheus.NewCounterVec(prometheus.CounterOpts{
	Name: "dual_writer_read_legacy_count",
	// This is a counter, not a histogram: it counts reads rather than timing them.
	Help:      "Counter for the number of times dual writer reads from legacy",
	Namespace: "grafana",
}, []string{"kind", "method"})
func (m *dualWriterMetrics) init(reg prometheus.Registerer) {
log := klog.NewKlogr()
m.legacy = DualWriterLegacyDuration
@@ -61,7 +68,6 @@ func (m *dualWriterMetrics) recordStorageDuration(isError bool, mode string, nam
m.storage.WithLabelValues(strconv.FormatBool(isError), mode, name, method).Observe(duration)
}
// nolint:unused
func (m *dualWriterMetrics) recordOutcome(mode string, name string, outcome bool, method string) {
var observeValue float64
if outcome {
@@ -69,3 +75,7 @@ func (m *dualWriterMetrics) recordOutcome(mode string, name string, outcome bool
}
m.outcome.WithLabelValues(mode, name, method).Observe(observeValue)
}
// recordReadLegacyCount increments the legacy-read counter for the given kind
// and method.
//
// NOTE(review): the visible part of init does not show legacyReads being
// assigned; confirm against the full init. Calling WithLabelValues on a nil
// *prometheus.CounterVec panics, so fall back to the package-level
// DualWriterReadLegacyCounts when the field has not been set.
func (m *dualWriterMetrics) recordReadLegacyCount(kind string, method string) {
	counter := m.legacyReads
	if counter == nil {
		counter = DualWriterReadLegacyCounts
	}
	counter.WithLabelValues(kind, method).Inc()
}