DualWrite: Cleanup and centralize the dual write creation (#90013)

This commit is contained in:
Ryan McKinley 2024-07-11 13:23:31 -07:00 committed by GitHub
parent 4b5b599982
commit d2bc4f3255
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
22 changed files with 125 additions and 150 deletions

View File

@ -11,6 +11,7 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/klog/v2"
)
@ -25,6 +26,9 @@ var (
_ rest.SingularNameProvider = (DualWriter)(nil)
)
// Function that will create a dual writer
type DualWriteBuilder func(gr schema.GroupResource, legacy LegacyStorage, storage Storage) (Storage, error)
// Storage is a storage implementation that satisfies the same interfaces as genericregistry.Store.
type Storage interface {
rest.Storage
@ -152,7 +156,12 @@ func SetDualWritingMode(
entity string,
desiredMode DualWriterMode,
reg prometheus.Registerer,
) (DualWriter, error) {
) (DualWriterMode, error) {
// Mode0 means no DualWriter
if desiredMode == Mode0 {
return Mode0, nil
}
toMode := map[string]DualWriterMode{
// It is not possible to initialize a mode 0 dual writer. Mode 0 represents
// writing to legacy storage without `unifiedStorage` enabled.
@ -166,7 +175,7 @@ func SetDualWritingMode(
// Use entity name as key
m, ok, err := kvs.Get(ctx, entity)
if err != nil {
return nil, errors.New("failed to fetch current dual writing mode")
return Mode0, errors.New("failed to fetch current dual writing mode")
}
currentMode, valid := toMode[m]
@ -182,7 +191,7 @@ func SetDualWritingMode(
err := kvs.Set(ctx, entity, fmt.Sprint(currentMode))
if err != nil {
return nil, errDualWriterSetCurrentMode
return Mode0, errDualWriterSetCurrentMode
}
}
@ -194,7 +203,7 @@ func SetDualWritingMode(
err := kvs.Set(ctx, entity, fmt.Sprint(currentMode))
if err != nil {
return nil, errDualWriterSetCurrentMode
return Mode0, errDualWriterSetCurrentMode
}
}
if (desiredMode == Mode1) && (currentMode == Mode2) {
@ -204,13 +213,13 @@ func SetDualWritingMode(
err := kvs.Set(ctx, entity, fmt.Sprint(currentMode))
if err != nil {
return nil, errDualWriterSetCurrentMode
return Mode0, errDualWriterSetCurrentMode
}
}
// #TODO add support for other combinations of desired and current modes
return NewDualWriter(currentMode, legacy, storage, reg), nil
return currentMode, nil
}
var defaultConverter = runtime.UnstructuredConverter(runtime.DefaultUnstructuredConverter)

View File

@ -46,9 +46,9 @@ func TestSetDualWritingMode(t *testing.T) {
kvStore := &fakeNamespacedKV{data: make(map[string]string), namespace: "storage.dualwriting." + tt.stackID}
p := prometheus.NewRegistry()
dw, err := SetDualWritingMode(context.Background(), kvStore, ls, us, "playlist.grafana.app/v0alpha1", tt.desiredMode, p)
dwMode, err := SetDualWritingMode(context.Background(), kvStore, ls, us, "playlist.grafana.app/v0alpha1", tt.desiredMode, p)
assert.NoError(t, err)
assert.Equal(t, tt.expectedMode, dw.Mode())
assert.Equal(t, tt.expectedMode, dwMode)
// check kv store
val, ok, err := kvStore.Get(context.Background(), "playlist.grafana.app/v0alpha1")

View File

@ -166,7 +166,9 @@ func (o *APIServerOptions) RunAPIServer(config *genericapiserver.RecommendedConf
// Install the API Group+version
// #TODO figure out how to configure storage type in o.Options.StorageOptions
err = builder.InstallAPIs(grafanaAPIServer.Scheme, grafanaAPIServer.Codecs, server, config.RESTOptionsGetter, o.builders, o.Options.StorageOptions, o.Options.MetricsOptions.MetricsRegisterer)
err = builder.InstallAPIs(grafanaAPIServer.Scheme, grafanaAPIServer.Codecs, server, config.RESTOptionsGetter, o.builders, o.Options.StorageOptions,
o.Options.MetricsOptions.MetricsRegisterer, nil, nil, // no need for server lock in standalone
)
if err != nil {
return err
}

View File

@ -9,8 +9,6 @@ import (
genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
"k8s.io/apiserver/pkg/registry/rest"
"github.com/prometheus/client_golang/prometheus"
model "github.com/grafana/grafana/pkg/apis/alerting_notifications/v0alpha1"
grafanaregistry "github.com/grafana/grafana/pkg/apiserver/registry/generic"
grafanarest "github.com/grafana/grafana/pkg/apiserver/rest"
@ -33,9 +31,9 @@ func NewStorage(
legacySvc ReceiverService,
namespacer request.NamespaceMapper,
scheme *runtime.Scheme,
desiredMode grafanarest.DualWriterMode,
optsGetter generic.RESTOptionsGetter,
reg prometheus.Registerer) (rest.Storage, error) {
dualWriteBuilder grafanarest.DualWriteBuilder,
) (rest.Storage, error) {
legacyStore := &legacyStorage{
service: legacySvc,
namespacer: namespacer,
@ -57,7 +55,7 @@ func NewStorage(
return nil, fmt.Errorf("expected resource or info")
}),
}
if optsGetter != nil && desiredMode != grafanarest.Mode0 {
if optsGetter != nil && dualWriteBuilder != nil {
strategy := grafanaregistry.NewStrategy(scheme)
s := &genericregistry.Store{
NewFunc: resourceInfo.NewFunc,
@ -74,7 +72,7 @@ func NewStorage(
if err := s.CompleteWithOptions(options); err != nil {
return nil, err
}
return grafanarest.NewDualWriter(desiredMode, legacyStore, storage{Store: s}, reg), nil
return dualWriteBuilder(resourceInfo.GroupResource(), legacyStore, storage{Store: s})
}
return legacyStore, nil
}

View File

@ -4,7 +4,6 @@ import (
"context"
"fmt"
"github.com/prometheus/client_golang/prometheus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
@ -37,11 +36,6 @@ type NotificationsAPIBuilder struct {
gv schema.GroupVersion
}
func (t NotificationsAPIBuilder) GetDesiredDualWriterMode(dualWrite bool, toMode map[string]grafanarest.DualWriterMode) grafanarest.DualWriterMode {
// Add required configuration support in order to enable other modes. For an example, see pkg/registry/apis/playlist/register.go
return grafanarest.Mode0
}
func RegisterAPIService(
features featuremgmt.FeatureToggles,
apiregistration builder.APIRegistrar,
@ -77,17 +71,16 @@ func (t NotificationsAPIBuilder) GetAPIGroupInfo(
scheme *runtime.Scheme,
codecs serializer.CodecFactory,
optsGetter generic.RESTOptionsGetter,
desiredMode grafanarest.DualWriterMode,
reg prometheus.Registerer,
dualWriteBuilder grafanarest.DualWriteBuilder,
) (*genericapiserver.APIGroupInfo, error) {
apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(notificationsModels.GROUP, scheme, metav1.ParameterCodec, codecs)
intervals, err := timeInterval.NewStorage(t.ng.Api.MuteTimings, t.namespacer, scheme, desiredMode, optsGetter, reg)
intervals, err := timeInterval.NewStorage(t.ng.Api.MuteTimings, t.namespacer, scheme, optsGetter, dualWriteBuilder)
if err != nil {
return nil, fmt.Errorf("failed to initialize time-interval storage: %w", err)
}
recvStorage, err := receiver.NewStorage(nil, t.namespacer, scheme, desiredMode, optsGetter, reg) // TODO: add receiver service
recvStorage, err := receiver.NewStorage(nil, t.namespacer, scheme, optsGetter, dualWriteBuilder) // TODO: add receiver service
if err != nil {
return nil, fmt.Errorf("failed to initialize receiver storage: %w", err)
}

View File

@ -12,8 +12,6 @@ import (
"k8s.io/apiserver/pkg/registry/rest"
apistore "k8s.io/apiserver/pkg/storage"
"github.com/prometheus/client_golang/prometheus"
model "github.com/grafana/grafana/pkg/apis/alerting_notifications/v0alpha1"
grafanaregistry "github.com/grafana/grafana/pkg/apiserver/registry/generic"
grafanarest "github.com/grafana/grafana/pkg/apiserver/rest"
@ -36,9 +34,9 @@ func NewStorage(
legacySvc TimeIntervalService,
namespacer request.NamespaceMapper,
scheme *runtime.Scheme,
desiredMode grafanarest.DualWriterMode,
optsGetter generic.RESTOptionsGetter,
reg prometheus.Registerer) (rest.Storage, error) {
dualWriteBuilder grafanarest.DualWriteBuilder,
) (rest.Storage, error) {
legacyStore := &legacyStorage{
service: legacySvc,
namespacer: namespacer,
@ -59,7 +57,7 @@ func NewStorage(
return nil, fmt.Errorf("expected resource or info")
}),
}
if optsGetter != nil && desiredMode != grafanarest.Mode0 {
if optsGetter != nil && dualWriteBuilder != nil {
strategy := grafanaregistry.NewStrategy(scheme)
s := &genericregistry.Store{
NewFunc: resourceInfo.NewFunc,
@ -78,7 +76,7 @@ func NewStorage(
if err := s.CompleteWithOptions(options); err != nil {
return nil, err
}
return grafanarest.NewDualWriter(desiredMode, legacyStore, storage{Store: s}, reg), nil
return dualWriteBuilder(resourceInfo.GroupResource(), legacyStore, storage{Store: s})
}
return legacyStore, nil
}

View File

@ -76,11 +76,6 @@ func (b *DashboardsAPIBuilder) GetGroupVersion() schema.GroupVersion {
return v0alpha1.DashboardResourceInfo.GroupVersion()
}
func (b *DashboardsAPIBuilder) GetDesiredDualWriterMode(dualWrite bool, modeMap map[string]grafanarest.DualWriterMode) grafanarest.DualWriterMode {
// Add required configuration support in order to enable other modes. For an example, see pkg/registry/apis/playlist/register.go
return grafanarest.Mode0
}
func addKnownTypes(scheme *runtime.Scheme, gv schema.GroupVersion) {
scheme.AddKnownTypes(gv,
&v0alpha1.Dashboard{},
@ -115,8 +110,7 @@ func (b *DashboardsAPIBuilder) GetAPIGroupInfo(
scheme *runtime.Scheme,
codecs serializer.CodecFactory, // pointer?
optsGetter generic.RESTOptionsGetter,
desiredMode grafanarest.DualWriterMode,
reg prometheus.Registerer,
dualWriteBuilder grafanarest.DualWriteBuilder,
) (*genericapiserver.APIGroupInfo, error) {
apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(v0alpha1.GROUP, scheme, metav1.ParameterCodec, codecs)
@ -142,12 +136,15 @@ func (b *DashboardsAPIBuilder) GetAPIGroupInfo(
}
// Dual writes if a RESTOptionsGetter is provided
if desiredMode != grafanarest.Mode0 && optsGetter != nil {
if optsGetter != nil && dualWriteBuilder != nil {
options := &generic.StoreOptions{RESTOptions: optsGetter, AttrFunc: grafanaregistry.GetAttrs}
if err := store.CompleteWithOptions(options); err != nil {
return nil, err
}
storage[resourceInfo.StoragePath()] = grafanarest.NewDualWriter(grafanarest.Mode1, legacyStore, store, reg)
storage[resourceInfo.StoragePath()], err = dualWriteBuilder(resourceInfo.GroupResource(), legacyStore, store)
if err != nil {
return nil, err
}
}
apiGroupInfo.VersionedResourcesStorageMap[v0alpha1.VERSION] = storage

View File

@ -89,11 +89,6 @@ func (b *SnapshotsAPIBuilder) GetGroupVersion() schema.GroupVersion {
return resourceInfo.GroupVersion()
}
func (b *SnapshotsAPIBuilder) GetDesiredDualWriterMode(dualWrite bool, modeMap map[string]grafanarest.DualWriterMode) grafanarest.DualWriterMode {
// Add required configuration support in order to enable other modes. For an example, see pkg/registry/apis/playlist/register.go
return grafanarest.Mode0
}
func addKnownTypes(scheme *runtime.Scheme, gv schema.GroupVersion) {
scheme.AddKnownTypes(gv,
&dashboardsnapshot.DashboardSnapshot{},
@ -130,8 +125,7 @@ func (b *SnapshotsAPIBuilder) GetAPIGroupInfo(
scheme *runtime.Scheme,
codecs serializer.CodecFactory, // pointer?
optsGetter generic.RESTOptionsGetter,
_ grafanarest.DualWriterMode,
_ prometheus.Registerer,
_ grafanarest.DualWriteBuilder,
) (*genericapiserver.APIGroupInfo, error) {
apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(dashboardsnapshot.GROUP, scheme, metav1.ParameterCodec, codecs)
storage := map[string]rest.Storage{}

View File

@ -156,11 +156,6 @@ func (b *DataSourceAPIBuilder) GetGroupVersion() schema.GroupVersion {
return b.connectionResourceInfo.GroupVersion()
}
func (b *DataSourceAPIBuilder) GetDesiredDualWriterMode(dualWrite bool, modeMap map[string]grafanarest.DualWriterMode) grafanarest.DualWriterMode {
// Add required configuration support in order to enable other modes. For an example, see pkg/registry/apis/playlist/register.go
return grafanarest.Mode0
}
func addKnownTypes(scheme *runtime.Scheme, gv schema.GroupVersion) {
scheme.AddKnownTypes(gv,
&datasource.DataSourceConnection{},
@ -208,8 +203,7 @@ func (b *DataSourceAPIBuilder) GetAPIGroupInfo(
scheme *runtime.Scheme,
codecs serializer.CodecFactory, // pointer?
_ generic.RESTOptionsGetter,
_ grafanarest.DualWriterMode,
_ prometheus.Registerer,
_ grafanarest.DualWriteBuilder,
) (*genericapiserver.APIGroupInfo, error) {
storage := map[string]rest.Storage{}

View File

@ -13,13 +13,14 @@ import (
"k8s.io/kube-openapi/pkg/spec3"
"k8s.io/kube-openapi/pkg/validation/spec"
"github.com/prometheus/client_golang/prometheus"
"github.com/grafana/grafana/pkg/apis/featuretoggle/v0alpha1"
grafanarest "github.com/grafana/grafana/pkg/apiserver/rest"
"github.com/grafana/grafana/pkg/services/accesscontrol"
"github.com/grafana/grafana/pkg/services/apiserver/builder"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/setting"
"github.com/prometheus/client_golang/prometheus"
)
var _ builder.APIGroupBuilder = (*FeatureFlagAPIBuilder)(nil)
@ -52,11 +53,6 @@ func (b *FeatureFlagAPIBuilder) GetGroupVersion() schema.GroupVersion {
return gv
}
func (b *FeatureFlagAPIBuilder) GetDesiredDualWriterMode(dualWrite bool, modeMap map[string]grafanarest.DualWriterMode) grafanarest.DualWriterMode {
// Add required configuration support in order to enable other modes. For an example, see pkg/registry/apis/playlist/register.go
return grafanarest.Mode0
}
func addKnownTypes(scheme *runtime.Scheme, gv schema.GroupVersion) {
scheme.AddKnownTypes(gv,
&v0alpha1.Feature{},
@ -90,8 +86,7 @@ func (b *FeatureFlagAPIBuilder) GetAPIGroupInfo(
scheme *runtime.Scheme,
codecs serializer.CodecFactory, // pointer?
_ generic.RESTOptionsGetter,
_ grafanarest.DualWriterMode,
_ prometheus.Registerer,
_ grafanarest.DualWriteBuilder,
) (*genericapiserver.APIGroupInfo, error) {
apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(v0alpha1.GROUP, scheme, metav1.ParameterCodec, codecs)

View File

@ -69,11 +69,6 @@ func (b *FolderAPIBuilder) GetGroupVersion() schema.GroupVersion {
return b.gv
}
func (b *FolderAPIBuilder) GetDesiredDualWriterMode(dualWrite bool, modeMap map[string]grafanarest.DualWriterMode) grafanarest.DualWriterMode {
// Add required configuration support in order to enable other modes. For an example, see pkg/registry/apis/playlist/register.go
return grafanarest.Mode0
}
func addKnownTypes(scheme *runtime.Scheme, gv schema.GroupVersion) {
scheme.AddKnownTypes(gv,
&v0alpha1.Folder{},
@ -107,8 +102,7 @@ func (b *FolderAPIBuilder) GetAPIGroupInfo(
scheme *runtime.Scheme,
codecs serializer.CodecFactory, // pointer?
optsGetter generic.RESTOptionsGetter,
desiredMode grafanarest.DualWriterMode,
reg prometheus.Registerer,
dualWriteBuilder grafanarest.DualWriteBuilder,
) (*genericapiserver.APIGroupInfo, error) {
apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(v0alpha1.GROUP, scheme, metav1.ParameterCodec, codecs)
@ -142,13 +136,16 @@ func (b *FolderAPIBuilder) GetAPIGroupInfo(
storage[resourceInfo.StoragePath("count")] = &subCountREST{b.folderSvc}
storage[resourceInfo.StoragePath("access")] = &subAccessREST{b.folderSvc}
// enable dual writes if a RESTOptionsGetter is provided
if optsGetter != nil && desiredMode != grafanarest.Mode0 {
// enable dual writer
if optsGetter != nil && dualWriteBuilder != nil {
store, err := newStorage(scheme, optsGetter, legacyStore)
if err != nil {
return nil, err
}
storage[resourceInfo.StoragePath()] = grafanarest.NewDualWriter(grafanarest.Mode1, legacyStore, store, reg)
storage[resourceInfo.StoragePath()], err = dualWriteBuilder(resourceInfo.GroupResource(), legacyStore, store)
if err != nil {
return nil, err
}
}
apiGroupInfo.VersionedResourcesStorageMap[v0alpha1.VERSION] = storage

View File

@ -13,11 +13,12 @@ import (
"k8s.io/kube-openapi/pkg/spec3"
"k8s.io/kube-openapi/pkg/validation/spec"
"github.com/prometheus/client_golang/prometheus"
peakq "github.com/grafana/grafana/pkg/apis/peakq/v0alpha1"
grafanarest "github.com/grafana/grafana/pkg/apiserver/rest"
"github.com/grafana/grafana/pkg/services/apiserver/builder"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/prometheus/client_golang/prometheus"
)
var _ builder.APIGroupBuilder = (*PeakQAPIBuilder)(nil)
@ -46,11 +47,6 @@ func (b *PeakQAPIBuilder) GetGroupVersion() schema.GroupVersion {
return peakq.SchemeGroupVersion
}
func (b *PeakQAPIBuilder) GetDesiredDualWriterMode(dualWrite bool, modeMap map[string]grafanarest.DualWriterMode) grafanarest.DualWriterMode {
// Add required configuration support in order to enable other modes. For an example, see pkg/registry/apis/playlist/register.go
return grafanarest.Mode0
}
func (b *PeakQAPIBuilder) InstallSchema(scheme *runtime.Scheme) error {
gv := peakq.SchemeGroupVersion
err := peakq.AddToScheme(scheme)
@ -73,8 +69,7 @@ func (b *PeakQAPIBuilder) GetAPIGroupInfo(
scheme *runtime.Scheme,
codecs serializer.CodecFactory,
optsGetter generic.RESTOptionsGetter,
_ grafanarest.DualWriterMode, // dual write desired mode (not relevant)
_ prometheus.Registerer, // prometheus registerer
_ grafanarest.DualWriteBuilder,
) (*genericapiserver.APIGroupInfo, error) {
apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(peakq.GROUP, scheme, metav1.ParameterCodec, codecs)

View File

@ -1,7 +1,6 @@
package playlist
import (
"context"
"fmt"
"time"
@ -15,6 +14,8 @@ import (
genericapiserver "k8s.io/apiserver/pkg/server"
common "k8s.io/kube-openapi/pkg/common"
"github.com/prometheus/client_golang/prometheus"
playlist "github.com/grafana/grafana/pkg/apis/playlist/v0alpha1"
grafanarest "github.com/grafana/grafana/pkg/apiserver/rest"
"github.com/grafana/grafana/pkg/infra/kvstore"
@ -23,7 +24,6 @@ import (
gapiutil "github.com/grafana/grafana/pkg/services/apiserver/utils"
playlistsvc "github.com/grafana/grafana/pkg/services/playlist"
"github.com/grafana/grafana/pkg/setting"
"github.com/prometheus/client_golang/prometheus"
)
var _ builder.APIGroupBuilder = (*PlaylistAPIBuilder)(nil)
@ -33,7 +33,6 @@ type PlaylistAPIBuilder struct {
service playlistsvc.Service
namespacer request.NamespaceMapper
gv schema.GroupVersion
kvStore *kvstore.NamespacedKVStore
}
func RegisterAPIService(p playlistsvc.Service,
@ -46,8 +45,6 @@ func RegisterAPIService(p playlistsvc.Service,
service: p,
namespacer: request.GetNamespaceMapper(cfg),
gv: playlist.PlaylistResourceInfo.GroupVersion(),
kvStore: kvstore.WithNamespace(kvStore, 0, "storage.dualwriting"),
// register: newMetrics(registerer),
}
apiregistration.RegisterAPI(builder)
return builder
@ -57,15 +54,6 @@ func (b *PlaylistAPIBuilder) GetGroupVersion() schema.GroupVersion {
return b.gv
}
func (b *PlaylistAPIBuilder) GetDesiredDualWriterMode(dualWrite bool, modeMap map[string]grafanarest.DualWriterMode) grafanarest.DualWriterMode {
m, ok := modeMap[playlist.GROUPRESOURCE]
if !dualWrite || !ok {
return grafanarest.Mode0
}
return m
}
func addKnownTypes(scheme *runtime.Scheme, gv schema.GroupVersion) {
scheme.AddKnownTypes(gv,
&playlist.Playlist{},
@ -96,8 +84,7 @@ func (b *PlaylistAPIBuilder) GetAPIGroupInfo(
scheme *runtime.Scheme,
codecs serializer.CodecFactory, // pointer?
optsGetter generic.RESTOptionsGetter,
desiredMode grafanarest.DualWriterMode,
reg prometheus.Registerer,
dualWriteBuilder grafanarest.DualWriteBuilder,
) (*genericapiserver.APIGroupInfo, error) {
apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(playlist.GROUP, scheme, metav1.ParameterCodec, codecs)
storage := map[string]rest.Storage{}
@ -131,13 +118,13 @@ func (b *PlaylistAPIBuilder) GetAPIGroupInfo(
storage[resource.StoragePath()] = legacyStore
// enable dual writes if a RESTOptionsGetter is provided
if optsGetter != nil && desiredMode != grafanarest.Mode0 {
if optsGetter != nil && dualWriteBuilder != nil {
store, err := newStorage(scheme, optsGetter, legacyStore)
if err != nil {
return nil, err
}
dualWriter, err := grafanarest.SetDualWritingMode(context.Background(), b.kvStore, legacyStore, store, playlist.GROUPRESOURCE, desiredMode, reg)
dualWriter, err := dualWriteBuilder(resourceInfo.GroupResource(), legacyStore, store)
if err != nil {
return nil, err
}

View File

@ -122,11 +122,6 @@ func (b *QueryAPIBuilder) GetGroupVersion() schema.GroupVersion {
return query.SchemeGroupVersion
}
func (b *QueryAPIBuilder) GetDesiredDualWriterMode(dualWrite bool, modeMap map[string]grafanarest.DualWriterMode) grafanarest.DualWriterMode {
// Add required configuration support in order to enable other modes. For an example, see pkg/registry/apis/playlist/register.go
return grafanarest.Mode0
}
func addKnownTypes(scheme *runtime.Scheme, gv schema.GroupVersion) {
scheme.AddKnownTypes(gv,
&query.DataSourceApiServer{},
@ -148,8 +143,7 @@ func (b *QueryAPIBuilder) GetAPIGroupInfo(
scheme *runtime.Scheme,
codecs serializer.CodecFactory, // pointer?
optsGetter generic.RESTOptionsGetter,
_ grafanarest.DualWriterMode,
_ prometheus.Registerer,
_ grafanarest.DualWriteBuilder,
) (*genericapiserver.APIGroupInfo, error) {
gv := query.SchemeGroupVersion
apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(gv.Group, scheme, metav1.ParameterCodec, codecs)

View File

@ -15,11 +15,12 @@ import (
"k8s.io/kube-openapi/pkg/spec3"
"k8s.io/kube-openapi/pkg/validation/spec"
"github.com/prometheus/client_golang/prometheus"
scope "github.com/grafana/grafana/pkg/apis/scope/v0alpha1"
grafanarest "github.com/grafana/grafana/pkg/apiserver/rest"
"github.com/grafana/grafana/pkg/services/apiserver/builder"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/prometheus/client_golang/prometheus"
)
var _ builder.APIGroupBuilder = (*ScopeAPIBuilder)(nil)
@ -48,11 +49,6 @@ func (b *ScopeAPIBuilder) GetGroupVersion() schema.GroupVersion {
return scope.SchemeGroupVersion
}
func (b *ScopeAPIBuilder) GetDesiredDualWriterMode(dualWrite bool, modeMap map[string]grafanarest.DualWriterMode) grafanarest.DualWriterMode {
// Add required configuration support in order to enable other modes. For an example, see pkg/registry/apis/playlist/register.go
return grafanarest.Mode0
}
func (b *ScopeAPIBuilder) InstallSchema(scheme *runtime.Scheme) error {
err := scope.AddToScheme(scheme)
if err != nil {
@ -121,8 +117,7 @@ func (b *ScopeAPIBuilder) GetAPIGroupInfo(
scheme *runtime.Scheme,
codecs serializer.CodecFactory,
optsGetter generic.RESTOptionsGetter,
_ grafanarest.DualWriterMode, // dual write desired mode (not relevant)
_ prometheus.Registerer, // prometheus registerer
_ grafanarest.DualWriteBuilder,
) (*genericapiserver.APIGroupInfo, error) {
apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(scope.GROUP, scheme, metav1.ParameterCodec, codecs)

View File

@ -11,11 +11,12 @@ import (
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/kube-openapi/pkg/common"
"github.com/prometheus/client_golang/prometheus"
service "github.com/grafana/grafana/pkg/apis/service/v0alpha1"
grafanarest "github.com/grafana/grafana/pkg/apiserver/rest"
"github.com/grafana/grafana/pkg/services/apiserver/builder"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/prometheus/client_golang/prometheus"
)
var _ builder.APIGroupBuilder = (*ServiceAPIBuilder)(nil)
@ -45,11 +46,6 @@ func (b *ServiceAPIBuilder) GetGroupVersion() schema.GroupVersion {
return service.SchemeGroupVersion
}
func (b *ServiceAPIBuilder) GetDesiredDualWriterMode(dualWrite bool, modeMap map[string]grafanarest.DualWriterMode) grafanarest.DualWriterMode {
// Add required configuration support in order to enable other modes. For an example, see pkg/registry/apis/playlist/register.go
return grafanarest.Mode0
}
func addKnownTypes(scheme *runtime.Scheme, gv schema.GroupVersion) {
scheme.AddKnownTypes(gv,
&service.ExternalName{},
@ -79,8 +75,7 @@ func (b *ServiceAPIBuilder) GetAPIGroupInfo(
scheme *runtime.Scheme,
codecs serializer.CodecFactory,
optsGetter generic.RESTOptionsGetter,
_ grafanarest.DualWriterMode,
_ prometheus.Registerer,
_ grafanarest.DualWriteBuilder,
) (*genericapiserver.APIGroupInfo, error) {
apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(service.GROUP, scheme, metav1.ParameterCodec, codecs)

View File

@ -22,8 +22,6 @@ import (
"sync"
"time"
servicev0alpha1 "github.com/grafana/grafana/pkg/apis/service/v0alpha1"
"github.com/grafana/grafana/pkg/registry/apis/service"
"github.com/prometheus/client_golang/prometheus"
"gopkg.in/yaml.v3"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -47,7 +45,9 @@ import (
"k8s.io/kube-aggregator/pkg/controllers"
"k8s.io/kube-aggregator/pkg/controllers/autoregister"
grafanarest "github.com/grafana/grafana/pkg/apiserver/rest"
servicev0alpha1 "github.com/grafana/grafana/pkg/apis/service/v0alpha1"
"github.com/grafana/grafana/pkg/registry/apis/service"
servicev0alpha1applyconfiguration "github.com/grafana/grafana/pkg/generated/applyconfiguration/service/v0alpha1"
serviceclientset "github.com/grafana/grafana/pkg/generated/clientset/versioned"
informersv0alpha1 "github.com/grafana/grafana/pkg/generated/informers/externalversions"
@ -290,8 +290,7 @@ func CreateAggregatorServer(config *Config, delegateAPIServer genericapiserver.D
aggregatorscheme.Scheme,
aggregatorscheme.Codecs,
aggregatorConfig.GenericConfig.RESTOptionsGetter,
grafanarest.Mode0,
reg,
nil, // no dual writer
)
if err != nil {
return nil, err

View File

@ -3,8 +3,6 @@ package builder
import (
"net/http"
grafanarest "github.com/grafana/grafana/pkg/apiserver/rest"
"github.com/prometheus/client_golang/prometheus"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
@ -13,6 +11,8 @@ import (
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/kube-openapi/pkg/common"
"k8s.io/kube-openapi/pkg/spec3"
grafanarest "github.com/grafana/grafana/pkg/apiserver/rest"
)
// TODO: this (or something like it) belongs in grafana-app-sdk,
@ -29,8 +29,7 @@ type APIGroupBuilder interface {
scheme *runtime.Scheme,
codecs serializer.CodecFactory,
optsGetter generic.RESTOptionsGetter,
desiredMode grafanarest.DualWriterMode,
reg prometheus.Registerer,
dualWrite grafanarest.DualWriteBuilder,
) (*genericapiserver.APIGroupInfo, error)
// Get OpenAPI definitions
@ -43,11 +42,6 @@ type APIGroupBuilder interface {
// Standard namespace checking will happen before this is called, specifically
// the namespace must matches an org|stack that the user belongs to
GetAuthorizer() authorizer.Authorizer
// Get the desired dual writing mode. These are modes 1, 2, 3 and 4 if
// the feature flag `unifiedStorage` is enabled and mode 0 if it is not enabled.
// #TODO add type for map[string]grafanarest.DualWriterMode?
GetDesiredDualWriterMode(dualWrite bool, toMode map[string]grafanarest.DualWriterMode) grafanarest.DualWriterMode
}
// Builders that implement OpenAPIPostProcessor are given a chance to modify the schema directly

View File

@ -1,6 +1,7 @@
package builder
import (
"context"
"fmt"
"net/http"
"regexp"
@ -10,11 +11,11 @@ import (
"strings"
"time"
"github.com/grafana/grafana/pkg/web"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/mod/semver"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/version"
@ -26,7 +27,10 @@ import (
k8stracing "k8s.io/component-base/tracing"
"k8s.io/kube-openapi/pkg/common"
"github.com/grafana/grafana/pkg/web"
"github.com/grafana/grafana/pkg/apiserver/endpoints/filters"
grafanarest "github.com/grafana/grafana/pkg/apiserver/rest"
"github.com/grafana/grafana/pkg/services/apiserver/options"
)
@ -136,6 +140,10 @@ func SetupConfig(
return nil
}
type ServerLockService interface {
LockExecuteAndRelease(ctx context.Context, actionName string, maxInterval time.Duration, fn func(ctx context.Context)) error
}
func InstallAPIs(
scheme *runtime.Scheme,
codecs serializer.CodecFactory,
@ -144,15 +152,40 @@ func InstallAPIs(
builders []APIGroupBuilder,
storageOpts *options.StorageOptions,
reg prometheus.Registerer,
kvStore grafanarest.NamespacedKVStore,
serverLock ServerLockService,
) error {
// dual writing is only enabled when the storage type is not legacy.
// this is needed to support setting a default RESTOptionsGetter for new APIs that don't
// support the legacy storage type.
dualWriteEnabled := storageOpts.StorageType != options.StorageTypeLegacy
var dualWrite grafanarest.DualWriteBuilder
if storageOpts.StorageType != options.StorageTypeLegacy {
dualWrite = func(gr schema.GroupResource, legacy grafanarest.LegacyStorage, storage grafanarest.Storage) (grafanarest.Storage, error) {
key := gr.String() // ${resource}.{group} eg playlists.playlist.grafana.app
// Get the option from custom.ini/command line
// when missing this will default to mode zero (legacy only)
mode := storageOpts.DualWriterDesiredModes[key]
// Moving from one version to the next can only happen after the previous step has
// successfully synchronized.
currentMode, err := grafanarest.SetDualWritingMode(context.Background(), kvStore, legacy, storage, key, mode, reg)
if err != nil {
return nil, err
}
switch currentMode {
case grafanarest.Mode0:
return legacy, nil
case grafanarest.Mode4:
return storage, nil
default:
}
return grafanarest.NewDualWriter(currentMode, legacy, storage, reg), nil
}
}
for _, b := range builders {
mode := b.GetDesiredDualWriterMode(dualWriteEnabled, storageOpts.DualWriterDesiredModes)
g, err := b.GetAPIGroupInfo(scheme, codecs, optsGetter, mode, reg)
g, err := b.GetAPIGroupInfo(scheme, codecs, optsGetter, dualWrite)
if err != nil {
return err
}

View File

@ -59,6 +59,11 @@ func applyGrafanaConfig(cfg *setting.Cfg, features featuremgmt.FeatureToggles, o
o.StorageOptions.DualWriterDesiredModes = map[string]grafanarest.DualWriterMode{
playlist.GROUPRESOURCE: grafanarest.DualWriterMode(unifiedStorageModeCfg.Key(playlist.GROUPRESOURCE).MustInt(0)),
}
// TODO: ensure backwards compatibility with production
// remove this after changing the unified_storage_mode key format in HGAPI
o.StorageOptions.DualWriterDesiredModes[playlist.RESOURCE+"."+playlist.GROUP] = o.StorageOptions.DualWriterDesiredModes[playlist.GROUPRESOURCE]
o.ExtraOptions.DevMode = features.IsEnabledGlobally(featuremgmt.FlagGrafanaAPIServerEnsureKubectlAccess)
o.ExtraOptions.ExternalAddress = host
o.ExtraOptions.APIURL = apiURL

View File

@ -27,7 +27,9 @@ import (
filestorage "github.com/grafana/grafana/pkg/apiserver/storage/file"
"github.com/grafana/grafana/pkg/infra/appcontext"
"github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/infra/kvstore"
"github.com/grafana/grafana/pkg/infra/metrics"
"github.com/grafana/grafana/pkg/infra/serverlock"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/middleware"
"github.com/grafana/grafana/pkg/modules"
@ -119,7 +121,9 @@ type service struct {
tracing *tracing.TracingService
metrics prometheus.Registerer
authorizer *authorizer.GrafanaAuthorizer
authorizer *authorizer.GrafanaAuthorizer
serverLockService builder.ServerLockService
kvStore kvstore.KVStore
}
func ProvideService(
@ -128,7 +132,9 @@ func ProvideService(
rr routing.RouteRegister,
orgService org.Service,
tracing *tracing.TracingService,
serverLockService *serverlock.ServerLockService,
db db.DB,
kvStore kvstore.KVStore,
) (*service, error) {
s := &service{
cfg: cfg,
@ -141,6 +147,7 @@ func ProvideService(
tracing: tracing,
db: db, // For Unified storage
metrics: metrics.ProvideRegisterer(),
kvStore: kvStore,
}
// This will be used when running as a dskit service
@ -378,7 +385,10 @@ func (s *service) start(ctx context.Context) error {
}
// Install the API group+version
err = builder.InstallAPIs(Scheme, Codecs, server, serverConfig.RESTOptionsGetter, builders, o.StorageOptions, s.metrics)
err = builder.InstallAPIs(Scheme, Codecs, server, serverConfig.RESTOptionsGetter, builders, o.StorageOptions,
// Required for the dual writer initialization
s.metrics, kvstore.WithNamespace(s.kvStore, 0, "storage.dualwriting"), s.serverLockService,
)
if err != nil {
return err
}

View File

@ -15,7 +15,6 @@ import (
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/plugins"
"github.com/grafana/grafana/pkg/registry/apis/datasource"
"github.com/grafana/grafana/pkg/registry/apis/featuretoggle"
"github.com/grafana/grafana/pkg/registry/apis/query"
"github.com/grafana/grafana/pkg/registry/apis/query/client"
"github.com/grafana/grafana/pkg/services/accesscontrol/actest"
@ -23,7 +22,6 @@ import (
"github.com/grafana/grafana/pkg/services/apiserver/endpoints/request"
"github.com/grafana/grafana/pkg/services/apiserver/options"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/setting"
testdatasource "github.com/grafana/grafana/pkg/tsdb/grafana-testdata-datasource"
)
@ -95,13 +93,6 @@ func (p *DummyAPIFactory) MakeAPIServer(_ context.Context, tracer tracing.Tracer
tracer,
)
case "featuretoggle.grafana.app":
return featuretoggle.NewFeatureFlagAPIBuilder(
featuremgmt.WithFeatureManager(setting.FeatureMgmtSettings{}, nil), // none... for now
&actest.FakeAccessControl{ExpectedEvaluate: false},
&setting.Cfg{},
), nil
case "testdata.datasource.grafana.app":
return datasource.NewDataSourceAPIBuilder(
plugins.JSONData{