Also call storage on mode1. Measure latency (#87739)

* Also call storage on mode1. Add metrics

* Update comment

* Don't use compare function for now

* Remove very important space

* Finish adding logging in mode2.
Also call US in mode1 in a non-blocking way

* Improve code readability on modes 1 and 2

* Fix tests

* Rename vars

* Lint

* Return error from legacy write

* Remove useless defer

* [REVIEW] improvements

* Pass kind instead of name

* Use kind instead of name in metrics

* Only call latency metrics once

* Return error on writes to legacystore in mode1

* Move accessor logic into the goroutine as well
This commit is contained in:
Leonor Oliveira 2024-05-22 09:23:29 +01:00 committed by GitHub
parent 7c5c62f617
commit dd771e818e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 244 additions and 70 deletions

View File

@ -2,7 +2,10 @@ package rest
import (
"context"
"errors"
"time"
"k8s.io/apimachinery/pkg/api/meta"
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
@ -17,6 +20,10 @@ type DualWriterMode1 struct {
*dualWriterMetrics
}
// mode1Str is the value DualWriterMode1 passes as the mode argument when it
// records legacy and storage durations.
const mode1Str = "1"
// NewDualWriterMode1 returns a new DualWriter in mode 1.
// Mode 1 represents writing to and reading from LegacyStorage.
func NewDualWriterMode1(legacy LegacyStorage, storage Storage) *DualWriterMode1 {
@ -27,36 +34,195 @@ func NewDualWriterMode1(legacy LegacyStorage, storage Storage) *DualWriterMode1
// Create overrides the behavior of the generic DualWriter. It writes to
// LegacyStorage synchronously and returns that result to the caller, then
// repeats the write against Storage in a background goroutine whose result is
// discarded and whose duration is recorded.
//
// A LegacyStorage error is logged, recorded in the legacy duration metric and
// returned as-is. Failures of the background Storage write are never surfaced
// to the caller; they only show up in the storage duration metric.
func (d *DualWriterMode1) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) {
	log := d.Log.WithValues("kind", options.Kind)
	ctx = klog.NewContext(ctx, log)
	const method = "create"

	startLegacy := time.Now()
	res, err := d.Legacy.Create(ctx, obj, createValidation, options)
	if err != nil {
		log.Error(err, "unable to create object in legacy storage")
		d.recordLegacyDuration(true, mode1Str, options.Kind, method, startLegacy)
		return res, err
	}
	d.recordLegacyDuration(false, mode1Str, options.Kind, method, startLegacy)

	go func() {
		// Without a valid accessor enrichObject would dereference nil and
		// panic inside this goroutine, so give up on the background write.
		accessorCreated, err := meta.Accessor(res)
		if err != nil {
			log.Error(err, "unable to get accessor for created object")
			return
		}
		accessorOld, err := meta.Accessor(obj)
		if err != nil {
			log.Error(err, "unable to get accessor for old object")
			return
		}
		// NOTE(review): enrichObject writes onto accessorCreated, i.e. onto res,
		// which has already been returned to the caller, while the object sent
		// to Storage below is obj. Confirm this is intended.
		enrichObject(accessorOld, accessorCreated)

		startStorage := time.Now()
		// Detach from the request context: this goroutine outlives the request,
		// so inheriting its cancellation would abort the write. WithoutCancel
		// keeps the context values (including the logger).
		ctx, cancel := context.WithTimeoutCause(context.WithoutCancel(ctx), time.Second*10, errors.New("storage create timeout"))
		defer cancel()
		_, errObjectSt := d.Storage.Create(ctx, obj, createValidation, options)
		d.recordStorageDuration(errObjectSt != nil, mode1Str, options.Kind, method, startStorage)
	}()

	return res, nil
}
// Get overrides the behavior of the generic DualWriter. The caller always
// receives the LegacyStorage result (object and error). The same read is also
// issued against Storage in a background goroutine, even when the legacy read
// failed; its result is discarded and only its duration is recorded.
func (d *DualWriterMode1) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
	log := d.Log.WithValues("name", name, "resourceVersion", options.ResourceVersion, "kind", options.Kind)
	ctx = klog.NewContext(ctx, log)
	const method = "get"

	startLegacy := time.Now()
	res, errLegacy := d.Legacy.Get(ctx, name, options)
	if errLegacy != nil {
		log.Error(errLegacy, "unable to get object in legacy storage")
	}
	d.recordLegacyDuration(errLegacy != nil, mode1Str, options.Kind, method, startLegacy)

	go func() {
		startStorage := time.Now()
		// Detach from the request context: this goroutine outlives the request,
		// so inheriting its cancellation would abort the read and skew the
		// recorded latency. Context values are preserved.
		ctx, cancel := context.WithTimeoutCause(context.WithoutCancel(ctx), time.Second*10, errors.New("storage get timeout"))
		defer cancel()
		_, err := d.Storage.Get(ctx, name, options)
		d.recordStorageDuration(err != nil, mode1Str, options.Kind, method, startStorage)
	}()

	return res, errLegacy
}
// List overrides the behavior of the generic DualWriter. The caller always
// receives the LegacyStorage result (list and error). The same list call is
// also issued against Storage in a background goroutine, even when the legacy
// call failed; its result is discarded and only its duration is recorded.
func (d *DualWriterMode1) List(ctx context.Context, options *metainternalversion.ListOptions) (runtime.Object, error) {
	// "kind" is attached once; repeating the key adds nothing to the log entry.
	log := d.Log.WithValues("kind", options.Kind, "resourceVersion", options.ResourceVersion)
	ctx = klog.NewContext(ctx, log)
	const method = "list"

	startLegacy := time.Now()
	res, errLegacy := d.Legacy.List(ctx, options)
	if errLegacy != nil {
		log.Error(errLegacy, "unable to list object in legacy storage")
	}
	d.recordLegacyDuration(errLegacy != nil, mode1Str, options.Kind, method, startLegacy)

	go func() {
		startStorage := time.Now()
		// Detach from the request context: this goroutine outlives the request,
		// so inheriting its cancellation would abort the call and skew the
		// recorded latency. Context values are preserved.
		ctx, cancel := context.WithTimeoutCause(context.WithoutCancel(ctx), time.Second*10, errors.New("storage list timeout"))
		defer cancel()
		_, err := d.Storage.List(ctx, options)
		d.recordStorageDuration(err != nil, mode1Str, options.Kind, method, startStorage)
	}()

	return res, errLegacy
}
// Delete overrides the behavior of the generic DualWriter. It deletes from
// LegacyStorage synchronously and returns that result to the caller, then
// repeats the delete against Storage in a background goroutine whose result is
// discarded and whose duration is recorded.
//
// A LegacyStorage error is logged, recorded in the legacy duration metric and
// returned; in that case Storage is not called.
func (d *DualWriterMode1) Delete(ctx context.Context, name string, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions) (runtime.Object, bool, error) {
	log := d.Log.WithValues("name", name, "kind", options.Kind)
	// Store the enriched logger (not d.Log) so downstream code logging through
	// the context also carries name and kind.
	ctx = klog.NewContext(ctx, log)
	const method = "delete"

	startLegacy := time.Now()
	res, async, err := d.Legacy.Delete(ctx, name, deleteValidation, options)
	if err != nil {
		log.Error(err, "unable to delete object in legacy storage")
		d.recordLegacyDuration(true, mode1Str, options.Kind, method, startLegacy)
		return res, async, err
	}
	d.recordLegacyDuration(false, mode1Str, options.Kind, method, startLegacy)

	go func() {
		startStorage := time.Now()
		// Detach from the request context: this goroutine outlives the request,
		// so inheriting its cancellation would abort the delete. Context values
		// are preserved.
		ctx, cancel := context.WithTimeoutCause(context.WithoutCancel(ctx), time.Second*10, errors.New("storage delete timeout"))
		defer cancel()
		_, _, err := d.Storage.Delete(ctx, name, deleteValidation, options)
		d.recordStorageDuration(err != nil, mode1Str, options.Kind, method, startStorage)
	}()

	return res, async, nil
}
// DeleteCollection overrides the behavior of the generic DualWriter. It
// deletes the collection from LegacyStorage synchronously and returns that
// result to the caller, then repeats the call against Storage in a background
// goroutine whose result is discarded and whose duration is recorded.
//
// A LegacyStorage error is logged, recorded in the legacy duration metric and
// returned; in that case Storage is not called.
func (d *DualWriterMode1) DeleteCollection(ctx context.Context, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions, listOptions *metainternalversion.ListOptions) (runtime.Object, error) {
	log := d.Log.WithValues("kind", options.Kind, "resourceVersion", listOptions.ResourceVersion)
	ctx = klog.NewContext(ctx, log)
	const method = "delete-collection"

	startLegacy := time.Now()
	res, err := d.Legacy.DeleteCollection(ctx, deleteValidation, options, listOptions)
	if err != nil {
		log.Error(err, "unable to delete collection in legacy storage")
		d.recordLegacyDuration(true, mode1Str, options.Kind, method, startLegacy)
		return res, err
	}
	d.recordLegacyDuration(false, mode1Str, options.Kind, method, startLegacy)

	go func() {
		startStorage := time.Now()
		// Detach from the request context: this goroutine outlives the request,
		// so inheriting its cancellation would abort the call. Context values
		// are preserved.
		ctx, cancel := context.WithTimeoutCause(context.WithoutCancel(ctx), time.Second*10, errors.New("storage deletecollection timeout"))
		defer cancel()
		_, err := d.Storage.DeleteCollection(ctx, deleteValidation, options, listOptions)
		d.recordStorageDuration(err != nil, mode1Str, options.Kind, method, startStorage)
	}()

	return res, nil
}
// Update overrides the behavior of the generic DualWriter. It updates
// LegacyStorage synchronously and returns that result to the caller, then
// repeats the update against Storage in a background goroutine whose result is
// discarded and whose duration is recorded.
//
// A LegacyStorage error is logged, recorded in the legacy duration metric and
// returned; in that case Storage is not called.
//
// In the background goroutine the current object is fetched from Storage. If
// one is returned, the legacy result is given that object's resourceVersion
// and UID, passed through enrichObject, and wrapped in an updateWrapper that
// is handed to Storage.Update. Otherwise the caller's objInfo is passed to
// Storage.Update unchanged.
func (d *DualWriterMode1) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
	log := d.Log.WithValues("name", name, "kind", options.Kind)
	ctx = klog.NewContext(ctx, log)
	const method = "update"

	startLegacy := time.Now()
	res, async, err := d.Legacy.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
	if err != nil {
		log.Error(err, "unable to update in legacy storage")
		d.recordLegacyDuration(true, mode1Str, options.Kind, method, startLegacy)
		return res, async, err
	}
	d.recordLegacyDuration(false, mode1Str, options.Kind, method, startLegacy)

	go func() {
		// Detach from the request context: this goroutine outlives the request,
		// so inheriting its cancellation would abort every call below. One
		// timeout bounds all background work; context values are preserved.
		ctx, cancel := context.WithTimeoutCause(context.WithoutCancel(ctx), time.Second*10, errors.New("storage update timeout"))
		defer cancel()

		// The returned object is only used for logging; a failure here does not
		// stop the storage update.
		updated, err := objInfo.UpdatedObject(ctx, res)
		if err != nil {
			log.WithValues("object", updated).Error(err, "could not update or create object")
		}

		// get the object to be updated
		foundObj, err := d.Storage.Get(ctx, name, &metav1.GetOptions{})
		if err != nil {
			log.WithValues("object", foundObj).Error(err, "could not get object to update")
		}

		storageObjInfo := objInfo
		// if the object is found, create a new updateWrapper with the object found
		if foundObj != nil {
			// Without valid accessors the calls below would dereference nil and
			// panic inside this goroutine, so give up on the background update.
			accessorOld, err := meta.Accessor(foundObj)
			if err != nil {
				log.Error(err, "unable to get accessor for original updated object")
				return
			}
			accessor, err := meta.Accessor(res)
			if err != nil {
				log.Error(err, "unable to get accessor for updated object")
				return
			}

			// NOTE(review): these setters mutate res, which has already been
			// returned to the caller; consider working on res.DeepCopyObject().
			accessor.SetResourceVersion(accessorOld.GetResourceVersion())
			accessor.SetUID(accessorOld.GetUID())
			enrichObject(accessorOld, accessor)
			storageObjInfo = &updateWrapper{
				upstream: objInfo,
				updated:  res,
			}
		}

		startStorage := time.Now()
		_, _, errObjectSt := d.Storage.Update(ctx, name, storageObjInfo, createValidation, updateValidation, forceAllowCreate, options)
		d.recordStorageDuration(errObjectSt != nil, mode1Str, options.Kind, method, startStorage)
	}()

	return res, async, nil
}
func (d *DualWriterMode1) Destroy() {

View File

@ -13,11 +13,11 @@ import (
"k8s.io/apiserver/pkg/apis/example"
)
var exampleObj = &example.Pod{TypeMeta: metav1.TypeMeta{}, ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "1"}, Spec: example.PodSpec{}, Status: example.PodStatus{}}
var exampleObjDifferentRV = &example.Pod{TypeMeta: metav1.TypeMeta{}, ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "3"}, Spec: example.PodSpec{}, Status: example.PodStatus{}}
var anotherObj = &example.Pod{TypeMeta: metav1.TypeMeta{}, ObjectMeta: metav1.ObjectMeta{Name: "bar", ResourceVersion: "2"}, Spec: example.PodSpec{}, Status: example.PodStatus{}}
var failingObj = &example.Pod{TypeMeta: metav1.TypeMeta{}, ObjectMeta: metav1.ObjectMeta{Name: "object-fail", ResourceVersion: "2"}, Spec: example.PodSpec{}, Status: example.PodStatus{}}
var exampleList = &example.PodList{TypeMeta: metav1.TypeMeta{}, ListMeta: metav1.ListMeta{}, Items: []example.Pod{*exampleObj}}
var exampleObj = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "1"}, Spec: example.PodSpec{}, Status: example.PodStatus{}}
var exampleObjDifferentRV = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "3"}, Spec: example.PodSpec{}, Status: example.PodStatus{}}
var anotherObj = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "bar", ResourceVersion: "2"}, Spec: example.PodSpec{}, Status: example.PodStatus{}}
var failingObj = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "object-fail", ResourceVersion: "2"}, Spec: example.PodSpec{}, Status: example.PodStatus{}}
var exampleList = &example.PodList{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ListMeta: metav1.ListMeta{}, Items: []example.Pod{*exampleObj}}
var anotherList = &example.PodList{Items: []example.Pod{*anotherObj}}
func TestMode1_Create(t *testing.T) {

View File

@ -32,11 +32,12 @@ func NewDualWriterMode2(legacy LegacyStorage, storage Storage) *DualWriterMode2
// Create overrides the behavior of the generic DualWriter and writes to LegacyStorage and Storage.
func (d *DualWriterMode2) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) {
ctx = klog.NewContext(ctx, d.Log)
log := d.Log.WithValues("kind", options.Kind)
ctx = klog.NewContext(ctx, log)
created, err := d.Legacy.Create(ctx, obj, createValidation, options)
if err != nil {
d.Log.Error(err, "unable to create object in legacy storage")
log.Error(err, "unable to create object in legacy storage")
return created, err
}
@ -58,7 +59,7 @@ func (d *DualWriterMode2) Create(ctx context.Context, obj runtime.Object, create
rsp, err := d.Storage.Create(ctx, created, createValidation, options)
if err != nil {
d.Log.WithValues("name", accessorCreated.GetName(), "resourceVersion", accessorCreated.GetResourceVersion()).Error(err, "unable to create object in duplicate storage")
log.WithValues("name", accessorCreated.GetName(), "resourceVersion", accessorCreated.GetResourceVersion()).Error(err, "unable to create object in storage")
}
return rsp, err
}
@ -66,7 +67,7 @@ func (d *DualWriterMode2) Create(ctx context.Context, obj runtime.Object, create
// Get overrides the behavior of the generic DualWriter.
// It retrieves an object from Storage if possible, and if not it falls back to LegacyStorage.
func (d *DualWriterMode2) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
log := d.Log.WithValues("name", name, "resourceVersion", options.ResourceVersion)
log := d.Log.WithValues("name", name, "resourceVersion", options.ResourceVersion, "kind", options.Kind)
ctx = klog.NewContext(ctx, log)
s, err := d.Storage.Get(ctx, name, &metav1.GetOptions{})
if err != nil {
@ -83,7 +84,7 @@ func (d *DualWriterMode2) Get(ctx context.Context, name string, options *metav1.
// List overrides the behavior of the generic DualWriter.
// It returns Storage entries if possible and falls back to LegacyStorage entries if not.
func (d *DualWriterMode2) List(ctx context.Context, options *metainternalversion.ListOptions) (runtime.Object, error) {
log := d.Log.WithValues("kind", options.Kind, "resourceVersion", options.ResourceVersion)
log := d.Log.WithValues("kind", options.Kind, "resourceVersion", options.ResourceVersion, "kind", options.Kind)
ctx = klog.NewContext(ctx, log)
ll, err := d.Legacy.List(ctx, options)
if err != nil {
@ -167,7 +168,7 @@ func (d *DualWriterMode2) DeleteCollection(ctx context.Context, deleteValidation
}
func (d *DualWriterMode2) Delete(ctx context.Context, name string, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions) (runtime.Object, bool, error) {
log := d.Log.WithValues("name", name)
log := d.Log.WithValues("name", name, "kind", options.Kind)
ctx = klog.NewContext(ctx, log)
deletedLS, async, err := d.Legacy.Delete(ctx, name, deleteValidation, options)
@ -190,23 +191,21 @@ func (d *DualWriterMode2) Delete(ctx context.Context, name string, deleteValidat
// Update overrides the generic behavior of the Storage and writes first to the legacy storage and then to storage.
func (d *DualWriterMode2) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
var notFound bool
log := d.Log.WithValues("name", name)
log := d.Log.WithValues("name", name, "kind", options.Kind)
ctx = klog.NewContext(ctx, log)
// get old and new (updated) object so they can be stored in legacy store
old, err := d.Storage.Get(ctx, name, &metav1.GetOptions{})
// get foundObj and new (updated) object so they can be stored in legacy store
foundObj, err := d.Storage.Get(ctx, name, &metav1.GetOptions{})
if err != nil {
if !apierrors.IsNotFound(err) {
log.WithValues("object", old).Error(err, "could not get object to update")
log.WithValues("object", foundObj).Error(err, "could not get object to update")
return nil, false, err
}
notFound = true
log.Info("object not found for update, creating one")
}
// obj can be populated in case it's found or empty in case it's not found
updated, err := objInfo.UpdatedObject(ctx, old)
updated, err := objInfo.UpdatedObject(ctx, foundObj)
if err != nil {
log.WithValues("object", updated).Error(err, "could not update or create object")
return nil, false, err
@ -218,31 +217,28 @@ func (d *DualWriterMode2) Update(ctx context.Context, name string, objInfo rest.
return obj, created, err
}
if notFound {
return d.Storage.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
// if the object is found, create a new updateWrapper with the object found
if foundObj != nil {
accessorOld, err := meta.Accessor(foundObj)
if err != nil {
log.Error(err, "unable to get accessor for original updated object")
}
accessor, err := meta.Accessor(obj)
if err != nil {
log.Error(err, "unable to get accessor for updated object")
}
enrichObject(accessorOld, accessor)
accessor.SetResourceVersion(accessorOld.GetResourceVersion())
accessor.SetUID(accessorOld.GetUID())
objInfo = &updateWrapper{
upstream: objInfo,
updated: obj,
}
}
accessor, err := meta.Accessor(obj)
if err != nil {
return nil, false, err
}
// only if object exists
accessorOld, err := meta.Accessor(old)
if err != nil {
return nil, false, err
}
enrichObject(accessorOld, accessor)
// keep the same UID and resource_version
accessor.SetResourceVersion(accessorOld.GetResourceVersion())
accessor.SetUID(accessorOld.GetUID())
objInfo = &updateWrapper{
upstream: objInfo,
updated: obj,
}
// TODO: relies on GuaranteedUpdate creating the object if
// it doesn't exist: https://github.com/grafana/grafana/pull/85206
return d.Storage.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
@ -309,6 +305,9 @@ func enrichObject(accessorO, accessorC metav1.Object) {
accessorC.SetLabels(accessorO.GetLabels())
ac := accessorC.GetAnnotations()
if ac == nil {
ac = map[string]string{}
}
for k, v := range accessorO.GetAnnotations() {
ac[k] = v
}

View File

@ -424,8 +424,8 @@ func TestMode2_Update(t *testing.T) {
wantErr: true,
},
{
name: "error updating storage",
input: "object-fail",
name: "error updating storage with not found object",
input: "not-found",
setupLegacyFn: func(m *mock.Mock, input string) {
m.On("Update", mock.Anything, input, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleObj, false, nil)
},
@ -433,7 +433,7 @@ func TestMode2_Update(t *testing.T) {
m.On("Update", mock.Anything, input, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, false, errors.New("error"))
},
setupGetFn: func(m *mock.Mock, input string) {
m.On("Get", mock.Anything, input, mock.Anything).Return(exampleObj, nil)
m.On("Get", mock.Anything, input, mock.Anything).Return(nil, errors.New(""))
},
wantErr: true,
},

View File

@ -1,6 +1,11 @@
package rest
import "github.com/prometheus/client_golang/prometheus"
import (
"strconv"
"time"
"github.com/prometheus/client_golang/prometheus"
)
type dualWriterMetrics struct {
legacy *prometheus.HistogramVec
@ -14,7 +19,7 @@ var DualWriterStorageDuration = prometheus.NewHistogramVec(prometheus.HistogramO
Help: "Histogram for the runtime of dual writer storage duration per mode",
Namespace: "grafana",
NativeHistogramBucketFactor: 1.1,
}, []string{"status_code", "mode", "name", "method"})
}, []string{"is_error", "mode", "kind", "method"})
// DualWriterLegacyDuration is a metric summary for dual writer legacy duration per mode
var DualWriterLegacyDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
@ -22,7 +27,7 @@ var DualWriterLegacyDuration = prometheus.NewHistogramVec(prometheus.HistogramOp
Help: "Histogram for the runtime of dual writer legacy duration per mode",
Namespace: "grafana",
NativeHistogramBucketFactor: 1.1,
}, []string{"status_code", "mode", "name", "method"})
}, []string{"is_error", "mode", "kind", "method"})
// DualWriterOutcome is a metric summary for dual writer outcome comparison between the 2 stores per mode
var DualWriterOutcome = prometheus.NewHistogramVec(prometheus.HistogramOpts{
@ -30,7 +35,7 @@ var DualWriterOutcome = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Help: "Histogram for the runtime of dual writer outcome comparison between the 2 stores per mode",
Namespace: "grafana",
NativeHistogramBucketFactor: 1.1,
}, []string{"mode", "name", "outcome", "method"})
}, []string{"mode", "name", "method"})
func (m *dualWriterMetrics) init() {
m.legacy = DualWriterLegacyDuration
@ -38,17 +43,21 @@ func (m *dualWriterMetrics) init() {
m.outcome = DualWriterOutcome
}
// nolint:unused
func (m *dualWriterMetrics) recordLegacyDuration(statusCode string, mode string, name string, method string, duration float64) {
m.legacy.WithLabelValues(statusCode, mode, name, method).Observe(duration)
func (m *dualWriterMetrics) recordLegacyDuration(isError bool, mode string, name string, method string, startFrom time.Time) {
duration := time.Since(startFrom).Seconds()
m.legacy.WithLabelValues(strconv.FormatBool(isError), mode, name, method).Observe(duration)
}
// recordStorageDuration observes the time elapsed since startFrom, in seconds,
// in the storage duration histogram (m.storage).
//
// The label values are passed positionally as: isError formatted as
// "true"/"false", mode, name and method. Callers in this package pass the
// resource kind as name.
func (m *dualWriterMetrics) recordStorageDuration(isError bool, mode string, name string, method string, startFrom time.Time) {
	duration := time.Since(startFrom).Seconds()
	m.storage.WithLabelValues(strconv.FormatBool(isError), mode, name, method).Observe(duration)
}
// nolint:unused
func (m *dualWriterMetrics) recordOutcome(mode string, name string, outcome string, method string) {
m.outcome.WithLabelValues(mode, name, outcome, method).Observe(1)
func (m *dualWriterMetrics) recordOutcome(mode string, name string, outcome bool, method string) {
var observeValue float64
if outcome {
observeValue = 1
}
m.outcome.WithLabelValues(mode, name, method).Observe(observeValue)
}