mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
K8s: Data Plane Aggregator (#91228)
This commit is contained in:
205
pkg/aggregator/apiserver/dataplaneservice_controller.go
Normal file
205
pkg/aggregator/apiserver/dataplaneservice_controller.go
Normal file
@@ -0,0 +1,205 @@
|
||||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
// Provenance-includes-location: https://github.com/kubernetes/kube-aggregator/blob/master/pkg/apiserver/apiservice_controller.go
|
||||
// Provenance-includes-license: Apache-2.0
|
||||
// Provenance-includes-copyright: The Kubernetes Authors.
|
||||
|
||||
package apiserver
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apiserver/pkg/server/dynamiccertificates"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
"k8s.io/klog/v2"
|
||||
|
||||
v0alpha1 "github.com/grafana/grafana/pkg/aggregator/apis/aggregation/v0alpha1"
|
||||
informers "github.com/grafana/grafana/pkg/aggregator/generated/informers/externalversions/aggregation/v0alpha1"
|
||||
listers "github.com/grafana/grafana/pkg/aggregator/generated/listers/aggregation/v0alpha1"
|
||||
)
|
||||
|
||||
// DataPlaneHandlerManager defines the behaviour that an API handler should have.
|
||||
type DataPlaneHandlerManager interface {
|
||||
AddDataPlaneService(dataPlaneService *v0alpha1.DataPlaneService) error
|
||||
RemoveDataPlaneService(dataPlaneServiceName string)
|
||||
}
|
||||
|
||||
// DataPlaneServiceRegistrationController is responsible for registering and removing API services.
|
||||
type DataPlaneServiceRegistrationController struct {
|
||||
dataPlaneHandlerManager DataPlaneHandlerManager
|
||||
|
||||
dataPlaneServiceLister listers.DataPlaneServiceLister
|
||||
dataPlaneServiceSynced cache.InformerSynced
|
||||
|
||||
// To allow injection for testing.
|
||||
syncFn func(key string) error
|
||||
|
||||
queue workqueue.TypedRateLimitingInterface[string]
|
||||
}
|
||||
|
||||
var _ dynamiccertificates.Listener = &DataPlaneServiceRegistrationController{}
|
||||
|
||||
// NewDataPlaneServiceRegistrationController returns a new DataPlaneServiceRegistrationController.
|
||||
func NewDataPlaneServiceRegistrationController(dataPlaneServiceInformer informers.DataPlaneServiceInformer, dataPlaneHandlerManager DataPlaneHandlerManager) *DataPlaneServiceRegistrationController {
|
||||
c := &DataPlaneServiceRegistrationController{
|
||||
dataPlaneHandlerManager: dataPlaneHandlerManager,
|
||||
dataPlaneServiceLister: dataPlaneServiceInformer.Lister(),
|
||||
dataPlaneServiceSynced: dataPlaneServiceInformer.Informer().HasSynced,
|
||||
queue: workqueue.NewTypedRateLimitingQueueWithConfig(
|
||||
// We want a fairly tight requeue time. The controller listens to the API, but because it relies on the routability of the
|
||||
// service network, it is possible for an external, non-watchable factor to affect availability. This keeps
|
||||
// the maximum disruption time to a minimum, but it does prevent hot loops.
|
||||
workqueue.NewTypedItemExponentialFailureRateLimiter[string](5*time.Millisecond, 30*time.Second),
|
||||
workqueue.TypedRateLimitingQueueConfig[string]{Name: "DataPlaneServiceRegistrationController"},
|
||||
),
|
||||
}
|
||||
|
||||
_, err := dataPlaneServiceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: c.addDataPlaneService,
|
||||
UpdateFunc: c.updateDataPlaneService,
|
||||
DeleteFunc: c.deleteDataPlaneService,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to register event handler for DataPlaneService: %v", err)
|
||||
}
|
||||
|
||||
c.syncFn = c.sync
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
func (c *DataPlaneServiceRegistrationController) sync(key string) error {
|
||||
dataPlaneService, err := c.dataPlaneServiceLister.Get(key)
|
||||
if apierrors.IsNotFound(err) {
|
||||
c.dataPlaneHandlerManager.RemoveDataPlaneService(key)
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return c.dataPlaneHandlerManager.AddDataPlaneService(dataPlaneService)
|
||||
}
|
||||
|
||||
// Run starts DataPlaneServiceRegistrationController which will process all registration requests until stopCh is closed.
|
||||
func (c *DataPlaneServiceRegistrationController) Run(ctx context.Context, handlerSyncedCh chan<- struct{}) {
|
||||
defer utilruntime.HandleCrash()
|
||||
defer c.queue.ShutDown()
|
||||
|
||||
klog.Info("Starting DataPlaneServiceRegistrationController")
|
||||
defer klog.Info("Shutting down DataPlaneServiceRegistrationController")
|
||||
|
||||
if !cache.WaitForCacheSync(ctx.Done(), c.dataPlaneServiceSynced) {
|
||||
return
|
||||
}
|
||||
|
||||
// initially sync all DataPlaneServices to make sure the proxy handler is complete
|
||||
err := wait.PollUntilContextCancel(ctx, time.Second, true, func(context.Context) (bool, error) {
|
||||
services, err := c.dataPlaneServiceLister.List(labels.Everything())
|
||||
if err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("failed to initially list DataPlaneServices: %v", err))
|
||||
return false, nil
|
||||
}
|
||||
for _, s := range services {
|
||||
if err := c.dataPlaneHandlerManager.AddDataPlaneService(s); err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("failed to initially sync DataPlaneService %s: %v", s.Name, err))
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
return true, nil
|
||||
})
|
||||
if err != nil {
|
||||
utilruntime.HandleError(err)
|
||||
return
|
||||
}
|
||||
close(handlerSyncedCh)
|
||||
|
||||
// only start one worker thread since its a slow moving API and the aggregation server adding bits
|
||||
// aren't threadsafe
|
||||
go wait.Until(c.runWorker, time.Second, ctx.Done())
|
||||
|
||||
<-ctx.Done()
|
||||
}
|
||||
|
||||
func (c *DataPlaneServiceRegistrationController) runWorker() {
|
||||
for c.processNextWorkItem() {
|
||||
}
|
||||
}
|
||||
|
||||
// processNextWorkItem deals with one key off the queue. It returns false when it's time to quit.
|
||||
func (c *DataPlaneServiceRegistrationController) processNextWorkItem() bool {
|
||||
key, quit := c.queue.Get()
|
||||
if quit {
|
||||
return false
|
||||
}
|
||||
defer c.queue.Done(key)
|
||||
|
||||
err := c.syncFn(key)
|
||||
if err == nil {
|
||||
c.queue.Forget(key)
|
||||
return true
|
||||
}
|
||||
|
||||
utilruntime.HandleError(fmt.Errorf("%v failed with : %v", key, err))
|
||||
c.queue.AddRateLimited(key)
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (c *DataPlaneServiceRegistrationController) enqueueInternal(obj *v0alpha1.DataPlaneService) {
|
||||
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
|
||||
if err != nil {
|
||||
klog.Errorf("Couldn't get key for object %#v: %v", obj, err)
|
||||
return
|
||||
}
|
||||
|
||||
c.queue.Add(key)
|
||||
}
|
||||
|
||||
func (c *DataPlaneServiceRegistrationController) addDataPlaneService(obj interface{}) {
|
||||
castObj := obj.(*v0alpha1.DataPlaneService)
|
||||
klog.V(4).Infof("Adding %s", castObj.Name)
|
||||
c.enqueueInternal(castObj)
|
||||
}
|
||||
|
||||
func (c *DataPlaneServiceRegistrationController) updateDataPlaneService(obj, _ interface{}) {
|
||||
castObj := obj.(*v0alpha1.DataPlaneService)
|
||||
klog.V(4).Infof("Updating %s", castObj.Name)
|
||||
c.enqueueInternal(castObj)
|
||||
}
|
||||
|
||||
func (c *DataPlaneServiceRegistrationController) deleteDataPlaneService(obj interface{}) {
|
||||
castObj, ok := obj.(*v0alpha1.DataPlaneService)
|
||||
if !ok {
|
||||
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
|
||||
if !ok {
|
||||
klog.Errorf("Couldn't get object from tombstone %#v", obj)
|
||||
return
|
||||
}
|
||||
castObj, ok = tombstone.Obj.(*v0alpha1.DataPlaneService)
|
||||
if !ok {
|
||||
klog.Errorf("Tombstone contained object that is not expected %#v", obj)
|
||||
return
|
||||
}
|
||||
}
|
||||
klog.V(4).Infof("Deleting %q", castObj.Name)
|
||||
c.enqueueInternal(castObj)
|
||||
}
|
||||
|
||||
func (c *DataPlaneServiceRegistrationController) Enqueue() {
|
||||
dataPlaneServices, err := c.dataPlaneServiceLister.List(labels.Everything())
|
||||
if err != nil {
|
||||
utilruntime.HandleError(err)
|
||||
return
|
||||
}
|
||||
for _, dataPlaneService := range dataPlaneServices {
|
||||
c.addDataPlaneService(dataPlaneService)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user