K8s: Use wire to initalize the resource client (#93221)

This commit is contained in:
Ryan McKinley 2024-09-12 17:22:27 +03:00 committed by GitHub
parent 85fbc8e8e5
commit 45eb72e95a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 134 additions and 73 deletions

View File

@ -1,17 +1,6 @@
package dashboard
import (
"github.com/prometheus/client_golang/prometheus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apiserver/pkg/registry/generic"
"k8s.io/apiserver/pkg/registry/rest"
genericapiserver "k8s.io/apiserver/pkg/server"
common "k8s.io/kube-openapi/pkg/common"
"k8s.io/kube-openapi/pkg/spec3"
dashboard "github.com/grafana/grafana/pkg/apis/dashboard/v0alpha1"
grafanaregistry "github.com/grafana/grafana/pkg/apiserver/registry/generic"
grafanarest "github.com/grafana/grafana/pkg/apiserver/rest"
@ -28,9 +17,23 @@ import (
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/storage/legacysql"
"github.com/grafana/grafana/pkg/storage/unified/apistore"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/prometheus/client_golang/prometheus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apiserver/pkg/registry/generic"
"k8s.io/apiserver/pkg/registry/rest"
genericapiserver "k8s.io/apiserver/pkg/server"
common "k8s.io/kube-openapi/pkg/common"
"k8s.io/kube-openapi/pkg/spec3"
)
var _ builder.APIGroupBuilder = (*DashboardsAPIBuilder)(nil)
var (
_ builder.APIGroupBuilder = (*DashboardsAPIBuilder)(nil)
_ builder.OpenAPIPostProcessor = (*DashboardsAPIBuilder)(nil)
)
// This is used just so wire has something unique to return
type DashboardsAPIBuilder struct {
@ -51,6 +54,7 @@ func RegisterAPIService(cfg *setting.Cfg, features featuremgmt.FeatureToggles,
reg prometheus.Registerer,
sql db.DB,
tracing *tracing.TracingService,
unified resource.ResourceClient,
) *DashboardsAPIBuilder {
if !features.IsEnabledGlobally(featuremgmt.FlagGrafanaAPIServerWithExperimentalAPIs) {
return nil // skip registration unless opting into experimental apis

View File

@ -153,6 +153,7 @@ import (
"github.com/grafana/grafana/pkg/services/user"
"github.com/grafana/grafana/pkg/services/user/userimpl"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/storage/unified"
"github.com/grafana/grafana/pkg/tsdb/azuremonitor"
cloudmonitoring "github.com/grafana/grafana/pkg/tsdb/cloud-monitoring"
"github.com/grafana/grafana/pkg/tsdb/cloudwatch"
@ -205,6 +206,7 @@ var wireBasicSet = wire.NewSet(
mysql.ProvideService,
mssql.ProvideService,
store.ProvideEntityEventsService,
unified.ProvideUnifiedStorageClient,
httpclientprovider.New,
wire.Bind(new(httpclient.Provider), new(*sdkhttpclient.Provider)),
serverlock.ProvideService,

View File

@ -4,12 +4,11 @@ import (
"fmt"
"net"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/setting"
"github.com/spf13/pflag"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/options"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/setting"
)
type StorageType string
@ -22,11 +21,19 @@ const (
StorageTypeUnifiedGrpc StorageType = "unified-grpc"
)
type StorageOptions struct {
StorageType StorageType
DataPath string
Address string
UnifiedStorageConfig map[string]setting.UnifiedStorageConfig
type StorageOptions struct { // The desired storage type
StorageType StorageType
// For unified-grpc, the address is required
Address string
// For file storage, this is the requested path
DataPath string
// {resource}.{group} = 1|2|3|4
UnifiedStorageConfig map[string]setting.UnifiedStorageConfig
// TODO... this will be moved to UnifiedStorageConfig
DualWriterDataSyncJobEnabled map[string]bool
}

View File

@ -8,20 +8,6 @@ import (
"github.com/grafana/dskit/services"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apiserver/pkg/endpoints/responsewriter"
genericapiserver "k8s.io/apiserver/pkg/server"
clientrest "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
aggregatorapiserver "k8s.io/kube-aggregator/pkg/apiserver"
dataplaneaggregator "github.com/grafana/grafana/pkg/aggregator/apiserver"
"github.com/grafana/grafana/pkg/api/routing"
"github.com/grafana/grafana/pkg/apimachinery/identity"
@ -50,7 +36,16 @@ import (
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/storage/unified/apistore"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/sql"
"github.com/prometheus/client_golang/prometheus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apiserver/pkg/endpoints/responsewriter"
genericapiserver "k8s.io/apiserver/pkg/server"
clientrest "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
aggregatorapiserver "k8s.io/kube-aggregator/pkg/apiserver"
)
var (
@ -128,6 +123,7 @@ type service struct {
datasources datasource.ScopedPluginDatasourceProvider
contextProvider datasource.PluginContextWrapper
pluginStore pluginstore.Store
unified resource.ResourceClient
}
func ProvideService(
@ -143,6 +139,7 @@ func ProvideService(
datasources datasource.ScopedPluginDatasourceProvider,
contextProvider datasource.PluginContextWrapper,
pluginStore pluginstore.Store,
unified resource.ResourceClient,
) (*service, error) {
s := &service{
cfg: cfg,
@ -161,6 +158,7 @@ func ProvideService(
contextProvider: contextProvider,
pluginStore: pluginStore,
serverLockService: serverLockService,
unified: unified,
}
// This will be used when running as a dskit service
@ -284,47 +282,17 @@ func (s *service) start(ctx context.Context) error {
serverConfig.LoopbackClientConfig.Transport = transport
serverConfig.LoopbackClientConfig.TLSClientConfig = clientrest.TLSClientConfig{}
switch o.StorageOptions.StorageType {
case grafanaapiserveroptions.StorageTypeEtcd:
if o.StorageOptions.StorageType == grafanaapiserveroptions.StorageTypeEtcd {
if err := o.RecommendedOptions.Etcd.Validate(); len(err) > 0 {
return err[0]
}
if err := o.RecommendedOptions.Etcd.ApplyTo(&serverConfig.Config); err != nil {
return err
}
case grafanaapiserveroptions.StorageTypeUnified:
server, err := sql.ProvideResourceServer(s.db, s.cfg, s.features, s.tracing)
if err != nil {
return err
}
client := resource.NewLocalResourceClient(server)
serverConfig.Config.RESTOptionsGetter = apistore.NewRESTOptionsGetterForClient(client,
o.RecommendedOptions.Etcd.StorageConfig)
case grafanaapiserveroptions.StorageTypeUnifiedGrpc:
opts := []grpc.DialOption{
grpc.WithStatsHandler(otelgrpc.NewClientHandler()),
grpc.WithTransportCredentials(insecure.NewCredentials()),
}
// Create a connection to the gRPC server
conn, err := grpc.NewClient(o.StorageOptions.Address, opts...)
if err != nil {
return err
}
// Create a client instance
client := resource.NewResourceClient(conn)
serverConfig.Config.RESTOptionsGetter = apistore.NewRESTOptionsGetterForClient(client, o.RecommendedOptions.Etcd.StorageConfig)
case grafanaapiserveroptions.StorageTypeLegacy:
fallthrough
case grafanaapiserveroptions.StorageTypeFile:
restOptionsGetter, err := apistore.NewRESTOptionsGetterForFile(o.StorageOptions.DataPath, o.RecommendedOptions.Etcd.StorageConfig)
if err != nil {
return err
}
serverConfig.RESTOptionsGetter = restOptionsGetter
} else {
// Use unified storage client
serverConfig.Config.RESTOptionsGetter = apistore.NewRESTOptionsGetterForClient(
s.unified, o.RecommendedOptions.Etcd.StorageConfig)
}
// Add OpenAPI specs for each group+version

View File

@ -0,0 +1,80 @@
package unified
import (
"context"
"path/filepath"
infraDB "github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/services/apiserver/options"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/sql"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"gocloud.dev/blob/fileblob"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
// This adds a UnifiedStorage client into the wire dependency tree
func ProvideUnifiedStorageClient(
cfg *setting.Cfg,
features featuremgmt.FeatureToggles,
db infraDB.DB,
tracer tracing.Tracer,
) (resource.ResourceClient, error) {
// See: apiserver.ApplyGrafanaConfig(cfg, features, o)
apiserverCfg := cfg.SectionWithEnvOverrides("grafana-apiserver")
opts := options.StorageOptions{
StorageType: options.StorageType(apiserverCfg.Key("storage_type").MustString(string(options.StorageTypeLegacy))),
DataPath: apiserverCfg.Key("storage_path").MustString(filepath.Join(cfg.DataPath, "grafana-apiserver")),
Address: apiserverCfg.Key("address").MustString(""),
}
switch opts.StorageType {
case options.StorageTypeFile:
if opts.DataPath == "" {
opts.DataPath = filepath.Join(cfg.DataPath, "grafana-apiserver")
}
bucket, err := fileblob.OpenBucket(filepath.Join(opts.DataPath, "resource"), &fileblob.Options{
CreateDir: true,
Metadata: fileblob.MetadataDontWrite, // skip
})
if err != nil {
return nil, err
}
backend, err := resource.NewCDKBackend(context.Background(), resource.CDKBackendOptions{
Bucket: bucket,
})
if err != nil {
return nil, err
}
server, err := resource.NewResourceServer(resource.ResourceServerOptions{
Backend: backend,
})
if err != nil {
return nil, err
}
return resource.NewLocalResourceClient(server), nil
case options.StorageTypeUnifiedGrpc:
// Create a connection to the gRPC server
conn, err := grpc.NewClient(opts.Address,
grpc.WithStatsHandler(otelgrpc.NewClientHandler()),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
return nil, err
}
return resource.NewResourceClient(conn), nil
// Use the local SQL
default:
server, err := sql.NewResourceServer(db, cfg, features, tracer)
if err != nil {
return nil, err
}
return resource.NewLocalResourceClient(server), nil
}
}

View File

@ -9,8 +9,8 @@ import (
"github.com/grafana/grafana/pkg/storage/unified/sql/db/dbimpl"
)
// Creates a ResourceServer
func ProvideResourceServer(db infraDB.DB, cfg *setting.Cfg, features featuremgmt.FeatureToggles, tracer tracing.Tracer) (resource.ResourceServer, error) {
// Creates a new ResourceServer
func NewResourceServer(db infraDB.DB, cfg *setting.Cfg, features featuremgmt.FeatureToggles, tracer tracing.Tracer) (resource.ResourceServer, error) {
opts := resource.ResourceServerOptions{
Tracer: tracer,
}

View File

@ -84,7 +84,7 @@ func ProvideUnifiedStorageGrpcService(
}
func (s *service) start(ctx context.Context) error {
server, err := ProvideResourceServer(s.db, s.cfg, s.features, s.tracing)
server, err := NewResourceServer(s.db, s.cfg, s.features, s.tracing)
if err != nil {
return err
}