mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
K8s: Refactor config/options for aggregation (#81739)
This commit is contained in:
47
pkg/services/apiserver/aggregator/README.md
Normal file
47
pkg/services/apiserver/aggregator/README.md
Normal file
@@ -0,0 +1,47 @@
|
||||
# aggregator
|
||||
|
||||
This is a package that is intended to power the aggregation of microservices within Grafana. The concept
|
||||
as well as implementation is largely borrowed from [kube-aggregator](https://github.com/kubernetes/kube-aggregator).
|
||||
|
||||
## Why aggregate services?
|
||||
|
||||
Grafana's future architecture will entail the same API Server design as that of Kubernetes API Servers. API Servers
|
||||
provide a standard way of stitching together API Groups through discovery and shared routing patterns that allows
|
||||
them to aggregate to a parent API Server in a seamless manner. Since we want to break the Grafana monolith up into
|
||||
more functionally divided microservices, aggregation does the job of still being able to provide these services
|
||||
under a single address. Other benefits of aggregation include free health checks and being able to independently
|
||||
roll out features for each service without downtime.
|
||||
|
||||
To read more about the concept, see
|
||||
[here](https://kubernetes.io/docs/tasks/extend-kubernetes/setup-extension-api-server/).
|
||||
|
||||
Note that this aggregation will be an entirely internal detail of Grafana. External fully functional API Servers that
|
||||
may themselves act as parent API Servers to Grafana will never be made aware of the aggregated services. Any `APIService`
|
||||
related to Grafana Groups registered in a real K8s environment will take the address of Grafana's
|
||||
parent server (which will bundle grafana-aggregator).
|
||||
|
||||
### kube-aggregator versus grafana-aggregator
|
||||
|
||||
The `grafana-aggregator` component will work similarly to how `kube-aggregator` works for `kube-apiserver`, the major
|
||||
difference being that it doesn't require core V1 APIs such as `Service`. Early on, we decided to not have core V1
|
||||
APIs in the root Grafana API Server. In order to still be able to implement aggregation, we do the following in this Go
|
||||
package:
|
||||
|
||||
1. We do not start the core shared informer factories as well as any default controllers that utilize them.
|
||||
This is achieved using the `DisabledPostStartHooks` facility of the GenericAPIServer's RecommendedConfig.
|
||||
2. We provide an `externalname` Kind API implementation under the `service.grafana.app` group which is functionally
|
||||
equivalent to the idea with the same name under `core/v1/Service`.
|
||||
3. Lastly, we swap the default available condition controller with the custom one written by us. This one is based on
|
||||
our `externalname` (`service.grafana.app`) implementation. We register separate `PostStartHooks`
|
||||
using `AddPostStartHookOrDie` on the GenericAPIServer to start the corresponding custom controller as well as
|
||||
requisite informer factories for our own `externalname` Kind.
|
||||
4. For now, we bundle apiextensions-apiserver under our aggregator component. This is slightly different from K8s
|
||||
where kube-apiserver is called the top-level component and controlplane, aggregator and apiextensions-apiserver
|
||||
live under that instead.
|
||||
|
||||
### Gotchas (Pay Attention)
|
||||
|
||||
1. `grafana-aggregator` uses file storage under `data/grafana-aggregator` (`apiregistration.k8s.io`,
|
||||
`service.grafana.app`) and `data/grafana-apiextensions` (`apiextensions.k8s.io`).
|
||||
2. Since `grafana-aggregator` outputs configuration (TLS and kubeconfig) that is used in the invocation of aggregated
|
||||
servers, ensure you start the aggregated service after launching the aggregator during local development.
|
||||
285
pkg/services/apiserver/aggregator/aggregator.go
Normal file
285
pkg/services/apiserver/aggregator/aggregator.go
Normal file
@@ -0,0 +1,285 @@
|
||||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
// Provenance-includes-location: https://github.com/kubernetes/kubernetes/blob/master/cmd/kube-apiserver/app/aggregator.go
|
||||
// Provenance-includes-license: Apache-2.0
|
||||
// Provenance-includes-copyright: The Kubernetes Authors.
|
||||
// Provenance-includes-location: https://github.com/kubernetes/kubernetes/blob/master/cmd/kube-apiserver/app/server.go
|
||||
// Provenance-includes-license: Apache-2.0
|
||||
// Provenance-includes-copyright: The Kubernetes Authors.
|
||||
// Provenance-includes-location: https://github.com/kubernetes/kubernetes/blob/master/pkg/controlplane/apiserver/apiextensions.go
|
||||
// Provenance-includes-license: Apache-2.0
|
||||
// Provenance-includes-copyright: The Kubernetes Authors.
|
||||
|
||||
package aggregator
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
utilnet "k8s.io/apimachinery/pkg/util/net"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
genericapiserver "k8s.io/apiserver/pkg/server"
|
||||
"k8s.io/apiserver/pkg/server/healthz"
|
||||
"k8s.io/client-go/informers"
|
||||
"k8s.io/client-go/kubernetes/fake"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/klog/v2"
|
||||
v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
|
||||
v1helper "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper"
|
||||
aggregatorapiserver "k8s.io/kube-aggregator/pkg/apiserver"
|
||||
apiregistrationclientset "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset"
|
||||
apiregistrationclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/typed/apiregistration/v1"
|
||||
apiregistrationInformers "k8s.io/kube-aggregator/pkg/client/informers/externalversions/apiregistration/v1"
|
||||
"k8s.io/kube-aggregator/pkg/controllers/autoregister"
|
||||
|
||||
serviceclientset "github.com/grafana/grafana/pkg/generated/clientset/versioned"
|
||||
informersv0alpha1 "github.com/grafana/grafana/pkg/generated/informers/externalversions"
|
||||
"github.com/grafana/grafana/pkg/services/apiserver/options"
|
||||
)
|
||||
|
||||
func CreateAggregatorConfig(commandOptions *options.Options, sharedConfig genericapiserver.RecommendedConfig) (*aggregatorapiserver.Config, informersv0alpha1.SharedInformerFactory, error) {
|
||||
// Create a fake clientset and informers for the k8s v1 API group.
|
||||
// These are not used in grafana's aggregator because v1 APIs are not available.
|
||||
fakev1Informers := informers.NewSharedInformerFactory(fake.NewSimpleClientset(), 10*time.Minute)
|
||||
|
||||
serviceClient, err := serviceclientset.NewForConfig(sharedConfig.LoopbackClientConfig)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
sharedInformerFactory := informersv0alpha1.NewSharedInformerFactory(
|
||||
serviceClient,
|
||||
5*time.Minute, // this is effectively used as a refresh interval right now. Might want to do something nicer later on.
|
||||
)
|
||||
serviceResolver := NewExternalNameResolver(sharedInformerFactory.Service().V0alpha1().ExternalNames().Lister())
|
||||
|
||||
aggregatorConfig := &aggregatorapiserver.Config{
|
||||
GenericConfig: &genericapiserver.RecommendedConfig{
|
||||
Config: sharedConfig.Config,
|
||||
SharedInformerFactory: fakev1Informers,
|
||||
ClientConfig: sharedConfig.LoopbackClientConfig,
|
||||
},
|
||||
ExtraConfig: aggregatorapiserver.ExtraConfig{
|
||||
ProxyClientCertFile: commandOptions.AggregatorOptions.ProxyClientCertFile,
|
||||
ProxyClientKeyFile: commandOptions.AggregatorOptions.ProxyClientKeyFile,
|
||||
// NOTE: while ProxyTransport can be skipped in the configuration, it allows honoring
|
||||
// DISABLE_HTTP2, HTTPS_PROXY and NO_PROXY env vars as needed
|
||||
ProxyTransport: createProxyTransport(),
|
||||
ServiceResolver: serviceResolver,
|
||||
},
|
||||
}
|
||||
|
||||
if err := commandOptions.AggregatorOptions.ApplyTo(aggregatorConfig, commandOptions.RecommendedOptions.Etcd, commandOptions.StorageOptions.DataPath); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
return aggregatorConfig, sharedInformerFactory, nil
|
||||
}
|
||||
|
||||
func CreateAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, sharedInformerFactory informersv0alpha1.SharedInformerFactory, delegateAPIServer genericapiserver.DelegationTarget) (*aggregatorapiserver.APIAggregator, error) {
|
||||
completedConfig := aggregatorConfig.Complete()
|
||||
aggregatorServer, err := completedConfig.NewWithDelegate(delegateAPIServer)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// create controllers for auto-registration
|
||||
apiRegistrationClient, err := apiregistrationclient.NewForConfig(completedConfig.GenericConfig.LoopbackClientConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
autoRegistrationController := autoregister.NewAutoRegisterController(aggregatorServer.APIRegistrationInformers.Apiregistration().V1().APIServices(), apiRegistrationClient)
|
||||
apiServices := apiServicesToRegister(delegateAPIServer, autoRegistrationController)
|
||||
|
||||
// Imbue all builtin group-priorities onto the aggregated discovery
|
||||
if completedConfig.GenericConfig.AggregatedDiscoveryGroupManager != nil {
|
||||
for gv, entry := range APIVersionPriorities {
|
||||
completedConfig.GenericConfig.AggregatedDiscoveryGroupManager.SetGroupVersionPriority(metav1.GroupVersion(gv), int(entry.Group), int(entry.Version))
|
||||
}
|
||||
}
|
||||
|
||||
err = aggregatorServer.GenericAPIServer.AddPostStartHook("grafana-apiserver-autoregistration", func(context genericapiserver.PostStartHookContext) error {
|
||||
go func() {
|
||||
autoRegistrationController.Run(5, context.StopCh)
|
||||
}()
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = aggregatorServer.GenericAPIServer.AddBootSequenceHealthChecks(
|
||||
makeAPIServiceAvailableHealthCheck(
|
||||
"autoregister-completion",
|
||||
apiServices,
|
||||
aggregatorServer.APIRegistrationInformers.Apiregistration().V1().APIServices(),
|
||||
),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
apiregistrationClient, err := apiregistrationclientset.NewForConfig(completedConfig.GenericConfig.LoopbackClientConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
availableController, err := NewAvailableConditionController(
|
||||
aggregatorServer.APIRegistrationInformers.Apiregistration().V1().APIServices(),
|
||||
sharedInformerFactory.Service().V0alpha1().ExternalNames(),
|
||||
apiregistrationClient.ApiregistrationV1(),
|
||||
nil,
|
||||
(func() ([]byte, []byte))(nil),
|
||||
completedConfig.ExtraConfig.ServiceResolver,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
aggregatorServer.GenericAPIServer.AddPostStartHookOrDie("apiservice-status-override-available-controller", func(context genericapiserver.PostStartHookContext) error {
|
||||
// if we end up blocking for long periods of time, we may need to increase workers.
|
||||
go availableController.Run(5, context.StopCh)
|
||||
return nil
|
||||
})
|
||||
|
||||
aggregatorServer.GenericAPIServer.AddPostStartHookOrDie("start-grafana-aggregator-informers", func(context genericapiserver.PostStartHookContext) error {
|
||||
sharedInformerFactory.Start(context.StopCh)
|
||||
aggregatorServer.APIRegistrationInformers.Start(context.StopCh)
|
||||
return nil
|
||||
})
|
||||
|
||||
return aggregatorServer, nil
|
||||
}
|
||||
|
||||
func makeAPIService(gv schema.GroupVersion) *v1.APIService {
|
||||
apiServicePriority, ok := APIVersionPriorities[gv]
|
||||
if !ok {
|
||||
// if we aren't found, then we shouldn't register ourselves because it could result in a CRD group version
|
||||
// being permanently stuck in the APIServices list.
|
||||
klog.Infof("Skipping APIService creation for %v", gv)
|
||||
return nil
|
||||
}
|
||||
return &v1.APIService{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: gv.Version + "." + gv.Group},
|
||||
Spec: v1.APIServiceSpec{
|
||||
Group: gv.Group,
|
||||
Version: gv.Version,
|
||||
GroupPriorityMinimum: apiServicePriority.Group,
|
||||
VersionPriority: apiServicePriority.Version,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// makeAPIServiceAvailableHealthCheck returns a healthz check that returns healthy
|
||||
// once all of the specified services have been observed to be available at least once.
|
||||
func makeAPIServiceAvailableHealthCheck(name string, apiServices []*v1.APIService, apiServiceInformer apiregistrationInformers.APIServiceInformer) healthz.HealthChecker {
|
||||
// Track the auto-registered API services that have not been observed to be available yet
|
||||
pendingServiceNamesLock := &sync.RWMutex{}
|
||||
pendingServiceNames := sets.NewString()
|
||||
for _, service := range apiServices {
|
||||
pendingServiceNames.Insert(service.Name)
|
||||
}
|
||||
|
||||
// When an APIService in the list is seen as available, remove it from the pending list
|
||||
handleAPIServiceChange := func(service *v1.APIService) {
|
||||
pendingServiceNamesLock.Lock()
|
||||
defer pendingServiceNamesLock.Unlock()
|
||||
if !pendingServiceNames.Has(service.Name) {
|
||||
return
|
||||
}
|
||||
if v1helper.IsAPIServiceConditionTrue(service, v1.Available) {
|
||||
pendingServiceNames.Delete(service.Name)
|
||||
}
|
||||
}
|
||||
|
||||
// Watch add/update events for APIServices
|
||||
_, _ = apiServiceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) { handleAPIServiceChange(obj.(*v1.APIService)) },
|
||||
UpdateFunc: func(old, new interface{}) { handleAPIServiceChange(new.(*v1.APIService)) },
|
||||
})
|
||||
|
||||
// Don't return healthy until the pending list is empty
|
||||
return healthz.NamedCheck(name, func(r *http.Request) error {
|
||||
pendingServiceNamesLock.RLock()
|
||||
defer pendingServiceNamesLock.RUnlock()
|
||||
if pendingServiceNames.Len() > 0 {
|
||||
return fmt.Errorf("missing APIService: %v", pendingServiceNames.List())
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// Priority describes where an API group/version is placed in discovery,
// which in turn controls the group's position in kubectl output.
type Priority struct {
	// Group orders this group relative to other groups.
	Group int32
	// Version orders this version relative to the other versions of the same group.
	Version int32
}
|
||||
|
||||
// APIVersionPriorities are the proper way to resolve this letting the aggregator know the desired group and version-within-group order of the underlying servers
|
||||
// is to refactor the genericapiserver.DelegationTarget to include a list of priorities based on which APIs were installed.
|
||||
// This requires the APIGroupInfo struct to evolve and include the concept of priorities and to avoid mistakes, the core storage map there needs to be updated.
|
||||
// That ripples out every bit as far as you'd expect, so for 1.7 we'll include the list here instead of being built up during storage.
|
||||
var APIVersionPriorities = map[schema.GroupVersion]Priority{
|
||||
{Group: "", Version: "v1"}: {Group: 18000, Version: 1},
|
||||
// to my knowledge, nothing below here collides
|
||||
{Group: "admissionregistration.k8s.io", Version: "v1"}: {Group: 16700, Version: 15},
|
||||
{Group: "admissionregistration.k8s.io", Version: "v1beta1"}: {Group: 16700, Version: 12},
|
||||
{Group: "admissionregistration.k8s.io", Version: "v1alpha1"}: {Group: 16700, Version: 9},
|
||||
// Append a new group to the end of the list if unsure.
|
||||
// You can use min(existing group)-100 as the initial value for a group.
|
||||
// Version can be set to 9 (to have space around) for a new group.
|
||||
}
|
||||
|
||||
func apiServicesToRegister(delegateAPIServer genericapiserver.DelegationTarget, registration autoregister.AutoAPIServiceRegistration) []*v1.APIService {
|
||||
apiServices := []*v1.APIService{}
|
||||
|
||||
for _, curr := range delegateAPIServer.ListedPaths() {
|
||||
if curr == "/api/v1" {
|
||||
apiService := makeAPIService(schema.GroupVersion{Group: "", Version: "v1"})
|
||||
registration.AddAPIServiceToSyncOnStart(apiService)
|
||||
apiServices = append(apiServices, apiService)
|
||||
continue
|
||||
}
|
||||
|
||||
if !strings.HasPrefix(curr, "/apis/") {
|
||||
continue
|
||||
}
|
||||
// this comes back in a list that looks like /apis/rbac.authorization.k8s.io/v1alpha1
|
||||
tokens := strings.Split(curr, "/")
|
||||
if len(tokens) != 4 {
|
||||
continue
|
||||
}
|
||||
|
||||
apiService := makeAPIService(schema.GroupVersion{Group: tokens[2], Version: tokens[3]})
|
||||
if apiService == nil {
|
||||
continue
|
||||
}
|
||||
registration.AddAPIServiceToSyncOnStart(apiService)
|
||||
apiServices = append(apiServices, apiService)
|
||||
}
|
||||
|
||||
return apiServices
|
||||
}
|
||||
|
||||
// NOTE: below function imported from https://github.com/kubernetes/kubernetes/blob/master/cmd/kube-apiserver/app/server.go#L197
|
||||
// createProxyTransport creates the dialer infrastructure to connect to the api servers.
|
||||
func createProxyTransport() *http.Transport {
|
||||
// NOTE: We don't set proxyDialerFn but the below SetTransportDefaults will
|
||||
// See https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/apimachinery/pkg/util/net/http.go#L109
|
||||
var proxyDialerFn utilnet.DialFunc
|
||||
// Proxying to services is IP-based... don't expect to be able to verify the hostname
|
||||
proxyTLSClientConfig := &tls.Config{InsecureSkipVerify: true}
|
||||
proxyTransport := utilnet.SetTransportDefaults(&http.Transport{
|
||||
DialContext: proxyDialerFn,
|
||||
TLSClientConfig: proxyTLSClientConfig,
|
||||
})
|
||||
return proxyTransport
|
||||
}
|
||||
466
pkg/services/apiserver/aggregator/availableController.go
Normal file
466
pkg/services/apiserver/aggregator/availableController.go
Normal file
@@ -0,0 +1,466 @@
|
||||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
// Provenance-includes-location: https://github.com/kubernetes/kube-aggregator/blob/master/pkg/controllers/status/available_controller.go
|
||||
// Provenance-includes-license: Apache-2.0
|
||||
// Provenance-includes-copyright: The Kubernetes Authors.
|
||||
|
||||
package aggregator
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"reflect"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana/pkg/apis/service/v0alpha1"
|
||||
informersservicev0alpha1 "github.com/grafana/grafana/pkg/generated/informers/externalversions/service/v0alpha1"
|
||||
listersservicev0alpha1 "github.com/grafana/grafana/pkg/generated/listers/service/v0alpha1"
|
||||
|
||||
"k8s.io/apimachinery/pkg/api/equality"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/transport"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
"k8s.io/klog/v2"
|
||||
apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
|
||||
apiregistrationv1apihelper "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper"
|
||||
apiregistrationclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/typed/apiregistration/v1"
|
||||
informers "k8s.io/kube-aggregator/pkg/client/informers/externalversions/apiregistration/v1"
|
||||
listers "k8s.io/kube-aggregator/pkg/client/listers/apiregistration/v1"
|
||||
"k8s.io/kube-aggregator/pkg/controllers"
|
||||
)
|
||||
|
||||
// certKeyFunc returns two byte slices which the controller uses, in that
// order, as the client certificate data and key data of its discovery transport.
type certKeyFunc func() ([]byte, []byte)
|
||||
|
||||
// ServiceResolver turns a service reference (namespace, name and port) into
// the URL at which that service can actually be reached.
type ServiceResolver interface {
	ResolveEndpoint(namespace, name string, port int32) (*url.URL, error)
}
|
||||
|
||||
// AvailableConditionController handles checking the availability of registered API services.
|
||||
type AvailableConditionController struct {
|
||||
apiServiceClient apiregistrationclient.APIServicesGetter
|
||||
|
||||
apiServiceLister listers.APIServiceLister
|
||||
apiServiceSynced cache.InformerSynced
|
||||
|
||||
// externalNameLister is used to get the IP to create the transport for
|
||||
externalNameLister listersservicev0alpha1.ExternalNameLister
|
||||
servicesSynced cache.InformerSynced
|
||||
|
||||
// proxyTransportDial specifies the dial function for creating unencrypted TCP connections.
|
||||
proxyTransportDial *transport.DialHolder
|
||||
proxyCurrentCertKeyContent certKeyFunc
|
||||
serviceResolver ServiceResolver
|
||||
|
||||
// To allow injection for testing.
|
||||
syncFn func(key string) error
|
||||
|
||||
queue workqueue.RateLimitingInterface
|
||||
// map from service-namespace -> service-name -> apiservice names
|
||||
cache map[string]map[string][]string
|
||||
// this lock protects operations on the above cache
|
||||
cacheLock sync.RWMutex
|
||||
}
|
||||
|
||||
// NewAvailableConditionController returns a new AvailableConditionController.
|
||||
func NewAvailableConditionController(
|
||||
apiServiceInformer informers.APIServiceInformer,
|
||||
externalNameInformer informersservicev0alpha1.ExternalNameInformer,
|
||||
apiServiceClient apiregistrationclient.APIServicesGetter,
|
||||
proxyTransportDial *transport.DialHolder,
|
||||
proxyCurrentCertKeyContent certKeyFunc,
|
||||
serviceResolver ServiceResolver,
|
||||
) (*AvailableConditionController, error) {
|
||||
c := &AvailableConditionController{
|
||||
apiServiceClient: apiServiceClient,
|
||||
apiServiceLister: apiServiceInformer.Lister(),
|
||||
externalNameLister: externalNameInformer.Lister(),
|
||||
serviceResolver: serviceResolver,
|
||||
queue: workqueue.NewNamedRateLimitingQueue(
|
||||
// We want a fairly tight requeue time. The controller listens to the API, but because it relies on the routability of the
|
||||
// service network, it is possible for an external, non-watchable factor to affect availability. This keeps
|
||||
// the maximum disruption time to a minimum, but it does prevent hot loops.
|
||||
workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 30*time.Second),
|
||||
"AvailableConditionController"),
|
||||
proxyTransportDial: proxyTransportDial,
|
||||
proxyCurrentCertKeyContent: proxyCurrentCertKeyContent,
|
||||
}
|
||||
|
||||
// resync on this one because it is low cardinality and rechecking the actual discovery
|
||||
// allows us to detect health in a more timely fashion when network connectivity to
|
||||
// nodes is snipped, but the network still attempts to route there. See
|
||||
// https://github.com/openshift/origin/issues/17159#issuecomment-341798063
|
||||
apiServiceHandler, _ := apiServiceInformer.Informer().AddEventHandlerWithResyncPeriod(
|
||||
cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: c.addAPIService,
|
||||
UpdateFunc: c.updateAPIService,
|
||||
DeleteFunc: c.deleteAPIService,
|
||||
},
|
||||
30*time.Second)
|
||||
c.apiServiceSynced = apiServiceHandler.HasSynced
|
||||
|
||||
serviceHandler, _ := externalNameInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: c.addService,
|
||||
UpdateFunc: c.updateService,
|
||||
DeleteFunc: c.deleteService,
|
||||
})
|
||||
c.servicesSynced = serviceHandler.HasSynced
|
||||
|
||||
c.syncFn = c.sync
|
||||
|
||||
return c, nil
|
||||
}
|
||||
|
||||
func (c *AvailableConditionController) sync(key string) error {
|
||||
originalAPIService, err := c.apiServiceLister.Get(key)
|
||||
if apierrors.IsNotFound(err) {
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// if a particular transport was specified, use that otherwise build one
|
||||
// construct an http client that will ignore TLS verification (if someone owns the network and messes with your status
|
||||
// that's not so bad) and sets a very short timeout. This is a best effort GET that provides no additional information
|
||||
transportConfig := &transport.Config{
|
||||
TLS: transport.TLSConfig{
|
||||
Insecure: true,
|
||||
},
|
||||
DialHolder: c.proxyTransportDial,
|
||||
}
|
||||
|
||||
if c.proxyCurrentCertKeyContent != nil {
|
||||
proxyClientCert, proxyClientKey := c.proxyCurrentCertKeyContent()
|
||||
|
||||
transportConfig.TLS.CertData = proxyClientCert
|
||||
transportConfig.TLS.KeyData = proxyClientKey
|
||||
}
|
||||
restTransport, err := transport.New(transportConfig)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
discoveryClient := &http.Client{
|
||||
Transport: restTransport,
|
||||
// the request should happen quickly.
|
||||
Timeout: 5 * time.Second,
|
||||
CheckRedirect: func(req *http.Request, via []*http.Request) error {
|
||||
return http.ErrUseLastResponse
|
||||
},
|
||||
}
|
||||
|
||||
apiService := originalAPIService.DeepCopy()
|
||||
|
||||
availableCondition := apiregistrationv1.APIServiceCondition{
|
||||
Type: apiregistrationv1.Available,
|
||||
Status: apiregistrationv1.ConditionTrue,
|
||||
LastTransitionTime: metav1.Now(),
|
||||
}
|
||||
|
||||
// local API services are always considered available
|
||||
if apiService.Spec.Service == nil {
|
||||
apiregistrationv1apihelper.SetAPIServiceCondition(apiService, apiregistrationv1apihelper.NewLocalAvailableAPIServiceCondition())
|
||||
_, err := c.updateAPIServiceStatus(originalAPIService, apiService)
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = c.externalNameLister.ExternalNames(apiService.Spec.Service.Namespace).Get(apiService.Spec.Service.Name)
|
||||
if apierrors.IsNotFound(err) {
|
||||
availableCondition.Status = apiregistrationv1.ConditionFalse
|
||||
availableCondition.Reason = "ServiceNotFound"
|
||||
availableCondition.Message = fmt.Sprintf("service/%s in %q is not present", apiService.Spec.Service.Name, apiService.Spec.Service.Namespace)
|
||||
apiregistrationv1apihelper.SetAPIServiceCondition(apiService, availableCondition)
|
||||
_, err := c.updateAPIServiceStatus(originalAPIService, apiService)
|
||||
return err
|
||||
} else if err != nil {
|
||||
availableCondition.Status = apiregistrationv1.ConditionUnknown
|
||||
availableCondition.Reason = "ServiceAccessError"
|
||||
availableCondition.Message = fmt.Sprintf("service/%s in %q cannot be checked due to: %v", apiService.Spec.Service.Name, apiService.Spec.Service.Namespace, err)
|
||||
apiregistrationv1apihelper.SetAPIServiceCondition(apiService, availableCondition)
|
||||
_, err := c.updateAPIServiceStatus(originalAPIService, apiService)
|
||||
return err
|
||||
}
|
||||
|
||||
// actually try to hit the discovery endpoint when it isn't local and when we're routing as a service.
|
||||
if apiService.Spec.Service != nil && c.serviceResolver != nil {
|
||||
attempts := 5
|
||||
results := make(chan error, attempts)
|
||||
for i := 0; i < attempts; i++ {
|
||||
go func() {
|
||||
discoveryURL, err := c.serviceResolver.ResolveEndpoint(apiService.Spec.Service.Namespace, apiService.Spec.Service.Name, *apiService.Spec.Service.Port)
|
||||
if err != nil {
|
||||
results <- err
|
||||
return
|
||||
}
|
||||
// render legacyAPIService health check path when it is delegated to a service
|
||||
if apiService.Name == "v1." {
|
||||
discoveryURL.Path = "/api/" + apiService.Spec.Version
|
||||
} else {
|
||||
discoveryURL.Path = "/apis/" + apiService.Spec.Group + "/" + apiService.Spec.Version
|
||||
}
|
||||
|
||||
errCh := make(chan error, 1)
|
||||
go func() {
|
||||
// be sure to check a URL that the aggregated API server is required to serve
|
||||
newReq, err := http.NewRequest("GET", discoveryURL.String(), nil)
|
||||
if err != nil {
|
||||
errCh <- err
|
||||
return
|
||||
}
|
||||
|
||||
// setting the system-masters identity ensures that we will always have access rights
|
||||
transport.SetAuthProxyHeaders(newReq, "system:kube-aggregator", []string{"system:masters"}, nil)
|
||||
resp, err := discoveryClient.Do(newReq)
|
||||
if resp != nil {
|
||||
_ = resp.Body.Close()
|
||||
// we should always been in the 200s or 300s
|
||||
if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices {
|
||||
errCh <- fmt.Errorf("bad status from %v: %v", discoveryURL, resp.StatusCode)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
errCh <- err
|
||||
}()
|
||||
|
||||
select {
|
||||
case err = <-errCh:
|
||||
if err != nil {
|
||||
results <- fmt.Errorf("failing or missing response from %v: %v", discoveryURL, err)
|
||||
return
|
||||
}
|
||||
|
||||
// we had trouble with slow dial and DNS responses causing us to wait too long.
|
||||
// we added this as insurance
|
||||
case <-time.After(6 * time.Second):
|
||||
results <- fmt.Errorf("timed out waiting for %v", discoveryURL)
|
||||
return
|
||||
}
|
||||
|
||||
results <- nil
|
||||
}()
|
||||
}
|
||||
|
||||
var lastError error
|
||||
for i := 0; i < attempts; i++ {
|
||||
lastError = <-results
|
||||
// if we had at least one success, we are successful overall and we can return now
|
||||
if lastError == nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if lastError != nil {
|
||||
availableCondition.Status = apiregistrationv1.ConditionFalse
|
||||
availableCondition.Reason = "FailedDiscoveryCheck"
|
||||
availableCondition.Message = lastError.Error()
|
||||
apiregistrationv1apihelper.SetAPIServiceCondition(apiService, availableCondition)
|
||||
_, updateErr := c.updateAPIServiceStatus(originalAPIService, apiService)
|
||||
if updateErr != nil {
|
||||
return updateErr
|
||||
}
|
||||
// force a requeue to make it very obvious that this will be retried at some point in the future
|
||||
// along with other requeues done via service change, endpoint change, and resync
|
||||
return lastError
|
||||
}
|
||||
}
|
||||
|
||||
availableCondition.Reason = "Passed"
|
||||
availableCondition.Message = "all checks passed"
|
||||
apiregistrationv1apihelper.SetAPIServiceCondition(apiService, availableCondition)
|
||||
_, err = c.updateAPIServiceStatus(originalAPIService, apiService)
|
||||
return err
|
||||
}
|
||||
|
||||
// updateAPIServiceStatus only issues an update if a change is detected. We have a tight resync loop to quickly detect dead
|
||||
// apiservices. Doing that means we don't want to quickly issue no-op updates.
|
||||
func (c *AvailableConditionController) updateAPIServiceStatus(originalAPIService, newAPIService *apiregistrationv1.APIService) (*apiregistrationv1.APIService, error) {
|
||||
if equality.Semantic.DeepEqual(originalAPIService.Status, newAPIService.Status) {
|
||||
return newAPIService, nil
|
||||
}
|
||||
|
||||
orig := apiregistrationv1apihelper.GetAPIServiceConditionByType(originalAPIService, apiregistrationv1.Available)
|
||||
now := apiregistrationv1apihelper.GetAPIServiceConditionByType(newAPIService, apiregistrationv1.Available)
|
||||
unknown := apiregistrationv1.APIServiceCondition{
|
||||
Type: apiregistrationv1.Available,
|
||||
Status: apiregistrationv1.ConditionUnknown,
|
||||
}
|
||||
if orig == nil {
|
||||
orig = &unknown
|
||||
}
|
||||
if now == nil {
|
||||
now = &unknown
|
||||
}
|
||||
if *orig != *now {
|
||||
klog.V(2).InfoS("changing APIService availability", "name", newAPIService.Name, "oldStatus", orig.Status, "newStatus", now.Status, "message", now.Message, "reason", now.Reason)
|
||||
}
|
||||
|
||||
newAPIService, err := c.apiServiceClient.APIServices().UpdateStatus(context.TODO(), newAPIService, metav1.UpdateOptions{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return newAPIService, nil
|
||||
}
|
||||
|
||||
// Run starts the AvailableConditionController loop which manages the availability condition of API services.
|
||||
func (c *AvailableConditionController) Run(workers int, stopCh <-chan struct{}) {
|
||||
defer utilruntime.HandleCrash()
|
||||
defer c.queue.ShutDown()
|
||||
|
||||
klog.Info("Starting AvailableConditionController")
|
||||
defer klog.Info("Shutting down AvailableConditionController")
|
||||
|
||||
// This waits not just for the informers to sync, but for our handlers
|
||||
// to be called; since the handlers are three different ways of
|
||||
// enqueueing the same thing, waiting for this permits the queue to
|
||||
// maximally de-duplicate the entries.
|
||||
if !controllers.WaitForCacheSync("AvailableConditionCOverrideController", stopCh, c.apiServiceSynced, c.servicesSynced) {
|
||||
return
|
||||
}
|
||||
|
||||
for i := 0; i < workers; i++ {
|
||||
go wait.Until(c.runWorker, time.Second, stopCh)
|
||||
}
|
||||
|
||||
<-stopCh
|
||||
}
|
||||
|
||||
func (c *AvailableConditionController) runWorker() {
|
||||
for c.processNextWorkItem() {
|
||||
}
|
||||
}
|
||||
|
||||
// processNextWorkItem deals with one key off the queue. It returns false when it's time to quit.
|
||||
func (c *AvailableConditionController) processNextWorkItem() bool {
|
||||
key, quit := c.queue.Get()
|
||||
if quit {
|
||||
return false
|
||||
}
|
||||
defer c.queue.Done(key)
|
||||
|
||||
err := c.syncFn(key.(string))
|
||||
if err == nil {
|
||||
c.queue.Forget(key)
|
||||
return true
|
||||
}
|
||||
|
||||
utilruntime.HandleError(fmt.Errorf("%v failed with: %v", key, err))
|
||||
c.queue.AddRateLimited(key)
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (c *AvailableConditionController) addAPIService(obj interface{}) {
|
||||
castObj := obj.(*apiregistrationv1.APIService)
|
||||
klog.V(4).Infof("Adding %s", castObj.Name)
|
||||
if castObj.Spec.Service != nil {
|
||||
c.rebuildAPIServiceCache()
|
||||
}
|
||||
c.queue.Add(castObj.Name)
|
||||
}
|
||||
|
||||
func (c *AvailableConditionController) updateAPIService(oldObj, newObj interface{}) {
|
||||
castObj := newObj.(*apiregistrationv1.APIService)
|
||||
oldCastObj := oldObj.(*apiregistrationv1.APIService)
|
||||
klog.V(4).Infof("Updating %s", oldCastObj.Name)
|
||||
if !reflect.DeepEqual(castObj.Spec.Service, oldCastObj.Spec.Service) {
|
||||
c.rebuildAPIServiceCache()
|
||||
}
|
||||
c.queue.Add(oldCastObj.Name)
|
||||
}
|
||||
|
||||
func (c *AvailableConditionController) deleteAPIService(obj interface{}) {
|
||||
castObj, ok := obj.(*apiregistrationv1.APIService)
|
||||
if !ok {
|
||||
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
|
||||
if !ok {
|
||||
klog.Errorf("Couldn't get object from tombstone %#v", obj)
|
||||
return
|
||||
}
|
||||
castObj, ok = tombstone.Obj.(*apiregistrationv1.APIService)
|
||||
if !ok {
|
||||
klog.Errorf("Tombstone contained object that is not expected %#v", obj)
|
||||
return
|
||||
}
|
||||
}
|
||||
klog.V(4).Infof("Deleting %q", castObj.Name)
|
||||
if castObj.Spec.Service != nil {
|
||||
c.rebuildAPIServiceCache()
|
||||
}
|
||||
c.queue.Add(castObj.Name)
|
||||
}
|
||||
|
||||
func (c *AvailableConditionController) getAPIServicesFor(obj runtime.Object) []string {
|
||||
metadata, err := meta.Accessor(obj)
|
||||
if err != nil {
|
||||
utilruntime.HandleError(err)
|
||||
return nil
|
||||
}
|
||||
c.cacheLock.RLock()
|
||||
defer c.cacheLock.RUnlock()
|
||||
return c.cache[metadata.GetNamespace()][metadata.GetName()]
|
||||
}
|
||||
|
||||
// if the service/endpoint handler wins the race against the cache rebuilding, it may queue a no-longer-relevant apiservice
|
||||
// (which will get processed an extra time - this doesn't matter),
|
||||
// and miss a newly relevant apiservice (which will get queued by the apiservice handler)
|
||||
func (c *AvailableConditionController) rebuildAPIServiceCache() {
|
||||
apiServiceList, _ := c.apiServiceLister.List(labels.Everything())
|
||||
newCache := map[string]map[string][]string{}
|
||||
for _, apiService := range apiServiceList {
|
||||
if apiService.Spec.Service == nil {
|
||||
continue
|
||||
}
|
||||
if newCache[apiService.Spec.Service.Namespace] == nil {
|
||||
newCache[apiService.Spec.Service.Namespace] = map[string][]string{}
|
||||
}
|
||||
newCache[apiService.Spec.Service.Namespace][apiService.Spec.Service.Name] = append(newCache[apiService.Spec.Service.Namespace][apiService.Spec.Service.Name], apiService.Name)
|
||||
}
|
||||
|
||||
c.cacheLock.Lock()
|
||||
defer c.cacheLock.Unlock()
|
||||
c.cache = newCache
|
||||
}
|
||||
|
||||
// TODO, think of a way to avoid checking on every service manipulation
|
||||
|
||||
func (c *AvailableConditionController) addService(obj interface{}) {
|
||||
for _, apiService := range c.getAPIServicesFor(obj.(*v0alpha1.ExternalName)) {
|
||||
c.queue.Add(apiService)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *AvailableConditionController) updateService(obj, _ interface{}) {
|
||||
for _, apiService := range c.getAPIServicesFor(obj.(*v0alpha1.ExternalName)) {
|
||||
c.queue.Add(apiService)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *AvailableConditionController) deleteService(obj interface{}) {
|
||||
castObj, ok := obj.(*v0alpha1.ExternalName)
|
||||
if !ok {
|
||||
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
|
||||
if !ok {
|
||||
klog.Errorf("Couldn't get object from tombstone %#v", obj)
|
||||
return
|
||||
}
|
||||
castObj, ok = tombstone.Obj.(*v0alpha1.ExternalName)
|
||||
if !ok {
|
||||
klog.Errorf("Tombstone contained object that is not expected %#v", obj)
|
||||
return
|
||||
}
|
||||
}
|
||||
for _, apiService := range c.getAPIServicesFor(castObj) {
|
||||
c.queue.Add(apiService)
|
||||
}
|
||||
}
|
||||
32
pkg/services/apiserver/aggregator/resolver.go
Normal file
32
pkg/services/apiserver/aggregator/resolver.go
Normal file
@@ -0,0 +1,32 @@
|
||||
package aggregator
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"net/url"
|
||||
|
||||
"k8s.io/kube-aggregator/pkg/apiserver"
|
||||
|
||||
servicelistersv0alpha1 "github.com/grafana/grafana/pkg/generated/listers/service/v0alpha1"
|
||||
)
|
||||
|
||||
func NewExternalNameResolver(externalNames servicelistersv0alpha1.ExternalNameLister) apiserver.ServiceResolver {
|
||||
return &externalNameResolver{
|
||||
externalNames: externalNames,
|
||||
}
|
||||
}
|
||||
|
||||
type externalNameResolver struct {
|
||||
externalNames servicelistersv0alpha1.ExternalNameLister
|
||||
}
|
||||
|
||||
func (r *externalNameResolver) ResolveEndpoint(namespace, name string, port int32) (*url.URL, error) {
|
||||
extName, err := r.externalNames.ExternalNames(namespace).Get(name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &url.URL{
|
||||
Scheme: "https",
|
||||
Host: net.JoinHostPort(extName.Spec.Host, fmt.Sprintf("%d", port)),
|
||||
}, nil
|
||||
}
|
||||
Reference in New Issue
Block a user