diff --git a/pkg/apimachinery/utils/meta.go b/pkg/apimachinery/utils/meta.go index d6574c52748..47248f8f10d 100644 --- a/pkg/apimachinery/utils/meta.go +++ b/pkg/apimachinery/utils/meta.go @@ -242,8 +242,9 @@ func (m *grafanaMetaAccessor) GetBlob() *BlobInfo { func (m *grafanaMetaAccessor) SetBlob(info *BlobInfo) { if info == nil { m.SetAnnotation(AnnoKeyBlob, "") // delete + } else { + m.SetAnnotation(AnnoKeyBlob, info.String()) } - m.SetAnnotation(AnnoKeyBlob, info.String()) } func (m *grafanaMetaAccessor) GetFolder() string { diff --git a/pkg/registry/apis/dashboard/large.go b/pkg/registry/apis/dashboard/large.go new file mode 100644 index 00000000000..0b710de4f4a --- /dev/null +++ b/pkg/registry/apis/dashboard/large.go @@ -0,0 +1,52 @@ +package dashboard + +import ( + "encoding/json" + "fmt" + + "k8s.io/apimachinery/pkg/runtime" + + commonV0 "github.com/grafana/grafana/pkg/apimachinery/apis/common/v0alpha1" + dashboard "github.com/grafana/grafana/pkg/apis/dashboard/v0alpha1" + "github.com/grafana/grafana/pkg/storage/unified/apistore" +) + +func newDashboardLargeObjectSupport() *apistore.BasicLargeObjectSupport { + return &apistore.BasicLargeObjectSupport{ + TheGroupResource: dashboard.DashboardResourceInfo.GroupResource(), + + // byte size, while testing lets do almost everything (10bytes) + ThresholdSize: 10, + + // 10mb -- we should check what the largest ones are... might be bigger + MaxByteSize: 10 * 1024 * 1024, + + ReduceSpec: func(obj runtime.Object) error { + dash, ok := obj.(*dashboard.Dashboard) + if !ok { + return fmt.Errorf("expected dashboard") + } + old := dash.Spec.Object + spec := commonV0.Unstructured{Object: make(map[string]any)} + dash.Spec = spec + dash.SetManagedFields(nil) // this could be bigger than the object! + + keep := []string{"title", "description", "schemaVersion"} + for _, k := range keep { + v, ok := old[k] + if ok { + spec.Object[k] = v + } + } + return nil + }, + + RebuildSpec: func(obj runtime.Object, blob []byte) error { + dash, ok := obj.(*dashboard.Dashboard) + if !ok { + return fmt.Errorf("expected dashboard") + } + return json.Unmarshal(blob, &dash.Spec) + }, + } +} diff --git a/pkg/registry/apis/dashboard/large_test.go b/pkg/registry/apis/dashboard/large_test.go new file mode 100644 index 00000000000..ff469863ec5 --- /dev/null +++ b/pkg/registry/apis/dashboard/large_test.go @@ -0,0 +1,60 @@ +package dashboard + +import ( + "encoding/json" + "os" + "testing" + + "github.com/stretchr/testify/require" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + + dashboard "github.com/grafana/grafana/pkg/apis/dashboard/v0alpha1" +) + +func TestLargeDashboardSupport(t *testing.T) { + devdash := "../../../../devenv/dev-dashboards/all-panels.json" + + // nolint:gosec + // We can ignore the gosec G304 warning because this is a test with hardcoded input values + f, err := os.ReadFile(devdash) + require.NoError(t, err) + + dash := &dashboard.Dashboard{ + ObjectMeta: v1.ObjectMeta{ + Name: "test", + Namespace: "test", + }, + } + err = json.Unmarshal(f, &dash.Spec) + require.NoError(t, err) + + expectedPanelCount := 19 + panels, found, err := unstructured.NestedSlice(dash.Spec.Object, "panels") + require.NoError(t, err) + require.True(t, found) + require.Len(t, panels, expectedPanelCount) + + largeObject := newDashboardLargeObjectSupport() + + // Convert the dashboard to a small value + err = largeObject.ReduceSpec(dash) + require.NoError(t, err) + + small, err := json.MarshalIndent(&dash.Spec, "", " ") + require.NoError(t, err) + require.JSONEq(t, `{ + "schemaVersion": 33, + "title": "Panel tests - All panels" + }`, string(small)) + + // Now make it big again + err = largeObject.RebuildSpec(dash, f) + require.NoError(t, err) + + // check that all panels exist again + panels, found, err = unstructured.NestedSlice(dash.Spec.Object, "panels") + require.NoError(t, err) + require.True(t, found) + require.Len(t, panels, expectedPanelCount) +} diff --git a/pkg/registry/apis/dashboard/legacy_storage.go b/pkg/registry/apis/dashboard/legacy_storage.go index 42a88385a23..c2178ca1bd7 100644 --- a/pkg/registry/apis/dashboard/legacy_storage.go +++ b/pkg/registry/apis/dashboard/legacy_storage.go @@ -1,8 +1,6 @@ package dashboard import ( - "context" - "github.com/prometheus/client_golang/prometheus" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apiserver/pkg/registry/generic" @@ -48,16 +46,8 @@ func (s *dashboardStorage) newStore(scheme *runtime.Scheme, defaultOptsGetter ge return nil, err } client := resource.NewLocalResourceClient(server) - // This is needed as the apistore doesn't allow any core grafana dependencies. We extract the needed features - // to a map, to check them in the apistore itself. - features := make(map[string]any) - if s.features.IsEnabled(context.Background(), featuremgmt.FlagUnifiedStorageBigObjectsSupport) { - features[featuremgmt.FlagUnifiedStorageBigObjectsSupport] = struct{}{} - } optsGetter := apistore.NewRESTOptionsGetterForClient(client, defaultOpts.StorageConfig.Config, - features, ) - return grafanaregistry.NewRegistryStore(scheme, resourceInfo, optsGetter) } diff --git a/pkg/registry/apis/dashboard/register.go b/pkg/registry/apis/dashboard/register.go index 40892d7556b..1e10e1e6ff8 100644 --- a/pkg/registry/apis/dashboard/register.go +++ b/pkg/registry/apis/dashboard/register.go @@ -137,6 +137,15 @@ func (b *DashboardsAPIBuilder) UpdateAPIGroupInfo(apiGroupInfo *genericapiserver return err } + // Split dashboards when they are large + var largeObjects apistore.LargeObjectSupport + if b.legacy.features.IsEnabledGlobally(featuremgmt.FlagUnifiedStorageBigObjectsSupport) { + largeObjects = newDashboardLargeObjectSupport() + opts.StorageOptions(dash.GroupResource(), apistore.StorageOptions{ + LargeObjectSupport: largeObjects, + }) + } + storage := map[string]rest.Storage{} storage[dash.StoragePath()] = legacyStore storage[dash.StoragePath("history")] = apistore.NewHistoryConnector( @@ -157,7 +166,7 @@ func (b *DashboardsAPIBuilder) UpdateAPIGroupInfo(apiGroupInfo *genericapiserver } // Register the DTO endpoint that will consolidate all dashboard bits - storage[dash.StoragePath("dto")], err = newDTOConnector(storage[dash.StoragePath()], b) + storage[dash.StoragePath("dto")], err = newDTOConnector(storage[dash.StoragePath()], largeObjects, b) if err != nil { return err } diff --git a/pkg/registry/apis/dashboard/sub_dto.go b/pkg/registry/apis/dashboard/sub_dto.go index 85c4c8ccbc3..ff3bc435868 100644 --- a/pkg/registry/apis/dashboard/sub_dto.go +++ b/pkg/registry/apis/dashboard/sub_dto.go @@ -6,6 +6,10 @@ import ( "net/http" "strconv" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apiserver/pkg/registry/rest" + "github.com/grafana/authlib/claims" "github.com/grafana/grafana/pkg/apimachinery/identity" "github.com/grafana/grafana/pkg/apimachinery/utils" @@ -17,10 +21,8 @@ import ( "github.com/grafana/grafana/pkg/services/apiserver/endpoints/request" "github.com/grafana/grafana/pkg/services/dashboards" "github.com/grafana/grafana/pkg/services/guardian" + "github.com/grafana/grafana/pkg/storage/unified/apistore" "github.com/grafana/grafana/pkg/storage/unified/resource" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apiserver/pkg/registry/rest" ) // The DTO returns everything the UI needs in a single request @@ -28,16 +30,18 @@ type DTOConnector struct { getter rest.Getter legacy legacy.DashboardAccess unified resource.ResourceClient + largeObjects apistore.LargeObjectSupport accessControl accesscontrol.AccessControl log log.Logger } -func newDTOConnector(dash rest.Storage, builder *DashboardsAPIBuilder) (rest.Storage, error) { +func newDTOConnector(dash rest.Storage, largeObjects apistore.LargeObjectSupport, builder *DashboardsAPIBuilder) (rest.Storage, error) { ok := false v := &DTOConnector{ legacy: builder.legacy.access, accessControl: builder.accessControl, unified: builder.unified, + largeObjects: largeObjects, log: builder.log, } v.getter, ok = dash.(rest.Getter) @@ -86,7 +90,7 @@ func (r *DTOConnector) Connect(ctx context.Context, name string, opts runtime.Ob return nil, err } - rawobj, err := r.getter.Get(ctx, name, &v1.GetOptions{}) + rawobj, err := r.getter.Get(ctx, name, &metav1.GetOptions{}) if err != nil { return nil, err } @@ -137,8 +141,17 @@ func (r *DTOConnector) Connect(ctx context.Context, name string, opts runtime.Ob // Check for blob info blobInfo := obj.GetBlob() - if blobInfo != nil { - fmt.Printf("TODO, load full blob from storage %+v\n", blobInfo) + if blobInfo != nil && r.largeObjects != nil { + gr := r.largeObjects.GroupResource() + err = r.largeObjects.Reconstruct(ctx, &resource.ResourceKey{ + Group: gr.Group, + Resource: gr.Resource, + Namespace: obj.GetNamespace(), + Name: obj.GetName(), + }, r.unified, obj) + if err != nil { + return nil, err + } } access.Slug = slugify.Slugify(dash.Spec.GetNestedString("title")) diff --git a/pkg/services/apiserver/aggregator/aggregator_test.go b/pkg/services/apiserver/aggregator/aggregator_test.go index 30af4a352da..fe1eb1cd52b 100644 --- a/pkg/services/apiserver/aggregator/aggregator_test.go +++ b/pkg/services/apiserver/aggregator/aggregator_test.go @@ -5,7 +5,6 @@ import ( "testing" "time" - "github.com/grafana/grafana/pkg/storage/unified/apistore" "github.com/stretchr/testify/require" openapinamer "k8s.io/apiserver/pkg/endpoints/openapi" genericapiserver "k8s.io/apiserver/pkg/server" @@ -17,6 +16,8 @@ import ( "k8s.io/kube-aggregator/pkg/apiserver" aggregatorscheme "k8s.io/kube-aggregator/pkg/apiserver/scheme" aggregatoropenapi "k8s.io/kube-aggregator/pkg/generated/openapi" + + "github.com/grafana/grafana/pkg/storage/unified/apistore" ) // TestAggregatorPostStartHooks tests that the kube-aggregator server has the expected default post start hooks enabled. @@ -41,7 +42,7 @@ func TestAggregatorPostStartHooks(t *testing.T) { cfg.GenericConfig.SharedInformerFactory = informers.NewSharedInformerFactory(fake.NewSimpleClientset(), 10*time.Minute) // override the RESTOptionsGetter to use the in memory storage options - restOptionsGetter, err := apistore.NewRESTOptionsGetterMemory(*storagebackend.NewDefaultConfig("memory", nil), make(map[string]any)) + restOptionsGetter, err := apistore.NewRESTOptionsGetterMemory(*storagebackend.NewDefaultConfig("memory", nil)) require.NoError(t, err) cfg.GenericConfig.RESTOptionsGetter = restOptionsGetter diff --git a/pkg/services/apiserver/builder/common.go b/pkg/services/apiserver/builder/common.go index 7356a648f6f..6d70c6c3b3e 100644 --- a/pkg/services/apiserver/builder/common.go +++ b/pkg/services/apiserver/builder/common.go @@ -15,6 +15,7 @@ import ( "k8s.io/kube-openapi/pkg/spec3" grafanarest "github.com/grafana/grafana/pkg/apiserver/rest" + "github.com/grafana/grafana/pkg/storage/unified/apistore" ) // TODO: this (or something like it) belongs in grafana-app-sdk, @@ -64,6 +65,7 @@ type APIGroupOptions struct { OptsGetter generic.RESTOptionsGetter DualWriteBuilder grafanarest.DualWriteBuilder MetricsRegister prometheus.Registerer + StorageOptions apistore.StorageOptionsRegister } // Builders that implement OpenAPIPostProcessor are given a chance to modify the schema directly diff --git a/pkg/services/apiserver/builder/helper.go b/pkg/services/apiserver/builder/helper.go index 251d62912f0..911f76093af 100644 --- a/pkg/services/apiserver/builder/helper.go +++ b/pkg/services/apiserver/builder/helper.go @@ -7,7 +7,6 @@ import ( "regexp" "time" - "github.com/grafana/grafana/pkg/services/featuremgmt" "github.com/prometheus/client_golang/prometheus" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -25,6 +24,8 @@ import ( "k8s.io/klog/v2" "k8s.io/kube-openapi/pkg/common" + "github.com/grafana/grafana/pkg/storage/unified/apistore" + "github.com/grafana/grafana/pkg/apiserver/endpoints/filters" grafanarest "github.com/grafana/grafana/pkg/apiserver/rest" "github.com/grafana/grafana/pkg/services/apiserver/endpoints/request" @@ -167,7 +168,7 @@ func InstallAPIs( namespaceMapper request.NamespaceMapper, kvStore grafanarest.NamespacedKVStore, serverLock ServerLockService, - features featuremgmt.FeatureToggles, + optsregister apistore.StorageOptionsRegister, ) error { // dual writing is only enabled when the storage type is not legacy. // this is needed to support setting a default RESTOptionsGetter for new APIs that don't @@ -243,6 +244,7 @@ func InstallAPIs( OptsGetter: optsGetter, DualWriteBuilder: dualWrite, MetricsRegister: reg, + StorageOptions: optsregister, }); err != nil { return err } diff --git a/pkg/services/apiserver/options/grafana-aggregator.go b/pkg/services/apiserver/options/grafana-aggregator.go index 33a3961605c..693f27ddd34 100644 --- a/pkg/services/apiserver/options/grafana-aggregator.go +++ b/pkg/services/apiserver/options/grafana-aggregator.go @@ -74,7 +74,7 @@ func (o *GrafanaAggregatorOptions) ApplyTo(aggregatorConfig *aggregatorapiserver return err } // override the RESTOptionsGetter to use the in memory storage options - restOptionsGetter, err := apistore.NewRESTOptionsGetterMemory(etcdOptions.StorageConfig, make(map[string]any)) + restOptionsGetter, err := apistore.NewRESTOptionsGetterMemory(etcdOptions.StorageConfig) if err != nil { return err } diff --git a/pkg/services/apiserver/options/kube-aggregator.go b/pkg/services/apiserver/options/kube-aggregator.go index cf6937f5099..1186747f379 100644 --- a/pkg/services/apiserver/options/kube-aggregator.go +++ b/pkg/services/apiserver/options/kube-aggregator.go @@ -81,7 +81,7 @@ func (o *KubeAggregatorOptions) ApplyTo(aggregatorConfig *aggregatorapiserver.Co return err } // override the RESTOptionsGetter to use the in memory storage options - restOptionsGetter, err := apistore.NewRESTOptionsGetterMemory(etcdOptions.StorageConfig, make(map[string]any)) + restOptionsGetter, err := apistore.NewRESTOptionsGetterMemory(etcdOptions.StorageConfig) if err != nil { return err } diff --git a/pkg/services/apiserver/service.go b/pkg/services/apiserver/service.go index be56eb1e435..401c0baa25e 100644 --- a/pkg/services/apiserver/service.go +++ b/pkg/services/apiserver/service.go @@ -6,8 +6,6 @@ import ( "net/http" "path" - "github.com/grafana/dskit/services" - "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/prometheus/client_golang/prometheus" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -19,6 +17,8 @@ import ( "k8s.io/client-go/tools/clientcmd" aggregatorapiserver "k8s.io/kube-aggregator/pkg/apiserver" + "github.com/grafana/dskit/services" + "github.com/grafana/grafana-plugin-sdk-go/backend" dataplaneaggregator "github.com/grafana/grafana/pkg/aggregator/apiserver" "github.com/grafana/grafana/pkg/api/routing" "github.com/grafana/grafana/pkg/apimachinery/identity" @@ -295,6 +295,8 @@ func (s *service) start(ctx context.Context) error { serverConfig.LoopbackClientConfig.Transport = transport serverConfig.LoopbackClientConfig.TLSClientConfig = clientrest.TLSClientConfig{} + var optsregister apistore.StorageOptionsRegister + if o.StorageOptions.StorageType == grafanaapiserveroptions.StorageTypeEtcd { if err := o.RecommendedOptions.Etcd.Validate(); len(err) > 0 { return err[0] @@ -303,14 +305,11 @@ func (s *service) start(ctx context.Context) error { return err } } else { - // This is needed as the apistore doesn't allow any core grafana dependencies. - features := make(map[string]any) - if s.features.IsEnabled(context.Background(), featuremgmt.FlagUnifiedStorageBigObjectsSupport) { - features[featuremgmt.FlagUnifiedStorageBigObjectsSupport] = struct{}{} - } + getter := apistore.NewRESTOptionsGetterForClient(s.unified, o.RecommendedOptions.Etcd.StorageConfig) + optsregister = getter.RegisterOptions + // Use unified storage client - serverConfig.Config.RESTOptionsGetter = apistore.NewRESTOptionsGetterForClient( - s.unified, o.RecommendedOptions.Etcd.StorageConfig, features) + serverConfig.Config.RESTOptionsGetter = getter } // Add OpenAPI specs for each group+version @@ -337,7 +336,9 @@ func (s *service) start(ctx context.Context) error { // Install the API group+version err = builder.InstallAPIs(Scheme, Codecs, server, serverConfig.RESTOptionsGetter, builders, o.StorageOptions, // Required for the dual writer initialization - s.metrics, request.GetNamespaceMapper(s.cfg), kvstore.WithNamespace(s.kvStore, 0, "storage.dualwriting"), s.serverLockService, s.features, + s.metrics, request.GetNamespaceMapper(s.cfg), kvstore.WithNamespace(s.kvStore, 0, "storage.dualwriting"), + s.serverLockService, + optsregister, ) if err != nil { return err diff --git a/pkg/storage/unified/apistore/large.go b/pkg/storage/unified/apistore/large.go new file mode 100644 index 00000000000..9958cba60dc --- /dev/null +++ b/pkg/storage/unified/apistore/large.go @@ -0,0 +1,166 @@ +package apistore + +import ( + "context" + "encoding/json" + "fmt" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + + common "github.com/grafana/grafana/pkg/apimachinery/apis/common/v0alpha1" + "github.com/grafana/grafana/pkg/apimachinery/utils" + "github.com/grafana/grafana/pkg/storage/unified/resource" +) + +type LargeObjectSupport interface { + // The resource this can process + GroupResource() schema.GroupResource + + // The size that triggers delegating part of the object to blob storage + Threshold() int + + // Each resource may have a maximum size that is different than the global maximum + // for example, we know we will allow dashboards up to 10mb, however most + // resources should have a smaller limit (1mb?) + MaxSize() int + + // Deconstruct takes a large object, write most of it to blob storage and leave a few metadata bits around to help with list + // NOTE: changes to the object must be handled by mutating the input obj + Deconstruct(ctx context.Context, key *resource.ResourceKey, client resource.BlobStoreClient, obj utils.GrafanaMetaAccessor, raw []byte) error + + // Reconstruct will join the resource+blob back into a complete resource + // NOTE: changes to the object must be handled by mutating the input obj + Reconstruct(ctx context.Context, key *resource.ResourceKey, client resource.BlobStoreClient, obj utils.GrafanaMetaAccessor) error +} + +var _ LargeObjectSupport = (*BasicLargeObjectSupport)(nil) + +type BasicLargeObjectSupport struct { + TheGroupResource schema.GroupResource + ThresholdSize int + MaxByteSize int + + // Mutate the spec so it only has the small properties + ReduceSpec func(obj runtime.Object) error + + // Update the spec so it has the full object + // This is used to support server-side apply + RebuildSpec func(obj runtime.Object, blob []byte) error +} + +func (s *BasicLargeObjectSupport) GroupResource() schema.GroupResource { + return s.TheGroupResource +} + +// Threshold implements LargeObjectSupport. +func (s *BasicLargeObjectSupport) Threshold() int { + return s.ThresholdSize +} + +// MaxSize implements LargeObjectSupport. +func (s *BasicLargeObjectSupport) MaxSize() int { + return s.MaxByteSize +} + +// Deconstruct implements LargeObjectSupport. +func (s *BasicLargeObjectSupport) Deconstruct(ctx context.Context, key *resource.ResourceKey, client resource.BlobStoreClient, obj utils.GrafanaMetaAccessor, raw []byte) error { + if key.Group != s.TheGroupResource.Group { + return fmt.Errorf("requested group mismatch") + } + if key.Resource != s.TheGroupResource.Resource { + return fmt.Errorf("requested resource mismatch") + } + + spec, err := obj.GetSpec() + if err != nil { + return err + } + + var val []byte + + // :( could not figure out custom JSON marshaling + // with pointer receiver... this is a quick fix to support dashboards + u, ok := spec.(common.Unstructured) + if ok { + val, err = json.Marshal(u.Object) + } else { + val, err = json.Marshal(spec) + } + + // Write only the spec + if err != nil { + return err + } + + rt, ok := obj.GetRuntimeObject() + if !ok { + return fmt.Errorf("expected runtime object") + } + + err = s.ReduceSpec(rt) + if err != nil { + return err + } + + // Save the blob + info, err := client.PutBlob(ctx, &resource.PutBlobRequest{ + ContentType: "application/json", + Value: val, + Resource: key, + }) + if err != nil { + return err + } + + // Update the resource metadata with the blob info + obj.SetBlob(&utils.BlobInfo{ + UID: info.Uid, + Size: info.Size, + Hash: info.Hash, + MimeType: info.MimeType, + Charset: info.Charset, + }) + return err +} + +// Reconstruct implements LargeObjectSupport. +func (s *BasicLargeObjectSupport) Reconstruct(ctx context.Context, key *resource.ResourceKey, client resource.BlobStoreClient, obj utils.GrafanaMetaAccessor) error { + blobInfo := obj.GetBlob() + if blobInfo == nil { + return fmt.Errorf("the object does not have a blob") + } + + rv, err := obj.GetResourceVersionInt64() + if err != nil { + return err + } + rsp, err := client.GetBlob(ctx, &resource.GetBlobRequest{ + Resource: &resource.ResourceKey{ + Group: s.TheGroupResource.Group, + Resource: s.TheGroupResource.Resource, + Namespace: obj.GetNamespace(), + Name: obj.GetName(), + }, + MustProxyBytes: true, + ResourceVersion: rv, + }) + if err != nil { + return err + } + if rsp.Error != nil { + return fmt.Errorf("error loading value from object store %+v", rsp.Error) + } + + // Replace the spec with the value saved in the blob store + if len(rsp.Value) == 0 { + return fmt.Errorf("empty blob value") + } + + rt, ok := obj.GetRuntimeObject() + if !ok { + return fmt.Errorf("unable to get raw object") + } + obj.SetBlob(nil) // remove the blob info + return s.RebuildSpec(rt, rsp.Value) +} diff --git a/pkg/storage/unified/apistore/prepare.go b/pkg/storage/unified/apistore/prepare.go index 877a96bff00..ed379fd73c9 100644 --- a/pkg/storage/unified/apistore/prepare.go +++ b/pkg/storage/unified/apistore/prepare.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "math" "time" "k8s.io/apimachinery/pkg/runtime" @@ -15,6 +16,23 @@ import ( "github.com/grafana/grafana/pkg/storage/unified/resource" ) +func logN(n, b float64) float64 { + return math.Log(n) / math.Log(b) +} + +// Slightly modified function from https://github.com/dustin/go-humanize (MIT). +func formatBytes(numBytes int) string { + base := 1024.0 + sizes := []string{"B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB"} + if numBytes < 10 { + return fmt.Sprintf("%d B", numBytes) + } + e := math.Floor(logN(float64(numBytes), base)) + suffix := sizes[int(e)] + val := math.Floor(float64(numBytes)/math.Pow(base, e)*10+0.5) / 10 + return fmt.Sprintf("%.1f %s", val, suffix) +} + // Called on create func (s *Storage) prepareObjectForStorage(ctx context.Context, newObject runtime.Object) ([]byte, error) { user, err := identity.GetRequester(ctx) @@ -51,11 +69,7 @@ func (s *Storage) prepareObjectForStorage(ctx context.Context, newObject runtime if err = s.codec.Encode(newObject, &buf); err != nil { return nil, err } - - if s.largeObjectSupport { - return s.handleLargeResources(ctx, obj, buf) - } - return buf.Bytes(), nil + return s.handleLargeResources(ctx, obj, buf) } // Called on update @@ -106,29 +120,41 @@ func (s *Storage) prepareObjectForUpdate(ctx context.Context, updateObject runti if err = s.codec.Encode(updateObject, &buf); err != nil { return nil, err } - if s.largeObjectSupport { - return s.handleLargeResources(ctx, obj, buf) - } - return buf.Bytes(), nil + return s.handleLargeResources(ctx, obj, buf) } func (s *Storage) handleLargeResources(ctx context.Context, obj utils.GrafanaMetaAccessor, buf bytes.Buffer) ([]byte, error) { - if buf.Len() > 1000 { - // !!! Currently just write the whole thing - // in reality we may only want to write the spec.... - _, err := s.store.PutBlob(ctx, &resource.PutBlobRequest{ - ContentType: "application/json", - Value: buf.Bytes(), - Resource: &resource.ResourceKey{ - Group: s.gr.Group, - Resource: s.gr.Resource, - Namespace: obj.GetNamespace(), - Name: obj.GetName(), - }, - }) + support := s.opts.LargeObjectSupport + if support != nil { + size := buf.Len() + if size > support.Threshold() { + if support.MaxSize() > 0 && size > support.MaxSize() { + return nil, fmt.Errorf("request object is too big (%s > %s)", formatBytes(size), formatBytes(support.MaxSize())) + } + } + + key := &resource.ResourceKey{ + Group: s.gr.Group, + Resource: s.gr.Resource, + Namespace: obj.GetNamespace(), + Name: obj.GetName(), + } + + err := support.Deconstruct(ctx, key, s.store, obj, buf.Bytes()) if err != nil { return nil, err } + + buf.Reset() + orig, ok := obj.GetRuntimeObject() + if !ok { + return nil, fmt.Errorf("error using object as runtime object") + } + + // Now encode the smaller version + if err = s.codec.Encode(orig, &buf); err != nil { + return nil, err + } } return buf.Bytes(), nil } diff --git a/pkg/storage/unified/apistore/restoptions.go b/pkg/storage/unified/apistore/restoptions.go index 8fba3747ccc..b9cffcfe17d 100644 --- a/pkg/storage/unified/apistore/restoptions.go +++ b/pkg/storage/unified/apistore/restoptions.go @@ -24,25 +24,25 @@ import ( var _ generic.RESTOptionsGetter = (*RESTOptionsGetter)(nil) -// This is a copy of the original flag, as we are not allowed to import grafana core. -const bigObjectSupportFlag = "unifiedStorageBigObjectsSupport" +type StorageOptionsRegister func(gr schema.GroupResource, opts StorageOptions) type RESTOptionsGetter struct { client resource.ResourceClient original storagebackend.Config - // As we are not allowed to import the feature management directly, we pass a map of enabled features. - features map[string]any + + // Each group+resource may need custom options + options map[string]StorageOptions } -func NewRESTOptionsGetterForClient(client resource.ResourceClient, original storagebackend.Config, features map[string]any) *RESTOptionsGetter { +func NewRESTOptionsGetterForClient(client resource.ResourceClient, original storagebackend.Config) *RESTOptionsGetter { return &RESTOptionsGetter{ client: client, original: original, - features: features, + options: make(map[string]StorageOptions), } } -func NewRESTOptionsGetterMemory(originalStorageConfig storagebackend.Config, features map[string]any) (*RESTOptionsGetter, error) { +func NewRESTOptionsGetterMemory(originalStorageConfig storagebackend.Config) (*RESTOptionsGetter, error) { backend, err := resource.NewCDKBackend(context.Background(), resource.CDKBackendOptions{ Bucket: memblob.OpenBucket(&memblob.Options{}), }) @@ -58,7 +58,6 @@ func NewRESTOptionsGetterMemory(originalStorageConfig storagebackend.Config, fea return NewRESTOptionsGetterForClient( resource.NewLocalResourceClient(server), originalStorageConfig, - features, ), nil } @@ -94,10 +93,13 @@ func NewRESTOptionsGetterForFile(path string, return NewRESTOptionsGetterForClient( resource.NewLocalResourceClient(server), originalStorageConfig, - features, ), nil } +func (r *RESTOptionsGetter) RegisterOptions(gr schema.GroupResource, opts StorageOptions) { + r.options[gr.String()] = opts +} + // TODO: The RESTOptionsGetter interface added a new example object parameter to help determine the default // storage version for a resource. This is not currently used in this implementation. func (r *RESTOptionsGetter) GetRESTOptions(resource schema.GroupResource, _ runtime.Object) (generic.RESTOptions, error) { @@ -131,12 +133,8 @@ func (r *RESTOptionsGetter) GetRESTOptions(resource schema.GroupResource, _ runt trigger storage.IndexerFuncs, indexers *cache.Indexers, ) (storage.Interface, factory.DestroyFunc, error) { - if _, enabled := r.features[bigObjectSupportFlag]; enabled { - return NewStorage(config, r.client, keyFunc, nil, newFunc, newListFunc, getAttrsFunc, - trigger, indexers, LargeObjectSupportEnabled) - } return NewStorage(config, r.client, keyFunc, nil, newFunc, newListFunc, getAttrsFunc, - trigger, indexers, LargeObjectSupportDisabled) + trigger, indexers, r.options[resource.String()]) }, DeleteCollectionWorkers: 0, EnableGarbageCollection: false, diff --git a/pkg/storage/unified/apistore/store.go b/pkg/storage/unified/apistore/store.go index 86a6563e854..8b19007dd8b 100644 --- a/pkg/storage/unified/apistore/store.go +++ b/pkg/storage/unified/apistore/store.go @@ -41,6 +41,11 @@ const ( var _ storage.Interface = (*Storage)(nil) +// Optional settings that apply to a single resource +type StorageOptions struct { + LargeObjectSupport LargeObjectSupport +} + // Storage implements storage.Interface and storage resources as JSON files on disk. type Storage struct { gr schema.GroupResource @@ -57,9 +62,8 @@ type Storage struct { versioner storage.Versioner - // Defines if we want to outsource large objects to another storage type. - // By default, this feature is disabled. - largeObjectSupport bool + // Resource options like large object support + opts StorageOptions } // ErrFileNotExists means the file doesn't actually exist. @@ -79,7 +83,7 @@ func NewStorage( getAttrsFunc storage.AttrFunc, trigger storage.IndexerFuncs, indexers *cache.Indexers, - largeObjectSupport bool, + opts StorageOptions, ) (storage.Interface, factory.DestroyFunc, error) { s := &Storage{ store: store, @@ -96,7 +100,7 @@ func NewStorage( versioner: &storage.APIObjectVersioner{}, - largeObjectSupport: largeObjectSupport, + opts: opts, } // The key parsing callback allows us to support the hardcoded paths from upstream tests @@ -480,6 +484,14 @@ func (s *Storage) GuaranteedUpdate( continue } } + + // restore the full original object before tryUpdate + if s.opts.LargeObjectSupport != nil && mmm.GetBlob() != nil { + err = s.opts.LargeObjectSupport.Reconstruct(ctx, req.Key, s.store, mmm) + if err != nil { + return err + } + } } else if !ignoreNotFound { return apierrors.NewNotFound(s.gr, req.Key.Name) } diff --git a/pkg/storage/unified/apistore/watcher_test.go b/pkg/storage/unified/apistore/watcher_test.go index 23ae5215784..8a8b278577f 100644 --- a/pkg/storage/unified/apistore/watcher_test.go +++ b/pkg/storage/unified/apistore/watcher_test.go @@ -176,7 +176,7 @@ func testSetup(t testing.TB, opts ...setupOption) (context.Context, storage.Inte storage.DefaultNamespaceScopedAttr, make(map[string]storage.IndexerFunc, 0), nil, - LargeObjectSupportDisabled, + StorageOptions{}, ) if err != nil { return nil, nil, nil, err diff --git a/pkg/storage/unified/resource/cdk_blob.go b/pkg/storage/unified/resource/cdk_blob.go index 00ddec4ff9d..8fd59fafaf1 100644 --- a/pkg/storage/unified/resource/cdk_blob.go +++ b/pkg/storage/unified/resource/cdk_blob.go @@ -7,14 +7,16 @@ import ( "encoding/hex" "fmt" "mime" + "strings" "time" "github.com/google/uuid" - "github.com/grafana/grafana/pkg/apimachinery/utils" "go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace/noop" "gocloud.dev/blob" + "github.com/grafana/grafana/pkg/apimachinery/utils" + // Supported drivers _ "gocloud.dev/blob/azureblob" _ "gocloud.dev/blob/fileblob" @@ -32,6 +34,14 @@ type CDKBlobSupportOptions struct { // Called in a context that loaded the possible drivers func OpenBlobBucket(ctx context.Context, url string) (*blob.Bucket, error) { + if strings.HasPrefix(url, "file:") { + // Don't write metadata attributes + if strings.Contains(url, "?") { + url += "&metadata=skip" + } else { + url += "?metadata=skip" + } + } return blob.OpenBucket(ctx, url) } diff --git a/pkg/storage/unified/resource/keys.go b/pkg/storage/unified/resource/keys.go index 319cf107f5b..a4b554476a6 100644 --- a/pkg/storage/unified/resource/keys.go +++ b/pkg/storage/unified/resource/keys.go @@ -1,5 +1,18 @@ package resource +func verifyRequestKey(key *ResourceKey) *ErrorResult { + if key == nil { + return NewBadRequestError("missing resource key") + } + if key.Group == "" { + return NewBadRequestError("request key is missing group") + } + if key.Resource == "" { + return NewBadRequestError("request key is missing resource") + } + return nil +} + func matchesQueryKey(query *ResourceKey, key *ResourceKey) bool { if query.Group != key.Group { return false diff --git a/pkg/storage/unified/resource/server.go b/pkg/storage/unified/resource/server.go index 53d33eb4b19..e45e5bbff9a 100644 --- a/pkg/storage/unified/resource/server.go +++ b/pkg/storage/unified/resource/server.go @@ -10,15 +10,16 @@ import ( "sync/atomic" "time" - "github.com/grafana/authlib/claims" - "github.com/grafana/grafana/pkg/apimachinery/identity" - "github.com/grafana/grafana/pkg/apimachinery/utils" "github.com/prometheus/client_golang/prometheus" "go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace/noop" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + + "github.com/grafana/authlib/claims" + "github.com/grafana/grafana/pkg/apimachinery/identity" + "github.com/grafana/grafana/pkg/apimachinery/utils" ) // ResourceServer implements all gRPC services @@ -821,6 +822,10 @@ func (s *server) PutBlob(ctx context.Context, req *PutBlobRequest) (*PutBlobResp } func (s *server) getPartialObject(ctx context.Context, key *ResourceKey, rv int64) (utils.GrafanaMetaAccessor, *ErrorResult) { + if r := verifyRequestKey(key); r != nil { + return nil, r + } + rsp := s.backend.ReadResource(ctx, &ReadRequest{ Key: key, ResourceVersion: rv,