mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
* Extract returned object hydration into function * Finish writing tests for utils func * Lint * Update pkg/apiserver/rest/dualwriter_mode2.go Co-authored-by: Arati R. <33031346+suntala@users.noreply.github.com> * Better var naming * Remove duplicated logic * Lint * Fix test * Lint * Make type private * Fix one more test * Fix test --------- Co-authored-by: Arati R. <33031346+suntala@users.noreply.github.com>
320 lines
10 KiB
Go
320 lines
10 KiB
Go
package rest
|
|
|
|
import (
|
|
"context"
|
|
|
|
"github.com/grafana/grafana/pkg/services/apiserver/utils"
|
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
|
"k8s.io/apimachinery/pkg/api/meta"
|
|
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/labels"
|
|
"k8s.io/apimachinery/pkg/runtime"
|
|
"k8s.io/apimachinery/pkg/selection"
|
|
"k8s.io/apiserver/pkg/registry/rest"
|
|
"k8s.io/klog/v2"
|
|
)
|
|
|
|
type DualWriterMode2 struct {
|
|
Storage Storage
|
|
Legacy LegacyStorage
|
|
*dualWriterMetrics
|
|
Log klog.Logger
|
|
}
|
|
|
|
// NewDualWriterMode2 returns a new DualWriter in mode 2.
|
|
// Mode 2 represents writing to LegacyStorage and Storage and reading from LegacyStorage.
|
|
func NewDualWriterMode2(legacy LegacyStorage, storage Storage) *DualWriterMode2 {
|
|
metrics := &dualWriterMetrics{}
|
|
metrics.init()
|
|
return &DualWriterMode2{Legacy: legacy, Storage: storage, Log: klog.NewKlogr().WithName("DualWriterMode2"), dualWriterMetrics: metrics}
|
|
}
|
|
|
|
// Mode returns the mode of the dual writer.
|
|
func (d *DualWriterMode2) Mode() DualWriterMode {
|
|
return Mode2
|
|
}
|
|
|
|
// Create overrides the behavior of the generic DualWriter and writes to LegacyStorage and Storage.
|
|
func (d *DualWriterMode2) Create(ctx context.Context, original runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) {
|
|
log := d.Log.WithValues("kind", options.Kind)
|
|
ctx = klog.NewContext(ctx, log)
|
|
|
|
created, err := d.Legacy.Create(ctx, original, createValidation, options)
|
|
if err != nil {
|
|
log.Error(err, "unable to create object in legacy storage")
|
|
return created, err
|
|
}
|
|
|
|
createdLegacy, err := enrichLegacyObject(original, created, true)
|
|
if err != nil {
|
|
return createdLegacy, err
|
|
}
|
|
|
|
rsp, err := d.Storage.Create(ctx, created, createValidation, options)
|
|
if err != nil {
|
|
log.WithValues("name").Error(err, "unable to create object in storage")
|
|
return rsp, err
|
|
}
|
|
return rsp, nil
|
|
}
|
|
|
|
// It retrieves an object from Storage if possible, and if not it falls back to LegacyStorage.
|
|
func (d *DualWriterMode2) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
|
|
log := d.Log.WithValues("name", name, "resourceVersion", options.ResourceVersion, "kind", options.Kind)
|
|
ctx = klog.NewContext(ctx, log)
|
|
s, err := d.Storage.Get(ctx, name, &metav1.GetOptions{})
|
|
if err != nil {
|
|
if apierrors.IsNotFound(err) {
|
|
log.Info("object not found in storage")
|
|
return d.Legacy.Get(ctx, name, &metav1.GetOptions{})
|
|
}
|
|
log.Error(err, "unable to fetch object from storage")
|
|
return d.Legacy.Get(ctx, name, &metav1.GetOptions{})
|
|
}
|
|
return s, nil
|
|
}
|
|
|
|
// List overrides the behavior of the generic DualWriter.
|
|
// It returns Storage entries if possible and falls back to LegacyStorage entries if not.
|
|
func (d *DualWriterMode2) List(ctx context.Context, options *metainternalversion.ListOptions) (runtime.Object, error) {
|
|
log := d.Log.WithValues("kind", options.Kind, "resourceVersion", options.ResourceVersion, "kind", options.Kind)
|
|
ctx = klog.NewContext(ctx, log)
|
|
ll, err := d.Legacy.List(ctx, options)
|
|
if err != nil {
|
|
log.Error(err, "unable to list objects from legacy storage")
|
|
return nil, err
|
|
}
|
|
legacyList, err := meta.ExtractList(ll)
|
|
if err != nil {
|
|
log.Error(err, "unable to extract list from legacy storage")
|
|
return nil, err
|
|
}
|
|
|
|
// Record the index of each LegacyStorage object so it can later be replaced by
|
|
// an equivalent Storage object if it exists.
|
|
optionsStorage, indexMap, err := parseList(legacyList)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if optionsStorage.LabelSelector == nil {
|
|
return ll, nil
|
|
}
|
|
|
|
sl, err := d.Storage.List(ctx, &optionsStorage)
|
|
if err != nil {
|
|
log.Error(err, "unable to list objects from storage")
|
|
return nil, err
|
|
}
|
|
storageList, err := meta.ExtractList(sl)
|
|
if err != nil {
|
|
log.Error(err, "unable to extract list from storage")
|
|
return nil, err
|
|
}
|
|
|
|
for _, obj := range storageList {
|
|
accessor, err := meta.Accessor(obj)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if legacyIndex, ok := indexMap[accessor.GetName()]; ok {
|
|
legacyList[legacyIndex] = obj
|
|
}
|
|
}
|
|
|
|
if err = meta.SetList(ll, legacyList); err != nil {
|
|
return nil, err
|
|
}
|
|
return ll, nil
|
|
}
|
|
|
|
// DeleteCollection overrides the behavior of the generic DualWriter and deletes from both LegacyStorage and Storage.
|
|
func (d *DualWriterMode2) DeleteCollection(ctx context.Context, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions, listOptions *metainternalversion.ListOptions) (runtime.Object, error) {
|
|
log := d.Log.WithValues("kind", options.Kind, "resourceVersion", listOptions.ResourceVersion)
|
|
ctx = klog.NewContext(ctx, log)
|
|
deleted, err := d.Legacy.DeleteCollection(ctx, deleteValidation, options, listOptions)
|
|
if err != nil {
|
|
log.WithValues("deleted", deleted).Error(err, "failed to delete collection successfully from legacy storage")
|
|
return nil, err
|
|
}
|
|
legacyList, err := meta.ExtractList(deleted)
|
|
if err != nil {
|
|
log.Error(err, "unable to extract list from legacy storage")
|
|
return nil, err
|
|
}
|
|
|
|
// Only the items deleted by the legacy DeleteCollection call are selected for deletion by Storage.
|
|
optionsStorage, _, err := parseList(legacyList)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if optionsStorage.LabelSelector == nil {
|
|
return deleted, nil
|
|
}
|
|
|
|
res, err := d.Storage.DeleteCollection(ctx, deleteValidation, options, &optionsStorage)
|
|
if err != nil {
|
|
log.WithValues("deleted", res).Error(err, "failed to delete collection successfully from Storage")
|
|
}
|
|
|
|
return res, err
|
|
}
|
|
|
|
func (d *DualWriterMode2) Delete(ctx context.Context, name string, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions) (runtime.Object, bool, error) {
|
|
log := d.Log.WithValues("name", name, "kind", options.Kind)
|
|
ctx = klog.NewContext(ctx, log)
|
|
|
|
deletedLS, async, err := d.Legacy.Delete(ctx, name, deleteValidation, options)
|
|
if err != nil {
|
|
if !apierrors.IsNotFound(err) {
|
|
log.WithValues("objectList", deletedLS).Error(err, "could not delete from legacy store")
|
|
return deletedLS, async, err
|
|
}
|
|
}
|
|
|
|
deletedS, _, errUS := d.Storage.Delete(ctx, name, deleteValidation, options)
|
|
if errUS != nil {
|
|
if !apierrors.IsNotFound(errUS) {
|
|
log.WithValues("objectList", deletedS).Error(errUS, "could not delete from duplicate storage")
|
|
}
|
|
}
|
|
|
|
return deletedLS, async, err
|
|
}
|
|
|
|
// Update overrides the generic behavior of the Storage and writes first to the legacy storage and then to storage.
|
|
func (d *DualWriterMode2) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
|
|
log := d.Log.WithValues("name", name, "kind", options.Kind)
|
|
ctx = klog.NewContext(ctx, log)
|
|
|
|
// get foundObj and (updated) object so they can be stored in legacy store
|
|
foundObj, err := d.Storage.Get(ctx, name, &metav1.GetOptions{})
|
|
if err != nil {
|
|
if !apierrors.IsNotFound(err) {
|
|
log.WithValues("object", foundObj).Error(err, "could not get object to update")
|
|
return nil, false, err
|
|
}
|
|
log.Info("object not found for update, creating one")
|
|
}
|
|
|
|
// obj can be populated in case it's found or empty in case it's not found
|
|
updated, err := objInfo.UpdatedObject(ctx, foundObj)
|
|
if err != nil {
|
|
log.WithValues("object", updated).Error(err, "could not update or create object")
|
|
return nil, false, err
|
|
}
|
|
|
|
obj, created, err := d.Legacy.Update(ctx, name, &updateWrapper{upstream: objInfo, updated: updated}, createValidation, updateValidation, forceAllowCreate, options)
|
|
if err != nil {
|
|
log.WithValues("object", obj).Error(err, "could not update in legacy storage")
|
|
return obj, created, err
|
|
}
|
|
|
|
if foundObj != nil {
|
|
obj, err = enrichLegacyObject(foundObj, obj, false)
|
|
if err != nil {
|
|
return obj, false, err
|
|
}
|
|
|
|
objInfo = &updateWrapper{
|
|
upstream: objInfo,
|
|
updated: obj,
|
|
}
|
|
}
|
|
|
|
return d.Storage.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
|
|
}
|
|
|
|
func (d *DualWriterMode2) Destroy() {
|
|
d.Storage.Destroy()
|
|
d.Legacy.Destroy()
|
|
}
|
|
|
|
func (d *DualWriterMode2) GetSingularName() string {
|
|
return d.Storage.GetSingularName()
|
|
}
|
|
|
|
func (d *DualWriterMode2) NamespaceScoped() bool {
|
|
return d.Storage.NamespaceScoped()
|
|
}
|
|
|
|
func (d *DualWriterMode2) New() runtime.Object {
|
|
return d.Storage.New()
|
|
}
|
|
|
|
func (d *DualWriterMode2) NewList() runtime.Object {
|
|
return d.Storage.NewList()
|
|
}
|
|
|
|
func (d *DualWriterMode2) ConvertToTable(ctx context.Context, object runtime.Object, tableOptions runtime.Object) (*metav1.Table, error) {
|
|
return d.Storage.ConvertToTable(ctx, object, tableOptions)
|
|
}
|
|
|
|
func parseList(legacyList []runtime.Object) (metainternalversion.ListOptions, map[string]int, error) {
|
|
options := metainternalversion.ListOptions{}
|
|
originKeys := []string{}
|
|
indexMap := map[string]int{}
|
|
|
|
for i, obj := range legacyList {
|
|
metaAccessor, err := utils.MetaAccessor(obj)
|
|
if err != nil {
|
|
return options, nil, err
|
|
}
|
|
originKeys = append(originKeys, metaAccessor.GetOriginKey())
|
|
|
|
accessor, err := meta.Accessor(obj)
|
|
if err != nil {
|
|
return options, nil, err
|
|
}
|
|
indexMap[accessor.GetName()] = i
|
|
}
|
|
|
|
if len(originKeys) == 0 {
|
|
return options, nil, nil
|
|
}
|
|
|
|
r, err := labels.NewRequirement(utils.AnnoKeyOriginKey, selection.In, originKeys)
|
|
if err != nil {
|
|
return options, nil, err
|
|
}
|
|
options.LabelSelector = labels.NewSelector().Add(*r)
|
|
|
|
return options, indexMap, nil
|
|
}
|
|
|
|
func enrichLegacyObject(originalObj, returnedObj runtime.Object, created bool) (runtime.Object, error) {
|
|
accessorReturned, err := meta.Accessor(returnedObj)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
accessorOriginal, err := meta.Accessor(originalObj)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
accessorReturned.SetLabels(accessorOriginal.GetLabels())
|
|
|
|
ac := accessorReturned.GetAnnotations()
|
|
if ac == nil {
|
|
ac = map[string]string{}
|
|
}
|
|
for k, v := range accessorOriginal.GetAnnotations() {
|
|
ac[k] = v
|
|
}
|
|
accessorReturned.SetAnnotations(ac)
|
|
|
|
// if the object is created, we need to reset the resource version and UID
|
|
// create method expects an empty resource version
|
|
if created {
|
|
accessorReturned.SetResourceVersion("")
|
|
accessorReturned.SetUID("")
|
|
return returnedObj, nil
|
|
}
|
|
// otherwise, we propagate the original RV and UID
|
|
accessorReturned.SetResourceVersion(accessorOriginal.GetResourceVersion())
|
|
accessorReturned.SetUID(accessorOriginal.GetUID())
|
|
return returnedObj, nil
|
|
}
|