From 87c3d3b029e6b65ddf3f768c15b2684272bda801 Mon Sep 17 00:00:00 2001 From: Charandas Date: Wed, 17 Jan 2024 12:21:24 -0800 Subject: [PATCH] K8s: add the CRD server to the grafana-aggregator component (pkg/cmd) (#80759) --- .vscode/launch.json | 10 + go.mod | 2 + go.sum | 2 + pkg/aggregator/README.md | 7 +- pkg/aggregator/aggregator.go | 299 ++++++++++++++------ pkg/aggregator/config.go | 57 ++++ pkg/aggregator/crdRegistrationController.go | 214 ++++++++++++++ pkg/cmd/grafana/apiserver/cmd.go | 59 ++-- 8 files changed, 537 insertions(+), 113 deletions(-) create mode 100644 pkg/aggregator/config.go create mode 100644 pkg/aggregator/crdRegistrationController.go diff --git a/.vscode/launch.json b/.vscode/launch.json index 708b555d30d..f29f1d70114 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -21,6 +21,16 @@ "cwd": "${workspaceFolder}", "args": ["apiserver", "--secure-port=8443", "testdata.datasource.grafana.app"] }, + { + "name": "Run API Server (aggregator)", + "type": "go", + "request": "launch", + "mode": "auto", + "program": "${workspaceFolder}/pkg/cmd/grafana/", + "env": {}, + "cwd": "${workspaceFolder}", + "args": ["aggregator", "--secure-port", "8443"] + }, { "name": "Attach to Chrome", "port": 9222, diff --git a/go.mod b/go.mod index 5917ebeab8f..7ca8848b698 100644 --- a/go.mod +++ b/go.mod @@ -482,6 +482,8 @@ require ( github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect ) +require k8s.io/apiextensions-apiserver v0.29.0 // @grafana/grafana-app-platform-squad + // Use fork of crewjam/saml with fixes for some issues until changes get merged into upstream replace github.com/crewjam/saml => github.com/grafana/saml v0.4.15-0.20231025143828-a6c0e9b86a4c diff --git a/go.sum b/go.sum index f92df56c8dc..d36ac96ab9d 100644 --- a/go.sum +++ b/go.sum @@ -4153,6 +4153,8 @@ howett.net/plist v0.0.0-20181124034731-591f970eefbb/go.mod h1:vMygbs4qMhSZSc4lCU k8s.io/api v0.26.2/go.mod h1:1kjMQsFE+QHPfskEcVNgL3+Hp88B80uj0QtSOlj8itU= 
k8s.io/api v0.29.0 h1:NiCdQMY1QOp1H8lfRyeEf8eOwV6+0xA6XEE44ohDX2A= k8s.io/api v0.29.0/go.mod h1:sdVmXoz2Bo/cb77Pxi71IPTSErEW32xa4aXwKH7gfBA= +k8s.io/apiextensions-apiserver v0.29.0 h1:0VuspFG7Hj+SxyF/Z/2T0uFbI5gb5LRgEyUVE3Q4lV0= +k8s.io/apiextensions-apiserver v0.29.0/go.mod h1:TKmpy3bTS0mr9pylH0nOt/QzQRrW7/h7yLdRForMZwc= k8s.io/apimachinery v0.26.2/go.mod h1:ats7nN1LExKHvJ9TmwootT00Yz05MuYqPXEXaVeOy5I= k8s.io/apimachinery v0.29.0 h1:+ACVktwyicPz0oc6MTMLwa2Pw3ouLAfAon1wPLtG48o= k8s.io/apimachinery v0.29.0/go.mod h1:eVBxQ/cwiJxH58eK/jd/vAk4mrxmVlnpBH5J2GbMeis= diff --git a/pkg/aggregator/README.md b/pkg/aggregator/README.md index cb3252d0e30..3ba1df70605 100644 --- a/pkg/aggregator/README.md +++ b/pkg/aggregator/README.md @@ -35,10 +35,13 @@ equivalent to the idea with the same name under `core/v1/Service`. our `externalname` (`service.grafana.app`) implementation. We register separate `PostStartHooks` using `AddPostStartHookOrDie` on the GenericAPIServer to start the corresponding custom controller as well as requisite informer factories for our own `externalname` Kind. +4. For now, we bundle apiextensions-apiserver under our aggregator component. This is slightly different from K8s +where kube-apiserver is called the top-level component and controlplane, aggregator and apiextensions-apiserver +live under that instead. ### Gotchas (Pay Attention) -1. `grafana-aggregator` uses file storage under `/tmp`. System restarts won't preserve any configuration. - 1. Ensure any `externalname` and `APIService` configuration is in place post system restarts when developing locally. +1. `grafana-aggregator` uses file storage under `data/grafana-aggregator` (`apiregistration.k8s.io`, +`service.grafana.app`) and `data/grafana-apiextensionsserver` (`apiextensions.k8s.io`). 2. 
Since `grafana-aggregator` outputs configuration (TLS and kubeconfig) that is used in the invocation of aggregated servers, ensure you start the aggregated service after launching the aggregator during local development. diff --git a/pkg/aggregator/aggregator.go b/pkg/aggregator/aggregator.go index b3a3d6364c3..74bc2118847 100644 --- a/pkg/aggregator/aggregator.go +++ b/pkg/aggregator/aggregator.go @@ -5,6 +5,9 @@ // Provenance-includes-location: https://github.com/kubernetes/kubernetes/blob/master/cmd/kube-apiserver/app/server.go // Provenance-includes-license: Apache-2.0 // Provenance-includes-copyright: The Kubernetes Authors. +// Provenance-includes-location: https://github.com/kubernetes/kubernetes/blob/master/pkg/controlplane/apiserver/apiextensions.go +// Provenance-includes-license: Apache-2.0 +// Provenance-includes-copyright: The Kubernetes Authors. package aggregator @@ -14,12 +17,11 @@ import ( "io" "net" "net/http" + "path" "strings" "sync" "time" - "github.com/spf13/pflag" - servicev0alpha1 "github.com/grafana/grafana/pkg/apis/service/v0alpha1" serviceclientset "github.com/grafana/grafana/pkg/generated/clientset/versioned" informersv0alpha1 "github.com/grafana/grafana/pkg/generated/informers/externalversions" @@ -27,12 +29,18 @@ import ( grafanaAPIServer "github.com/grafana/grafana/pkg/services/grafana-apiserver" filestorage "github.com/grafana/grafana/pkg/services/grafana-apiserver/storage/file" + apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" + apiextensionsapiserver "k8s.io/apiextensions-apiserver/pkg/apiserver" + apiextensionsinformers "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions" + apiextensionsopenapi "k8s.io/apiextensions-apiserver/pkg/generated/openapi" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer" utilnet 
"k8s.io/apimachinery/pkg/util/net" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apiserver/pkg/endpoints/discovery/aggregated" openapinamer "k8s.io/apiserver/pkg/endpoints/openapi" genericfeatures "k8s.io/apiserver/pkg/features" genericapiserver "k8s.io/apiserver/pkg/server" @@ -40,34 +48,31 @@ import ( "k8s.io/apiserver/pkg/server/options" "k8s.io/apiserver/pkg/server/resourceconfig" utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/apiserver/pkg/util/openapi" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" v1helper "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper" - "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1beta1" + apiregistrationv1beta1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1beta1" aggregatorapiserver "k8s.io/kube-aggregator/pkg/apiserver" aggregatorscheme "k8s.io/kube-aggregator/pkg/apiserver/scheme" apiregistrationclientset "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset" apiregistrationclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/typed/apiregistration/v1" apiregistrationInformers "k8s.io/kube-aggregator/pkg/client/informers/externalversions/apiregistration/v1" "k8s.io/kube-aggregator/pkg/controllers/autoregister" + apiserver "k8s.io/kube-aggregator/pkg/controllers/status" aggregatoropenapi "k8s.io/kube-aggregator/pkg/generated/openapi" "k8s.io/kube-openapi/pkg/common" ) -type ExtraOptions struct { - ProxyClientCertFile string - ProxyClientKeyFile string -} - // AggregatorServerOptions contains the state for the aggregator apiserver type AggregatorServerOptions struct { - Builders []grafanaAPIServer.APIGroupBuilder - RecommendedOptions *options.RecommendedOptions - ExtraOptions *ExtraOptions - AlternateDNS []string + Builders []grafanaAPIServer.APIGroupBuilder + AlternateDNS []string + Config *Config + serviceResolver ServiceResolver sharedInformerFactory 
informersv0alpha1.SharedInformerFactory @@ -75,15 +80,59 @@ type AggregatorServerOptions struct { StdErr io.Writer } -func NewAggregatorServerOptions(out, errOut io.Writer) *AggregatorServerOptions { - return &AggregatorServerOptions{ - StdOut: out, - StdErr: errOut, - ExtraOptions: &ExtraOptions{}, - Builders: []grafanaAPIServer.APIGroupBuilder{ - service.NewServiceAPIBuilder(), - }, +func NewAggregatorServerOptions(out, errOut io.Writer, + options *options.RecommendedOptions, + extraConfig *ExtraConfig, +) (*AggregatorServerOptions, error) { + sharedConfig, err := initSharedConfig(options, aggregatorscheme.Codecs, nil) + if err != nil { + klog.Errorf("Error creating shared config: %s", err) + return nil, err } + + sharedInformerFactory, err := initSharedInformerFactory(sharedConfig) + if err != nil { + klog.Errorf("Error creating shared informer factory: %s", err) + return nil, err + } + + serviceResolver, err := initServiceResolver(sharedInformerFactory) + if err != nil { + klog.Errorf("Error creating service resolver: %s", err) + return nil, err + } + + fakeInformers := informers.NewSharedInformerFactory(fake.NewSimpleClientset(), 10*time.Minute) + builders := []grafanaAPIServer.APIGroupBuilder{ + service.NewServiceAPIBuilder(), + } + + extensionsConfig, err := initApiExtensionsConfig(options, sharedConfig, fakeInformers, serviceResolver, extraConfig.DataPath) + if err != nil { + klog.Errorf("Error creating extensions config: %s", err) + return nil, err + } + + aggregatorConfig, err := initAggregatorConfig(options, sharedConfig, extraConfig, fakeInformers, builders, serviceResolver, extraConfig.DataPath) + if err != nil { + klog.Errorf("Error creating aggregator config: %s", err) + return nil, err + } + + return &AggregatorServerOptions{ + StdOut: out, + StdErr: errOut, + Builders: builders, + sharedInformerFactory: sharedInformerFactory, + serviceResolver: serviceResolver, + Config: &Config{ + Aggregator: aggregatorConfig, + ApiExtensions: extensionsConfig, + 
+ SharedConfig: sharedConfig, + extraConfig: extraConfig, + }, + }, nil } func (o *AggregatorServerOptions) LoadAPIGroupBuilders() error { @@ -96,30 +145,35 @@ func (o *AggregatorServerOptions) LoadAPIGroupBuilders() error { return nil } -func (o *AggregatorServerOptions) Config(codecs serializer.CodecFactory) (*genericapiserver.RecommendedConfig, error) { - if err := o.RecommendedOptions.SecureServing.MaybeDefaultWithSelfSignedCerts( - "localhost", o.AlternateDNS, []net.IP{net.IPv4(127, 0, 0, 1)}, +func initSharedConfig(options *options.RecommendedOptions, codecs serializer.CodecFactory, alternateDNS []string) (*genericapiserver.RecommendedConfig, error) { + if err := options.SecureServing.MaybeDefaultWithSelfSignedCerts( + "localhost", alternateDNS, []net.IP{net.IPv4(127, 0, 0, 1)}, ); err != nil { return nil, fmt.Errorf("error creating self-signed certificates: %v", err) } - o.RecommendedOptions.Authentication.RemoteKubeConfigFileOptional = true - o.RecommendedOptions.Authorization.RemoteKubeConfigFileOptional = true + options.Authentication.RemoteKubeConfigFileOptional = true + options.Authorization.RemoteKubeConfigFileOptional = true - o.RecommendedOptions.Admission = nil + options.Admission = nil - if o.RecommendedOptions.CoreAPI.CoreAPIKubeconfigPath == "" { - o.RecommendedOptions.CoreAPI = nil + if options.CoreAPI.CoreAPIKubeconfigPath == "" { + options.CoreAPI = nil } serverConfig := genericapiserver.NewRecommendedConfig(codecs) - if o.RecommendedOptions.CoreAPI == nil { - if err := o.ModifiedApplyTo(serverConfig); err != nil { + // NOTE: AggregatedDiscoveryGroupManager in kube-apiserver is set up by controlplane APIServerConfig creation + // Here, we adopt that one line in addition to what recommendedOptions gives us + // Without it, CRDs work on API routes (and are registered in openapi) but not discoverable by kubectl + serverConfig.AggregatedDiscoveryGroupManager = aggregated.NewResourceManager("apis") + + if options.CoreAPI == nil { + if err := 
modifiedApplyTo(options, serverConfig); err != nil { return nil, err } } else { - if err := o.RecommendedOptions.ApplyTo(serverConfig); err != nil { + if err := options.ApplyTo(serverConfig); err != nil { return nil, err } } @@ -129,48 +183,48 @@ func (o *AggregatorServerOptions) Config(codecs serializer.CodecFactory) (*gener // A copy of ApplyTo in recommended.go, but for >= 0.28, server pkg in apiserver does a bit extra causing // a panic when CoreAPI is set to nil -func (o *AggregatorServerOptions) ModifiedApplyTo(config *genericapiserver.RecommendedConfig) error { - if err := o.RecommendedOptions.Etcd.ApplyTo(&config.Config); err != nil { +func modifiedApplyTo(options *options.RecommendedOptions, config *genericapiserver.RecommendedConfig) error { + if err := options.Etcd.ApplyTo(&config.Config); err != nil { return err } - if err := o.RecommendedOptions.EgressSelector.ApplyTo(&config.Config); err != nil { + if err := options.EgressSelector.ApplyTo(&config.Config); err != nil { return err } - if err := o.RecommendedOptions.Traces.ApplyTo(config.Config.EgressSelector, &config.Config); err != nil { + if err := options.Traces.ApplyTo(config.Config.EgressSelector, &config.Config); err != nil { return err } - if err := o.RecommendedOptions.SecureServing.ApplyTo(&config.Config.SecureServing, &config.Config.LoopbackClientConfig); err != nil { + if err := options.SecureServing.ApplyTo(&config.Config.SecureServing, &config.Config.LoopbackClientConfig); err != nil { return err } - if err := o.RecommendedOptions.Authentication.ApplyTo(&config.Config.Authentication, config.SecureServing, config.OpenAPIConfig); err != nil { + if err := options.Authentication.ApplyTo(&config.Config.Authentication, config.SecureServing, config.OpenAPIConfig); err != nil { return err } - if err := o.RecommendedOptions.Authorization.ApplyTo(&config.Config.Authorization); err != nil { + if err := options.Authorization.ApplyTo(&config.Config.Authorization); err != nil { return err } - if err := 
o.RecommendedOptions.Audit.ApplyTo(&config.Config); err != nil { + if err := options.Audit.ApplyTo(&config.Config); err != nil { return err } // TODO: determine whether we need flow control (API priority and fairness) - //if err := o.RecommendedOptions.Features.ApplyTo(&config.Config); err != nil { + //if err := options.Features.ApplyTo(&config.Config); err != nil { // return err //} - if err := o.RecommendedOptions.CoreAPI.ApplyTo(config); err != nil { + if err := options.CoreAPI.ApplyTo(config); err != nil { return err } - _, err := o.RecommendedOptions.ExtraAdmissionInitializers(config) + _, err := options.ExtraAdmissionInitializers(config) if err != nil { return err } return nil } -func (o *AggregatorServerOptions) getMergedOpenAPIDefinitions(ref common.ReferenceCallback) map[string]common.OpenAPIDefinition { +func getMergedOpenAPIDefinitions(builders []grafanaAPIServer.APIGroupBuilder, ref common.ReferenceCallback) map[string]common.OpenAPIDefinition { // Add OpenAPI specs for each group+version - prerequisiteAPIs := grafanaAPIServer.GetOpenAPIDefinitions(o.Builders)(ref) + prerequisiteAPIs := grafanaAPIServer.GetOpenAPIDefinitions(builders)(ref) aggregatorAPIs := aggregatoropenapi.GetOpenAPIDefinitions(ref) for k, v := range prerequisiteAPIs { @@ -180,29 +234,96 @@ func (o *AggregatorServerOptions) getMergedOpenAPIDefinitions(ref common.Referen return aggregatorAPIs } -func (o *AggregatorServerOptions) AddFlags(fs *pflag.FlagSet) { - if o == nil { - return +func initSharedInformerFactory(sharedConfig *genericapiserver.RecommendedConfig) (informersv0alpha1.SharedInformerFactory, error) { + serviceClient, err := serviceclientset.NewForConfig(sharedConfig.LoopbackClientConfig) + if err != nil { + return nil, err } - - o.RecommendedOptions.AddFlags(fs) - - fs.StringVar(&o.ExtraOptions.ProxyClientCertFile, "proxy-client-cert-file", o.ExtraOptions.ProxyClientCertFile, - "path to proxy client cert file") - - fs.StringVar(&o.ExtraOptions.ProxyClientKeyFile, 
"proxy-client-key-file", o.ExtraOptions.ProxyClientKeyFile, - "path to proxy client cert file") + return informersv0alpha1.NewSharedInformerFactory( + serviceClient, + 5*time.Minute, // this is effectively used as a refresh interval right now. Might want to do something nicer later on. + ), nil } -func (o *AggregatorServerOptions) CreateAggregatorConfig() (*aggregatorapiserver.Config, error) { - sharedConfig, err := o.Config(aggregatorscheme.Codecs) - if err != nil { - klog.Errorf("Error translating server options to config: %s", err) +func initServiceResolver(factory informersv0alpha1.SharedInformerFactory) (apiserver.ServiceResolver, error) { + return NewExternalNameResolver(factory.Service().V0alpha1().ExternalNames().Lister()), nil +} + +func initApiExtensionsConfig(options *options.RecommendedOptions, + sharedConfig *genericapiserver.RecommendedConfig, + fakeInfomers informers.SharedInformerFactory, + serviceResolver apiserver.ServiceResolver, + dataPath string, +) (*apiextensionsapiserver.Config, error) { + // make a shallow copy to let us twiddle a few things + // most of the config actually remains the same. We only need to mess with a couple items related to the particulars of the api extensions + genericConfig := sharedConfig.Config + + genericConfig.PostStartHooks = map[string]genericapiserver.PostStartHookConfigEntry{} + genericConfig.RESTOptionsGetter = nil + + // copy the etcd options so we don't mutate originals. + // we assume that the etcd options have been completed already. avoid messing with anything outside + // of changes to StorageConfig as that may lead to unexpected behavior when the options are applied. + etcdOptions := *options.Etcd + // this is where the true decodable levels come from. 
+ etcdOptions.StorageConfig.Codec = apiextensionsapiserver.Codecs.LegacyCodec(apiextensionsv1beta1.SchemeGroupVersion, v1.SchemeGroupVersion) + // prefer the more compact serialization (v1beta1) for storage until https://issue.k8s.io/82292 is resolved for objects whose v1 serialization is too big but whose v1beta1 serialization can be stored + etcdOptions.StorageConfig.EncodeVersioner = runtime.NewMultiGroupVersioner(apiextensionsv1beta1.SchemeGroupVersion, schema.GroupKind{Group: apiextensionsv1beta1.GroupName}) + etcdOptions.SkipHealthEndpoints = true // avoid double wiring of health checks + if err := etcdOptions.ApplyTo(&genericConfig); err != nil { return nil, err } - commandOptions := *o.RecommendedOptions + restOptionsGetter := filestorage.NewRESTOptionsGetter(path.Join(dataPath, "grafana-apiextensionsserver"), etcdOptions.StorageConfig) + genericConfig.RESTOptionsGetter = restOptionsGetter + // NOTE: ignoring genericConfig.ResourceTransformers in crdOptionsGetter creation for now + // crdOptionsGetter := apiextensionsoptions.NewCRDRESTOptionsGetter(etcdOptions, genericConfig.ResourceTransformers, ) + // The following is equivalent code to apiextensionsoptions.NewCRDRESTOptionsGetter with lesser dependencies + crdEtcdOptions := etcdOptions + crdEtcdOptions.StorageConfig.Codec = unstructured.UnstructuredJSONScheme + crdEtcdOptions.StorageConfig.StorageObjectCountTracker = genericConfig.StorageObjectCountTracker + crdEtcdOptions.WatchCacheSizes = nil // this control is not provided for custom resources + + // override MergedResourceConfig with apiextensions defaults and registry + mergedResourceConfig, err := resourceconfig.MergeAPIResourceConfigs(apiextensionsapiserver.DefaultAPIResourceConfigSource(), nil, apiextensionsapiserver.Scheme) + if err != nil { + return nil, err + } + genericConfig.MergedResourceConfig = mergedResourceConfig + + genericConfig.OpenAPIV3Config = 
genericapiserver.DefaultOpenAPIV3Config(openapi.GetOpenAPIDefinitionsWithoutDisabledFeatures(apiextensionsopenapi.GetOpenAPIDefinitions), openapinamer.NewDefinitionNamer(apiextensionsapiserver.Scheme, apiextensionsapiserver.Scheme)) + + apiextensionsConfig := &apiextensionsapiserver.Config{ + GenericConfig: &genericapiserver.RecommendedConfig{ + Config: genericConfig, + SharedInformerFactory: fakeInfomers, + }, + ExtraConfig: apiextensionsapiserver.ExtraConfig{ + CRDRESTOptionsGetter: filestorage.NewRESTOptionsGetter(path.Join(dataPath, "grafana-apiextensionsserver"), crdEtcdOptions.StorageConfig), + // TODO: remove the hardcoded value when the HA story is more developed + MasterCount: 1, + // TODO: leaving AuthResolverWrapper unset doesn't impact basic operation of CRDs + // AuthResolverWrapper: authResolverWrapper, + ServiceResolver: serviceResolver, + }, + } + + // we need to clear the poststarthooks so we don't add them multiple times to all the servers (that fails) + apiextensionsConfig.GenericConfig.PostStartHooks = map[string]genericapiserver.PostStartHookConfigEntry{} + + return apiextensionsConfig, nil +} + +func initAggregatorConfig(options *options.RecommendedOptions, + sharedConfig *genericapiserver.RecommendedConfig, + extra *ExtraConfig, + fakeInformers informers.SharedInformerFactory, + builders []grafanaAPIServer.APIGroupBuilder, + serviceResolver apiserver.ServiceResolver, + dataPath string, +) (*aggregatorapiserver.Config, error) { // make a shallow copy to let us twiddle a few things // most of the config actually remains the same. 
We only need to mess with a couple items related to the particulars of the aggregator genericConfig := sharedConfig.Config @@ -218,10 +339,16 @@ func (o *AggregatorServerOptions) CreateAggregatorConfig() (*aggregatorapiserver } genericConfig.MergedResourceConfig = mergedResourceConfig - namer := openapinamer.NewDefinitionNamer(aggregatorscheme.Scheme) - genericConfig.OpenAPIV3Config = genericapiserver.DefaultOpenAPIV3Config(o.getMergedOpenAPIDefinitions, namer) + getOpenAPIDefinitionsFunc := func() common.GetOpenAPIDefinitions { + return func(ref common.ReferenceCallback) map[string]common.OpenAPIDefinition { + return getMergedOpenAPIDefinitions(builders, ref) + } + } + + namer := openapinamer.NewDefinitionNamer(aggregatorscheme.Scheme, apiextensionsapiserver.Scheme) + genericConfig.OpenAPIV3Config = genericapiserver.DefaultOpenAPIV3Config(getOpenAPIDefinitionsFunc(), namer) genericConfig.OpenAPIV3Config.Info.Title = "Kubernetes" - genericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(o.getMergedOpenAPIDefinitions, namer) + genericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(getOpenAPIDefinitionsFunc(), namer) genericConfig.OpenAPIConfig.Info.Title = "Kubernetes" if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) && @@ -235,31 +362,18 @@ func (o *AggregatorServerOptions) CreateAggregatorConfig() (*aggregatorapiserver // copy the etcd options so we don't mutate originals. // we assume that the etcd options have been completed already. avoid messing with anything outside // of changes to StorageConfig as that may lead to unexpected behavior when the options are applied. 
- etcdOptions := *commandOptions.Etcd + etcdOptions := *options.Etcd etcdOptions.StorageConfig.Codec = aggregatorscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion, - v1beta1.SchemeGroupVersion, + apiregistrationv1beta1.SchemeGroupVersion, servicev0alpha1.SchemeGroupVersion) etcdOptions.StorageConfig.EncodeVersioner = runtime.NewMultiGroupVersioner(v1.SchemeGroupVersion, - schema.GroupKind{Group: v1beta1.GroupName}, + schema.GroupKind{Group: apiregistrationv1beta1.GroupName}, schema.GroupKind{Group: servicev0alpha1.GROUP}) - // etcdOptions.StorageConfig.Transport.ServerList = []string{"127.0.0.1:2379"} etcdOptions.SkipHealthEndpoints = true // avoid double wiring of health checks if err := etcdOptions.ApplyTo(&genericConfig); err != nil { return nil, err } - genericConfig.RESTOptionsGetter = filestorage.NewRESTOptionsGetter("/tmp/grafana.aggregator", etcdOptions.StorageConfig) - - versionedInformers := informers.NewSharedInformerFactory(fake.NewSimpleClientset(), 10*time.Minute) - - serviceClient, err := serviceclientset.NewForConfig(genericConfig.LoopbackClientConfig) - if err != nil { - return nil, err - } - o.sharedInformerFactory = informersv0alpha1.NewSharedInformerFactory( - serviceClient, - 5*time.Minute, // this is effectively used as a refresh interval right now. Might want to do something nicer later on. 
- ) - serviceResolver := NewExternalNameResolver(o.sharedInformerFactory.Service().V0alpha1().ExternalNames().Lister()) + genericConfig.RESTOptionsGetter = filestorage.NewRESTOptionsGetter(path.Join(dataPath, "grafana-aggregator"), etcdOptions.StorageConfig) genericConfig.DisabledPostStartHooks = genericConfig.DisabledPostStartHooks.Insert("apiservice-status-available-controller") genericConfig.DisabledPostStartHooks = genericConfig.DisabledPostStartHooks.Insert("start-kube-aggregator-informers") @@ -267,12 +381,12 @@ func (o *AggregatorServerOptions) CreateAggregatorConfig() (*aggregatorapiserver aggregatorConfig := &aggregatorapiserver.Config{ GenericConfig: &genericapiserver.RecommendedConfig{ Config: genericConfig, - SharedInformerFactory: versionedInformers, + SharedInformerFactory: fakeInformers, ClientConfig: genericConfig.LoopbackClientConfig, }, ExtraConfig: aggregatorapiserver.ExtraConfig{ - ProxyClientCertFile: o.ExtraOptions.ProxyClientCertFile, - ProxyClientKeyFile: o.ExtraOptions.ProxyClientKeyFile, + ProxyClientCertFile: extra.ProxyClientCertFile, + ProxyClientKeyFile: extra.ProxyClientKeyFile, // NOTE: while ProxyTransport can be skipped in the configuration, it allows honoring // DISABLE_HTTP2, HTTPS_PROXY and NO_PROXY env vars as needed ProxyTransport: createProxyTransport(), @@ -287,15 +401,15 @@ func (o *AggregatorServerOptions) CreateAggregatorConfig() (*aggregatorapiserver return aggregatorConfig, nil } -func (o *AggregatorServerOptions) CreateAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget) (*aggregatorapiserver.APIAggregator, error) { - completedConfig := aggregatorConfig.Complete() +func (o *AggregatorServerOptions) CreateAggregatorServer(delegateAPIServer genericapiserver.DelegationTarget, apiExtensionsInformers apiextensionsinformers.SharedInformerFactory) (*aggregatorapiserver.APIAggregator, error) { + completedConfig := o.Config.AggregatorComplete aggregatorServer, err 
:= completedConfig.NewWithDelegate(delegateAPIServer) if err != nil { return nil, err } // create controllers for auto-registration - apiRegistrationClient, err := apiregistrationclient.NewForConfig(aggregatorConfig.GenericConfig.LoopbackClientConfig) + apiRegistrationClient, err := apiregistrationclient.NewForConfig(completedConfig.GenericConfig.LoopbackClientConfig) if err != nil { return nil, err } @@ -303,15 +417,21 @@ func (o *AggregatorServerOptions) CreateAggregatorServer(aggregatorConfig *aggre autoRegistrationController := autoregister.NewAutoRegisterController(aggregatorServer.APIRegistrationInformers.Apiregistration().V1().APIServices(), apiRegistrationClient) apiServices := apiServicesToRegister(delegateAPIServer, autoRegistrationController) + crdRegistrationController := NewCRDRegistrationController( + apiExtensionsInformers.Apiextensions().V1().CustomResourceDefinitions(), + autoRegistrationController) + // Imbue all builtin group-priorities onto the aggregated discovery - if aggregatorConfig.GenericConfig.AggregatedDiscoveryGroupManager != nil { + if completedConfig.GenericConfig.AggregatedDiscoveryGroupManager != nil { for gv, entry := range apiVersionPriorities { - aggregatorConfig.GenericConfig.AggregatedDiscoveryGroupManager.SetGroupVersionPriority(metav1.GroupVersion(gv), int(entry.group), int(entry.version)) + completedConfig.GenericConfig.AggregatedDiscoveryGroupManager.SetGroupVersionPriority(metav1.GroupVersion(gv), int(entry.group), int(entry.version)) } } err = aggregatorServer.GenericAPIServer.AddPostStartHook("kube-apiserver-autoregistration", func(context genericapiserver.PostStartHookContext) error { + go crdRegistrationController.Run(5, context.StopCh) go func() { + crdRegistrationController.WaitForInitialSync() autoRegistrationController.Run(5, context.StopCh) }() return nil @@ -362,7 +482,7 @@ func (o *AggregatorServerOptions) CreateAggregatorServer(aggregatorConfig *aggre // Install the API Group+version for _, b := range 
o.Builders { - g, err := b.GetAPIGroupInfo(aggregatorscheme.Scheme, aggregatorscheme.Codecs, aggregatorConfig.GenericConfig.RESTOptionsGetter) + g, err := b.GetAPIGroupInfo(aggregatorscheme.Scheme, aggregatorscheme.Codecs, completedConfig.GenericConfig.RESTOptionsGetter) if err != nil { return nil, err } @@ -455,6 +575,7 @@ var apiVersionPriorities = map[schema.GroupVersion]priority{ {Group: "admissionregistration.k8s.io", Version: "v1"}: {group: 16700, version: 15}, {Group: "admissionregistration.k8s.io", Version: "v1beta1"}: {group: 16700, version: 12}, {Group: "admissionregistration.k8s.io", Version: "v1alpha1"}: {group: 16700, version: 9}, + {Group: "apiextensions.k8s.io", Version: "v1"}: {group: 16700, version: 15}, // Append a new group to the end of the list if unsure. // You can use min(existing group)-100 as the initial value for a group. // Version can be set to 9 (to have space around) for a new group. diff --git a/pkg/aggregator/config.go b/pkg/aggregator/config.go new file mode 100644 index 00000000000..e7bcd84e286 --- /dev/null +++ b/pkg/aggregator/config.go @@ -0,0 +1,57 @@ +package aggregator + +import ( + "github.com/spf13/pflag" + apiextensionsapiserver "k8s.io/apiextensions-apiserver/pkg/apiserver" + genericapiserver "k8s.io/apiserver/pkg/server" + "k8s.io/apiserver/pkg/server/options" + aggregatorapiserver "k8s.io/kube-aggregator/pkg/apiserver" +) + +type ExtraConfig struct { + ProxyClientCertFile string + ProxyClientKeyFile string + + DataPath string +} + +type Config struct { + Aggregator *aggregatorapiserver.Config + ApiExtensions *apiextensionsapiserver.Config + + AggregatorComplete aggregatorapiserver.CompletedConfig + ApiExtensionsComplete apiextensionsapiserver.CompletedConfig + + recommendedOptions *options.RecommendedOptions + SharedConfig *genericapiserver.RecommendedConfig + extraConfig *ExtraConfig +} + +func (c *Config) AddFlags(fs *pflag.FlagSet) { + if c == nil { + return + } + + c.recommendedOptions.AddFlags(fs) +} + +func (c 
*Config) Complete() { + if c == nil { + return + } + + c.ApiExtensionsComplete = c.ApiExtensions.Complete() + c.AggregatorComplete = c.Aggregator.Complete() +} + +func (ec *ExtraConfig) AddFlags(fs *pflag.FlagSet) { + if ec == nil { + return + } + + fs.StringVar(&ec.ProxyClientCertFile, "proxy-client-cert-file", ec.ProxyClientCertFile, + "path to proxy client cert file") + + fs.StringVar(&ec.ProxyClientKeyFile, "proxy-client-key-file", ec.ProxyClientKeyFile, + "path to proxy client key file") +} diff --git a/pkg/aggregator/crdRegistrationController.go b/pkg/aggregator/crdRegistrationController.go new file mode 100644 index 00000000000..f3bbdd3c85d --- /dev/null +++ b/pkg/aggregator/crdRegistrationController.go @@ -0,0 +1,214 @@ +// SPDX-License-Identifier: AGPL-3.0-only +// Provenance-includes-location: https://github.com/kubernetes/kubernetes/blob/master/pkg/controlplane/controller/crdregistration/crdregistration_controller.go +// Provenance-includes-license: Apache-2.0 +// Provenance-includes-copyright: The Kubernetes Authors. + +package aggregator + +import ( + "fmt" + "time" + + "k8s.io/klog/v2" + + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + crdinformers "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions/apiextensions/v1" + crdlisters "k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime/schema" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" +) + +// AutoAPIServiceRegistration is an interface which callers can re-declare locally and properly cast to for +// adding and removing APIServices +type AutoAPIServiceRegistration interface { + // AddAPIServiceToSync adds an API service to auto-register.
+ AddAPIServiceToSync(in *v1.APIService) + // RemoveAPIServiceToSync removes an API service to auto-register. + RemoveAPIServiceToSync(name string) +} + +type crdRegistrationController struct { + crdLister crdlisters.CustomResourceDefinitionLister + crdSynced cache.InformerSynced + + apiServiceRegistration AutoAPIServiceRegistration + + syncHandler func(groupVersion schema.GroupVersion) error + + syncedInitialSet chan struct{} + + // queue is where incoming work is placed to de-dup and to allow "easy" rate limited requeues on errors + // this is actually keyed by a groupVersion + queue workqueue.RateLimitingInterface +} + +// NewCRDRegistrationController returns a controller which will register CRD GroupVersions with the auto APIService registration +// controller so they automatically stay in sync. +func NewCRDRegistrationController(crdinformer crdinformers.CustomResourceDefinitionInformer, apiServiceRegistration AutoAPIServiceRegistration) *crdRegistrationController { + c := &crdRegistrationController{ + crdLister: crdinformer.Lister(), + crdSynced: crdinformer.Informer().HasSynced, + apiServiceRegistration: apiServiceRegistration, + syncedInitialSet: make(chan struct{}), + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "crd_autoregistration_controller"), + } + c.syncHandler = c.handleVersionUpdate + + _, _ = crdinformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + cast := obj.(*apiextensionsv1.CustomResourceDefinition) + c.enqueueCRD(cast) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + // Enqueue both old and new object to make sure we remove and add appropriate API services. + // The working queue will resolve any duplicates and only changes will stay in the queue.
+ c.enqueueCRD(oldObj.(*apiextensionsv1.CustomResourceDefinition)) + c.enqueueCRD(newObj.(*apiextensionsv1.CustomResourceDefinition)) + }, + DeleteFunc: func(obj interface{}) { + cast, ok := obj.(*apiextensionsv1.CustomResourceDefinition) + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + klog.V(2).Infof("Couldn't get object from tombstone %#v", obj) + return + } + cast, ok = tombstone.Obj.(*apiextensionsv1.CustomResourceDefinition) + if !ok { + klog.V(2).Infof("Tombstone contained unexpected object: %#v", obj) + return + } + } + c.enqueueCRD(cast) + }, + }) + + return c +} + +func (c *crdRegistrationController) Run(workers int, stopCh <-chan struct{}) { + defer utilruntime.HandleCrash() + // make sure the work queue is shutdown which will trigger workers to end + defer c.queue.ShutDown() + + klog.Infof("Starting crd-autoregister controller") + defer klog.Infof("Shutting down crd-autoregister controller") + + // wait for your secondary caches to fill before starting your work + if !cache.WaitForNamedCacheSync("crd-autoregister", stopCh, c.crdSynced) { + return + } + + // process each item in the list once + if crds, err := c.crdLister.List(labels.Everything()); err != nil { + utilruntime.HandleError(err) + } else { + for _, crd := range crds { + for _, version := range crd.Spec.Versions { + if err := c.syncHandler(schema.GroupVersion{Group: crd.Spec.Group, Version: version.Name}); err != nil { + utilruntime.HandleError(err) + } + } + } + } + close(c.syncedInitialSet) + + // start up your worker threads based on workers. Some controllers have multiple kinds of workers + for i := 0; i < workers; i++ { + // runWorker will loop until "something bad" happens.
The .Until will then rekick the worker + // after one second + go wait.Until(c.runWorker, time.Second, stopCh) + } + + // wait until we're told to stop + <-stopCh +} + +// WaitForInitialSync blocks until the initial set of CRD resources has been processed +func (c *crdRegistrationController) WaitForInitialSync() { + <-c.syncedInitialSet +} + +func (c *crdRegistrationController) runWorker() { + // hot loop until we're told to stop. processNextWorkItem will automatically wait until there's work + // available, so we don't worry about secondary waits + for c.processNextWorkItem() { + } +} + +// processNextWorkItem deals with one key off the queue. It returns false when it's time to quit. +func (c *crdRegistrationController) processNextWorkItem() bool { + // pull the next work item from queue. It should be a key we use to lookup something in a cache + key, quit := c.queue.Get() + if quit { + return false + } + // you always have to indicate to the queue that you've completed a piece of work + defer c.queue.Done(key) + + // do your work on the key. This method will contain your "do stuff" logic + err := c.syncHandler(key.(schema.GroupVersion)) + if err == nil { + // if you had no error, tell the queue to stop tracking history for your key. This will + // reset things like failure counts for per-item rate limiting + c.queue.Forget(key) + return true + } + + // there was a failure so be sure to report it. This method allows for pluggable error handling + // which can be used for things like cluster-monitoring + utilruntime.HandleError(fmt.Errorf("%v failed with : %v", key, err)) + // since we failed, we should requeue the item to work on later. This method will add a backoff + // to avoid hotlooping on particular items (they're probably still not going to work right away) + // and overall controller protection (everything I've done is broken, this controller needs to + // calm down or it can starve other useful work) cases.
+ c.queue.AddRateLimited(key) + + return true +} + +func (c *crdRegistrationController) enqueueCRD(crd *apiextensionsv1.CustomResourceDefinition) { + for _, version := range crd.Spec.Versions { + c.queue.Add(schema.GroupVersion{Group: crd.Spec.Group, Version: version.Name}) + } +} + +func (c *crdRegistrationController) handleVersionUpdate(groupVersion schema.GroupVersion) error { + apiServiceName := groupVersion.Version + "." + groupVersion.Group + + // check all CRDs. There shouldn't be that many, but if we have problems later we can index them + crds, err := c.crdLister.List(labels.Everything()) + if err != nil { + return err + } + for _, crd := range crds { + if crd.Spec.Group != groupVersion.Group { + continue + } + for _, version := range crd.Spec.Versions { + if version.Name != groupVersion.Version || !version.Served { + continue + } + + c.apiServiceRegistration.AddAPIServiceToSync(&v1.APIService{ + ObjectMeta: metav1.ObjectMeta{Name: apiServiceName}, + Spec: v1.APIServiceSpec{ + Group: groupVersion.Group, + Version: groupVersion.Version, + GroupPriorityMinimum: 1000, // CRDs should have relatively low priority + VersionPriority: 100, // CRDs will be sorted by kube-like versions like any other APIService with the same VersionPriority + }, + }) + return nil + } + } + + c.apiServiceRegistration.RemoveAPIServiceToSync(apiServiceName) + return nil +} diff --git a/pkg/cmd/grafana/apiserver/cmd.go b/pkg/cmd/grafana/apiserver/cmd.go index dc35169e56e..338c1ca9662 100644 --- a/pkg/cmd/grafana/apiserver/cmd.go +++ b/pkg/cmd/grafana/apiserver/cmd.go @@ -5,8 +5,10 @@ import ( "path" "github.com/spf13/cobra" + genericapifilters "k8s.io/apiserver/pkg/endpoints/filters" genericapiserver "k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/server/options" + "k8s.io/apiserver/pkg/util/notfoundhandler" "k8s.io/client-go/tools/clientcmd" "k8s.io/component-base/cli" "k8s.io/klog/v2" @@ -18,7 +20,7 @@ import ( ) const ( - aggregatorDataPath = "data/grafana-aggregator" +
aggregatorDataPath = "data" defaultAggregatorEtcdPathPrefix = "/registry/grafana.aggregator" ) @@ -73,9 +75,24 @@ func RunCLI() int { return cli.Run(cmd) } -func newCommandStartAggregator(o *aggregator.AggregatorServerOptions) *cobra.Command { +func newCommandStartAggregator() *cobra.Command { devAcknowledgementNotice := "The aggregator command is in heavy development. The entire setup is subject to change without notice" + cwd, err := os.Getwd() + if err != nil { + panic("could not determine current directory") + } + + extraConfig := &aggregator.ExtraConfig{ + DataPath: path.Join(cwd, aggregatorDataPath), + } + + // Register standard k8s flags with the command line + recommendedOptions := options.NewRecommendedOptions( + defaultAggregatorEtcdPathPrefix, + aggregatorscheme.Codecs.LegacyCodec(), // codec is passed to etcd and hence not used + ) + cmd := &cobra.Command{ Use: "aggregator", Short: "Run the grafana aggregator", @@ -83,10 +100,21 @@ func newCommandStartAggregator(o *aggregator.AggregatorServerOptions) *cobra.Com devAcknowledgementNotice, Example: "grafana aggregator", RunE: func(c *cobra.Command, args []string) error { - return run(o) + serverOptions, err := aggregator.NewAggregatorServerOptions(os.Stdout, os.Stderr, recommendedOptions, extraConfig) + if err != nil { + klog.Errorf("Could not create aggregator server options: %s", err) + os.Exit(1) + } + + serverOptions.Config.Complete() + + return run(serverOptions) }, } + recommendedOptions.AddFlags(cmd.Flags()) + extraConfig.AddFlags(cmd.Flags()) + return cmd } @@ -96,23 +124,20 @@ func run(serverOptions *aggregator.AggregatorServerOptions) error { return err } - serverOptions.RecommendedOptions.SecureServing.BindPort = 8443 - delegationTarget := genericapiserver.NewEmptyDelegate() - - config, err := serverOptions.CreateAggregatorConfig() + notFoundHandler := notfoundhandler.New(serverOptions.Config.SharedConfig.Serializer, genericapifilters.NoMuxAndDiscoveryIncompleteKey) + apiExtensionsServer, err :=
serverOptions.Config.ApiExtensionsComplete.New(genericapiserver.NewEmptyDelegateWithCustomHandler(notFoundHandler)) if err != nil { - klog.Errorf("Error creating aggregator config: %s", err) return err } - aggregator, err := serverOptions.CreateAggregatorServer(config, delegationTarget) + aggregator, err := serverOptions.CreateAggregatorServer(apiExtensionsServer.GenericAPIServer, apiExtensionsServer.Informers) if err != nil { klog.Errorf("Error creating aggregator server: %s", err) return err } // Install the API Group+version - err = grafanaapiserver.InstallAPIs(aggregator.GenericAPIServer, config.GenericConfig.RESTOptionsGetter, serverOptions.Builders) + err = grafanaapiserver.InstallAPIs(aggregator.GenericAPIServer, serverOptions.Config.Aggregator.GenericConfig.RESTOptionsGetter, serverOptions.Builders) if err != nil { klog.Errorf("Error installing apis: %s", err) return err @@ -120,13 +145,12 @@ func run(serverOptions *aggregator.AggregatorServerOptions) error { if err := clientcmd.WriteToFile( utils.FormatKubeConfig(aggregator.GenericAPIServer.LoopbackClientConfig), - path.Join(aggregatorDataPath, "aggregator.kubeconfig"), + path.Join(aggregatorDataPath, "grafana-aggregator", "aggregator.kubeconfig"), ); err != nil { klog.Errorf("Error persisting aggregator.kubeconfig: %s", err) return err } - // Finish the config (a noop for now) prepared, err := aggregator.PrepareRun() if err != nil { return err @@ -140,16 +164,7 @@ func run(serverOptions *aggregator.AggregatorServerOptions) error { } func RunCobraWrapper() int { - serverOptions := aggregator.NewAggregatorServerOptions(os.Stdout, os.Stderr) - // Register standard k8s flags with the command line - serverOptions.RecommendedOptions = options.NewRecommendedOptions( - defaultAggregatorEtcdPathPrefix, - aggregatorscheme.Codecs.LegacyCodec(), // codec is passed to etcd and hence not used - ) - - cmd := newCommandStartAggregator(serverOptions) - - serverOptions.AddFlags(cmd.Flags()) + cmd :=
newCommandStartAggregator() return cli.Run(cmd) }