
482 lines
19 KiB

// SPDX-License-Identifier: AGPL-3.0-only
// Provenance-includes-location:
// Provenance-includes-license: Apache-2.0
// Provenance-includes-copyright: The Kubernetes Authors.
// Provenance-includes-location:
// Provenance-includes-license: Apache-2.0
// Provenance-includes-copyright: The Kubernetes Authors.
// Provenance-includes-location:
// Provenance-includes-license: Apache-2.0
// Provenance-includes-copyright: The Kubernetes Authors.
package aggregator
import (
metav1 ""
utilnet ""
genericapiserver ""
v1 ""
v1helper ""
aggregatorapiserver ""
aggregatorscheme ""
apiregistrationclientset ""
apiregistrationclient ""
apiregistrationInformers ""
servicev0alpha1 ""
servicev0alpha1applyconfiguration ""
serviceclientset ""
informersv0alpha1 ""
func readCABundlePEM(path string, devMode bool) ([]byte, error) {
if devMode {
return nil, nil
// We can ignore the gosec G304 warning on this one because `path` comes
// from Grafana configuration (commandOptions.AggregatorOptions.APIServiceCABundleFile)
f, err := os.Open(path)
if err != nil {
return nil, err
defer func() {
if err := f.Close(); err != nil {
klog.Errorf("error closing remote services file: %s", err)
return io.ReadAll(f)
func readRemoteServices(path string) ([]RemoteService, error) {
// We can ignore the gosec G304 warning on this one because `path` comes
// from Grafana configuration (commandOptions.AggregatorOptions.RemoteServicesFile)
f, err := os.Open(path)
if err != nil {
return nil, err
defer func() {
if err := f.Close(); err != nil {
klog.Errorf("error closing remote services file: %s", err)
rawRemoteServices, err := io.ReadAll(f)
if err != nil {
return nil, err
remoteServices := make([]RemoteService, 0)
if err := yaml.Unmarshal(rawRemoteServices, &remoteServices); err != nil {
return nil, err
return remoteServices, nil
func CreateAggregatorConfig(commandOptions *options.Options, sharedConfig genericapiserver.RecommendedConfig, externalNamesNamespace string) (*Config, error) {
// Create a fake clientset and informers for the k8s v1 API group.
// These are not used in grafana's aggregator because v1 APIs are not available.
fakev1Informers := informers.NewSharedInformerFactory(fake.NewSimpleClientset(), 10*time.Minute)
serviceClient, err := serviceclientset.NewForConfig(sharedConfig.LoopbackClientConfig)
if err != nil {
return nil, err
sharedInformerFactory := informersv0alpha1.NewSharedInformerFactory(
5*time.Minute, // this is effectively used as a refresh interval right now. Might want to do something nicer later on.
serviceResolver := NewExternalNameResolver(sharedInformerFactory.Service().V0alpha1().ExternalNames().Lister())
aggregatorConfig := &aggregatorapiserver.Config{
GenericConfig: &genericapiserver.RecommendedConfig{
Config: sharedConfig.Config,
SharedInformerFactory: fakev1Informers,
ClientConfig: sharedConfig.LoopbackClientConfig,
ExtraConfig: aggregatorapiserver.ExtraConfig{
ProxyClientCertFile: commandOptions.AggregatorOptions.ProxyClientCertFile,
ProxyClientKeyFile: commandOptions.AggregatorOptions.ProxyClientKeyFile,
// NOTE: while ProxyTransport can be skipped in the configuration, it allows honoring
// DISABLE_HTTP2, HTTPS_PROXY and NO_PROXY env vars as needed
ProxyTransport: createProxyTransport(),
ServiceResolver: serviceResolver,
if err := commandOptions.AggregatorOptions.ApplyTo(aggregatorConfig, commandOptions.RecommendedOptions.Etcd, commandOptions.StorageOptions.DataPath); err != nil {
return nil, err
serviceAPIBuilder := service.NewServiceAPIBuilder()
if err := serviceAPIBuilder.InstallSchema(aggregatorscheme.Scheme); err != nil {
return nil, err
APIVersionPriorities[serviceAPIBuilder.GetGroupVersion()] = Priority{Group: 15000, Version: int32(1)}
// Exit early, if no remote services file is configured
if commandOptions.AggregatorOptions.RemoteServicesFile == "" {
return NewConfig(aggregatorConfig, sharedInformerFactory, []builder.APIGroupBuilder{serviceAPIBuilder}, nil), nil
_, err = readCABundlePEM(commandOptions.AggregatorOptions.APIServiceCABundleFile, commandOptions.ExtraOptions.DevMode)
if err != nil {
return nil, err
remoteServices, err := readRemoteServices(commandOptions.AggregatorOptions.RemoteServicesFile)
if err != nil {
return nil, err
remoteServicesConfig := &RemoteServicesConfig{
// TODO: in practice, we should only use the insecure flag when commandOptions.ExtraOptions.DevMode == true
// But given the bug in K8s, we are forced to set it to true until the below PR is merged and available
InsecureSkipTLSVerify: true,
ExternalNamesNamespace: externalNamesNamespace,
// TODO: CABundle can't be set when insecure is true
// CABundle: caBundlePEM,
Services: remoteServices,
serviceClientSet: serviceClient,
return NewConfig(aggregatorConfig, sharedInformerFactory, []builder.APIGroupBuilder{serviceAPIBuilder}, remoteServicesConfig), nil
func CreateAggregatorServer(config *Config, delegateAPIServer genericapiserver.DelegationTarget, reg prometheus.Registerer) (*aggregatorapiserver.APIAggregator, error) {
aggregatorConfig := config.KubeAggregatorConfig
sharedInformerFactory := config.Informers
remoteServicesConfig := config.RemoteServicesConfig
externalNamesInformer := sharedInformerFactory.Service().V0alpha1().ExternalNames()
completedConfig := aggregatorConfig.Complete()
aggregatorServer, err := completedConfig.NewWithDelegate(delegateAPIServer)
if err != nil {
return nil, err
// create controllers for auto-registration
apiRegistrationClient, err := apiregistrationclient.NewForConfig(completedConfig.GenericConfig.LoopbackClientConfig)
if err != nil {
return nil, err
autoRegistrationController := autoregister.NewAutoRegisterController(aggregatorServer.APIRegistrationInformers.Apiregistration().V1().APIServices(), apiRegistrationClient)
apiServices := apiServicesToRegister(delegateAPIServer, autoRegistrationController)
// Imbue all builtin group-priorities onto the aggregated discovery
if completedConfig.GenericConfig.AggregatedDiscoveryGroupManager != nil {
for gv, entry := range APIVersionPriorities {
completedConfig.GenericConfig.AggregatedDiscoveryGroupManager.SetGroupVersionPriority(metav1.GroupVersion(gv), int(entry.Group), int(entry.Version))
err = aggregatorServer.GenericAPIServer.AddPostStartHook("grafana-apiserver-autoregistration", func(context genericapiserver.PostStartHookContext) error {
go autoRegistrationController.Run(5, context.StopCh)
return nil
if err != nil {
return nil, err
if remoteServicesConfig != nil {
addRemoteAPIServicesToRegister(remoteServicesConfig, autoRegistrationController)
externalNames := getRemoteExternalNamesToRegister(remoteServicesConfig)
err = aggregatorServer.GenericAPIServer.AddPostStartHook("grafana-apiserver-remote-autoregistration", func(ctx genericapiserver.PostStartHookContext) error {
controllers.WaitForCacheSync("grafana-apiserver-remote-autoregistration", ctx.StopCh, externalNamesInformer.Informer().HasSynced)
namespacedClient := remoteServicesConfig.serviceClientSet.ServiceV0alpha1().ExternalNames(remoteServicesConfig.ExternalNamesNamespace)
for _, externalName := range externalNames {
_, err := namespacedClient.Apply(context.Background(), externalName, metav1.ApplyOptions{
FieldManager: "grafana-aggregator",
Force: true,
if err != nil {
return err
return nil
if err != nil {
return nil, err
err = aggregatorServer.GenericAPIServer.AddBootSequenceHealthChecks(
if err != nil {
return nil, err
apiregistrationClient, err := apiregistrationclientset.NewForConfig(completedConfig.GenericConfig.LoopbackClientConfig)
if err != nil {
return nil, err
proxyCurrentCertKeyContentFunc := func() ([]byte, []byte) {
return nil, nil
if len(config.KubeAggregatorConfig.ExtraConfig.ProxyClientCertFile) > 0 && len(config.KubeAggregatorConfig.ExtraConfig.ProxyClientKeyFile) > 0 {
aggregatorProxyCerts, err := dynamiccertificates.NewDynamicServingContentFromFiles("aggregator-proxy-cert", config.KubeAggregatorConfig.ExtraConfig.ProxyClientCertFile, config.KubeAggregatorConfig.ExtraConfig.ProxyClientKeyFile)
if err != nil {
return nil, err
proxyCurrentCertKeyContentFunc = func() ([]byte, []byte) {
return aggregatorProxyCerts.CurrentCertKeyContent()
availableController, err := NewAvailableConditionController(
if err != nil {
return nil, err
aggregatorServer.GenericAPIServer.AddPostStartHookOrDie("apiservice-status-override-available-controller", func(context genericapiserver.PostStartHookContext) error {
// if we end up blocking for long periods of time, we may need to increase workers.
go availableController.Run(5, context.StopCh)
return nil
aggregatorServer.GenericAPIServer.AddPostStartHookOrDie("start-grafana-aggregator-informers", func(context genericapiserver.PostStartHookContext) error {
return nil
for _, b := range config.Builders {
serviceAPIGroupInfo, err := b.GetAPIGroupInfo(
nil, // no dual writer
if err != nil {
return nil, err
if err := aggregatorServer.GenericAPIServer.InstallAPIGroup(serviceAPIGroupInfo); err != nil {
return nil, err
return aggregatorServer, nil
func makeAPIService(gv schema.GroupVersion) *v1.APIService {
apiServicePriority, ok := APIVersionPriorities[gv]
if !ok {
// if we aren't found, then we shouldn't register ourselves because it could result in a CRD group version
// being permanently stuck in the APIServices list.
klog.Infof("Skipping APIService creation for %v", gv)
return nil
return &v1.APIService{
ObjectMeta: metav1.ObjectMeta{Name: gv.Version + "." + gv.Group},
Spec: v1.APIServiceSpec{
Group: gv.Group,
Version: gv.Version,
GroupPriorityMinimum: apiServicePriority.Group,
VersionPriority: apiServicePriority.Version,
// makeAPIServiceAvailableHealthCheck returns a healthz check that returns healthy
// once all of the specified services have been observed to be available at least once.
func makeAPIServiceAvailableHealthCheck(name string, apiServices []*v1.APIService, apiServiceInformer apiregistrationInformers.APIServiceInformer) healthz.HealthChecker {
// Track the auto-registered API services that have not been observed to be available yet
pendingServiceNamesLock := &sync.RWMutex{}
pendingServiceNames := sets.NewString()
for _, service := range apiServices {
// When an APIService in the list is seen as available, remove it from the pending list
handleAPIServiceChange := func(service *v1.APIService) {
defer pendingServiceNamesLock.Unlock()
if !pendingServiceNames.Has(service.Name) {
if v1helper.IsAPIServiceConditionTrue(service, v1.Available) {
// Watch add/update events for APIServices
_, err := apiServiceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { handleAPIServiceChange(obj.(*v1.APIService)) },
UpdateFunc: func(old, new interface{}) { handleAPIServiceChange(new.(*v1.APIService)) },
if err != nil {
klog.Errorf("Failed to watch APIServices for health check: %v", err)
// Don't return healthy until the pending list is empty
return healthz.NamedCheck(name, func(r *http.Request) error {
defer pendingServiceNamesLock.RUnlock()
if pendingServiceNames.Len() > 0 {
klog.Error("APIServices not yet available", "services", pendingServiceNames.List())
return fmt.Errorf("missing APIService: %v", pendingServiceNames.List())
return nil
// Priority defines group Priority that is used in discovery. This controls
// group position in the kubectl output.
type Priority struct {
// Group indicates the order of the Group relative to other groups.
Group int32
// Version indicates the relative order of the Version inside of its group.
Version int32
// APIVersionPriorities are the proper way to resolve this letting the aggregator know the desired group and version-within-group order of the underlying servers
// is to refactor the genericapiserver.DelegationTarget to include a list of priorities based on which APIs were installed.
// This requires the APIGroupInfo struct to evolve and include the concept of priorities and to avoid mistakes, the core storage map there needs to be updated.
// That ripples out every bit as far as you'd expect, so for 1.7 we'll include the list here instead of being built up during storage.
var APIVersionPriorities = map[schema.GroupVersion]Priority{
{Group: "", Version: "v1"}: {Group: 18000, Version: 1},
// to my knowledge, nothing below here collides
{Group: "", Version: "v1"}: {Group: 16700, Version: 15},
{Group: "", Version: "v1beta1"}: {Group: 16700, Version: 12},
{Group: "", Version: "v1alpha1"}: {Group: 16700, Version: 9},
// Append a new group to the end of the list if unsure.
// You can use min(existing group)-100 as the initial value for a group.
// Version can be set to 9 (to have space around) for a new group.
func addRemoteAPIServicesToRegister(config *RemoteServicesConfig, registration autoregister.AutoAPIServiceRegistration) {
for i, service := range config.Services {
port := service.Port
apiService := &v1.APIService{
ObjectMeta: metav1.ObjectMeta{Name: service.Version + "." + service.Group},
Spec: v1.APIServiceSpec{
Group: service.Group,
Version: service.Version,
InsecureSkipTLSVerify: config.InsecureSkipTLSVerify,
CABundle: config.CABundle,
// TODO: Group priority minimum of 1000 more than for local services, figure out a better story
// when we have multiple versions, potentially running in heterogeneous ways (local and remote)
GroupPriorityMinimum: 16000,
VersionPriority: 1 + int32(i),
Service: &v1.ServiceReference{
Name: service.Version + "." + service.Group,
Namespace: config.ExternalNamesNamespace,
Port: &port,
func getRemoteExternalNamesToRegister(config *RemoteServicesConfig) []*servicev0alpha1applyconfiguration.ExternalNameApplyConfiguration {
externalNames := make([]*servicev0alpha1applyconfiguration.ExternalNameApplyConfiguration, 0)
for _, service := range config.Services {
host := service.Host
name := service.Version + "." + service.Group
externalName := &servicev0alpha1applyconfiguration.ExternalNameApplyConfiguration{}
Host: &host,
externalNames = append(externalNames, externalName)
return externalNames
func apiServicesToRegister(delegateAPIServer genericapiserver.DelegationTarget, registration autoregister.AutoAPIServiceRegistration) []*v1.APIService {
apiServices := []*v1.APIService{}
for _, curr := range delegateAPIServer.ListedPaths() {
if curr == "/api/v1" {
apiService := makeAPIService(schema.GroupVersion{Group: "", Version: "v1"})
apiServices = append(apiServices, apiService)
if !strings.HasPrefix(curr, "/apis/") {
// this comes back in a list that looks like /apis/
tokens := strings.Split(curr, "/")
if len(tokens) != 4 {
apiService := makeAPIService(schema.GroupVersion{Group: tokens[2], Version: tokens[3]})
if apiService == nil {
apiServices = append(apiServices, apiService)
return apiServices
// NOTE: below function imported from
// createProxyTransport creates the dialer infrastructure to connect to the api servers.
func createProxyTransport() *http.Transport {
// NOTE: We don't set proxyDialerFn but the below SetTransportDefaults will
// See
var proxyDialerFn utilnet.DialFunc
// Proxying to services is IP-based... don't expect to be able to verify the hostname
proxyTLSClientConfig := &tls.Config{InsecureSkipVerify: true}
proxyTransport := utilnet.SetTransportDefaults(&http.Transport{
DialContext: proxyDialerFn,
TLSClientConfig: proxyTLSClientConfig,
return proxyTransport