Aggregator: Prepare for new handlers (#92030)

This commit is contained in:
Todd Treece 2024-08-16 14:13:38 -04:00 committed by GitHub
parent 135f6571a9
commit def8104e74
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 208 additions and 220 deletions

View File

@ -16,14 +16,16 @@ import (
"github.com/grafana/grafana/pkg/aggregator/apis/aggregation/v0alpha1"
v0alpha1helper "github.com/grafana/grafana/pkg/aggregator/apis/aggregation/v0alpha1/helper"
"github.com/grafana/grafana/pkg/aggregator/apiserver/discovery"
"github.com/grafana/grafana/pkg/aggregator/apiserver/plugin"
clientset "github.com/grafana/grafana/pkg/aggregator/generated/clientset/versioned"
informers "github.com/grafana/grafana/pkg/aggregator/generated/informers/externalversions"
dataplaneservicerest "github.com/grafana/grafana/pkg/aggregator/registry/dataplaneservice/rest"
)
type ExtraConfig struct {
PluginClient PluginClient
PluginContextProvider PluginContextProvider
PluginClient plugin.PluginClient
PluginContextProvider plugin.PluginContextProvider
}
type Config struct {
@ -57,10 +59,10 @@ type GrafanaAggregator struct {
GenericAPIServer *genericapiserver.GenericAPIServer
RegistrationInformers informers.SharedInformerFactory
delegateHandler http.Handler
proxyHandlers map[string]*proxyHandler
proxyHandlers map[string]*dataPlaneServiceHandler
handledGroupVersions map[string]sets.Set[string]
PluginClient PluginClient
PluginContextProvider PluginContextProvider
PluginClient plugin.PluginClient
PluginContextProvider plugin.PluginContextProvider
}
// Complete fills in any fields not set that are required to have valid data. It's mutating the receiver.
@ -82,8 +84,8 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
return nil, err
}
discoveryHandler := newApisProxyHandler(delegationTarget.UnprotectedHandler())
genericServer.Handler.GoRestfulContainer.Filter(discoveryHandler.handle)
discoveryHandler := discovery.NewRootDiscoveryHandler(delegationTarget.UnprotectedHandler())
genericServer.Handler.GoRestfulContainer.Filter(discoveryHandler.Handle)
dataplaneServiceRegistrationControllerInitiated := make(chan struct{})
if err := genericServer.RegisterMuxAndDiscoveryCompleteSignal("DataPlaneServiceRegistrationControllerInitiated", dataplaneServiceRegistrationControllerInitiated); err != nil {
@ -104,7 +106,7 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
RegistrationInformers: informerFactory,
delegateHandler: delegationTarget.UnprotectedHandler(),
handledGroupVersions: map[string]sets.Set[string]{},
proxyHandlers: map[string]*proxyHandler{},
proxyHandlers: map[string]*dataPlaneServiceHandler{},
PluginClient: c.ExtraConfig.PluginClient,
PluginContextProvider: c.ExtraConfig.PluginContextProvider,
}
@ -151,7 +153,7 @@ func (s *GrafanaAggregator) AddDataPlaneService(dataplaneService *v0alpha1.DataP
}
proxyPath := "/apis/dataplane/" + dataplaneService.Spec.Group + "/" + dataplaneService.Spec.Version
proxyHandler := &proxyHandler{
proxyHandler := &dataPlaneServiceHandler{
localDelegate: s.delegateHandler,
client: s.PluginClient,
pluginContextProvider: s.PluginContextProvider,

View File

@ -0,0 +1,67 @@
package apiserver
import (
"net/http"
"sync/atomic"
"time"
grafanasemconv "github.com/grafana/grafana/pkg/semconv"
semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/component-base/tracing"
aggregationv0alpha1 "github.com/grafana/grafana/pkg/aggregator/apis/aggregation/v0alpha1"
"github.com/grafana/grafana/pkg/aggregator/apiserver/plugin"
)
// dataPlaneServiceHandler provides a http.Handler which will proxy traffic to a plugin client.
type dataPlaneServiceHandler struct {
localDelegate http.Handler
client plugin.PluginClient
pluginContextProvider plugin.PluginContextProvider
handlingInfo atomic.Value
}
type handlingInfo struct {
name string
handler http.Handler
}
func (r *dataPlaneServiceHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
value := r.handlingInfo.Load()
if value == nil {
r.localDelegate.ServeHTTP(w, req)
return
}
handlingInfo := value.(handlingInfo)
namespace, _ := request.NamespaceFrom(req.Context())
ctx, span := tracing.Start(
req.Context(),
"grafana-aggregator",
grafanasemconv.K8sDataplaneserviceName(handlingInfo.name),
semconv.K8SNamespaceName(namespace),
semconv.HTTPMethod(req.Method),
semconv.HTTPURL(req.URL.String()),
)
// log if the span has not ended after a minute
defer span.End(time.Minute)
handlingInfo.handler.ServeHTTP(w, req.WithContext(ctx))
}
func (r *dataPlaneServiceHandler) updateDataPlaneService(dataplaneService *aggregationv0alpha1.DataPlaneService) {
newInfo := handlingInfo{
name: dataplaneService.Name,
}
// currently only plugin handlers are supported
newInfo.handler = plugin.NewPluginHandler(
r.client,
*dataplaneService,
r.pluginContextProvider,
r.localDelegate,
)
r.handlingInfo.Store(newInfo)
}

View File

@ -1,4 +1,4 @@
package apiserver
package discovery
import (
"encoding/json"
@ -18,13 +18,13 @@ import (
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
)
// apisProxyHandler serves the `/apis` endpoint.
type apisProxyHandler struct {
// RootDiscoveryHandler serves the `/apis` endpoint.
type RootDiscoveryHandler struct {
delegate http.Handler
codecs serializer.CodecFactory
}
func newApisProxyHandler(delegate http.Handler) *apisProxyHandler {
func NewRootDiscoveryHandler(delegate http.Handler) *RootDiscoveryHandler {
scheme := runtime.NewScheme()
metav1.AddToGroupVersion(scheme, schema.GroupVersion{Version: "v1"})
@ -40,13 +40,13 @@ func newApisProxyHandler(delegate http.Handler) *apisProxyHandler {
utilruntime.Must(apidiscoveryv2.AddToScheme(scheme))
utilruntime.Must(apidiscoveryv2beta1.AddToScheme(scheme))
codecs := serializer.NewCodecFactory(scheme)
return &apisProxyHandler{
return &RootDiscoveryHandler{
delegate: delegate,
codecs: codecs,
}
}
func (a *apisProxyHandler) handle(req *restful.Request, resp *restful.Response, chain *restful.FilterChain) {
func (a *RootDiscoveryHandler) Handle(req *restful.Request, resp *restful.Response, chain *restful.FilterChain) {
if req.Request.URL.Path != "/apis" && req.Request.URL.Path != "/apis/" {
chain.ProcessFilter(req, resp)
return
@ -55,49 +55,15 @@ func (a *apisProxyHandler) handle(req *restful.Request, resp *restful.Response,
apisHandlerWithAggregationSupport.ServeHTTP(resp.ResponseWriter, req.Request)
}
func (a *apisProxyHandler) v2handler(chain *restful.FilterChain) http.HandlerFunc {
func (a *RootDiscoveryHandler) v2handler(chain *restful.FilterChain) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
clonedReq := req.Clone(req.Context())
newReq := restful.NewRequest(clonedReq)
rw := httptest.NewRecorder()
newRes := restful.NewResponse(rw)
newReq := restful.NewRequest(req)
newRes := restful.NewResponse(w)
chain.ProcessFilter(newReq, newRes)
if rw.Code != http.StatusOK {
http.Error(w, rw.Body.String(), rw.Code)
return
}
v2Discovery := apidiscoveryv2.APIGroupDiscoveryList{}
if err := json.Unmarshal(rw.Body.Bytes(), &v2Discovery); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
clonedReq = req.Clone(req.Context())
rw = httptest.NewRecorder()
a.delegate.ServeHTTP(rw, clonedReq)
if rw.Code != http.StatusOK {
http.Error(w, rw.Body.String(), rw.Code)
return
}
proxiedDiscovery := apidiscoveryv2.APIGroupDiscoveryList{}
if err := json.Unmarshal(rw.Body.Bytes(), &proxiedDiscovery); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
v2Discovery.Items = append(v2Discovery.Items, proxiedDiscovery.Items...)
responsewriters.WriteObjectNegotiated(a.codecs, aggregated.DiscoveryEndpointRestrictions, schema.GroupVersion{
Group: apidiscoveryv2.SchemeGroupVersion.Group,
Version: apidiscoveryv2.SchemeGroupVersion.Version,
}, w, req, http.StatusOK, &v2Discovery, true)
}
}
func (a *apisProxyHandler) v1handler(chain *restful.FilterChain) http.HandlerFunc {
func (a *RootDiscoveryHandler) v1handler(chain *restful.FilterChain) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
clonedReq := req.Clone(req.Context())
clonedReq.Header.Set("Accept", "application/json")

View File

@ -1,4 +1,4 @@
package apiserver
package discovery
import (
"encoding/json"
@ -16,7 +16,7 @@ import (
aggregationv0alpha1api "github.com/grafana/grafana/pkg/aggregator/apis/aggregation/v0alpha1"
)
func TestApisProxyHandler_Handle(t *testing.T) {
func TestRootDiscoveryHandler_Handle(t *testing.T) {
v1Discovery := metav1.APIGroup{
Name: aggregationv0alpha1api.SchemeGroupVersion.Group,
Versions: []metav1.GroupVersionForDiscovery{
@ -107,7 +107,7 @@ func TestApisProxyHandler_Handle(t *testing.T) {
}
})
handler := newApisProxyHandler(delegationTarget)
handler := NewRootDiscoveryHandler(delegationTarget)
chain := &restful.FilterChain{
Target: func(req *restful.Request, resp *restful.Response) {
@ -152,7 +152,7 @@ func TestApisProxyHandler_Handle(t *testing.T) {
ResponseWriter: rec,
}
handler.handle(req, resp, chain)
handler.Handle(req, resp, chain)
require.Equal(t, http.StatusOK, resp.StatusCode())
require.NoError(t, resp.Error())
require.Equal(t, "application/json", resp.Header().Get("Content-Type"))
@ -182,7 +182,7 @@ func TestApisProxyHandler_Handle(t *testing.T) {
ResponseWriter: rec,
}
handler.handle(req, resp, chain)
handler.Handle(req, resp, chain)
require.Equal(t, http.StatusOK, resp.StatusCode())
require.Equal(t, "application/json;g=apidiscovery.k8s.io;v=v2;as=APIGroupDiscoveryList", resp.Header().Get("Content-Type"))
@ -226,7 +226,7 @@ func TestApisProxyHandler_Handle(t *testing.T) {
ResponseWriter: rec,
}
handler.handle(req, resp, chain)
handler.Handle(req, resp, chain)
require.Equal(t, http.StatusOK, rec.Code)
require.Equal(t, "v0alpha1", rec.Body.String())
})

View File

@ -1,102 +0,0 @@
// SPDX-License-Identifier: AGPL-3.0-only
// Provenance-includes-location: https://github.com/kubernetes/kube-aggregator/blob/master/pkg/apiserver/handler_proxy.go
// Provenance-includes-license: Apache-2.0
// Provenance-includes-copyright: The Kubernetes Authors.
package apiserver
import (
"context"
"net/http"
"sync/atomic"
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/component-base/tracing"
aggregationv0alpha1 "github.com/grafana/grafana/pkg/aggregator/apis/aggregation/v0alpha1"
grafanasemconv "github.com/grafana/grafana/pkg/semconv"
)
type PluginClient interface {
backend.QueryDataHandler
backend.StreamHandler
backend.AdmissionHandler
backend.CallResourceHandler
}
type PluginContextProvider interface {
GetPluginContext(ctx context.Context, pluginID, uid string) (backend.PluginContext, error)
}
// proxyHandler provides a http.Handler which will proxy traffic to a plugin client.
type proxyHandler struct {
localDelegate http.Handler
client PluginClient
pluginContextProvider PluginContextProvider
handlingInfo atomic.Value
}
type proxyHandlingInfo struct {
name string
handler *pluginHandler
}
func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
value := r.handlingInfo.Load()
if value == nil {
r.localDelegate.ServeHTTP(w, req)
return
}
handlingInfo := value.(proxyHandlingInfo)
namespace, _ := request.NamespaceFrom(req.Context())
ctx, span := tracing.Start(
req.Context(),
"grafana-aggregator",
grafanasemconv.K8sDataplaneserviceName(handlingInfo.name),
semconv.K8SNamespaceName(namespace),
semconv.HTTPMethod(req.Method),
semconv.HTTPURL(req.URL.String()),
)
// log if the span has not ended after a minute
defer span.End(time.Minute)
handlingInfo.handler.ServeHTTP(w, req.WithContext(ctx))
}
func (r *proxyHandler) updateDataPlaneService(dataplaneService *aggregationv0alpha1.DataPlaneService) {
newInfo := proxyHandlingInfo{
name: dataplaneService.Name,
}
newInfo.handler = newPluginHandler(
r.client,
*dataplaneService,
r.pluginContextProvider,
r.localDelegate,
)
r.handlingInfo.Store(newInfo)
}
// responder implements rest.Responder for assisting a connector in writing objects or errors.
type responder struct {
w http.ResponseWriter
}
// TODO this should properly handle content type negotiation
// if the caller asked for protobuf and you write JSON bad things happen.
func (r *responder) Object(statusCode int, obj runtime.Object) {
responsewriters.WriteRawJSON(statusCode, obj, r.w)
}
func (r *responder) Error(_ http.ResponseWriter, req *http.Request, err error) {
tracing.SpanFromContext(req.Context()).RecordError(err)
s := responsewriters.ErrorToAPIStatus(err)
r.Object(http.StatusInternalServerError, s)
}

View File

@ -0,0 +1,83 @@
package plugin
import (
"context"
"net/http"
"path"
"github.com/grafana/grafana-plugin-sdk-go/backend"
aggregationv0alpha1 "github.com/grafana/grafana/pkg/aggregator/apis/aggregation/v0alpha1"
)
type PluginClient interface {
backend.QueryDataHandler
backend.StreamHandler
backend.AdmissionHandler
backend.CallResourceHandler
}
type PluginContextProvider interface {
GetPluginContext(ctx context.Context, pluginID, uid string) (backend.PluginContext, error)
}
type PluginHandler struct {
mux *http.ServeMux
delegate http.Handler
client PluginClient
pluginContextProvider PluginContextProvider
dataplaneService aggregationv0alpha1.DataPlaneService
}
func NewPluginHandler(
client PluginClient,
dataplaneService aggregationv0alpha1.DataPlaneService,
pluginContextProvider PluginContextProvider,
delegate http.Handler,
) *PluginHandler {
h := &PluginHandler{
mux: http.NewServeMux(),
delegate: delegate,
client: client,
pluginContextProvider: pluginContextProvider,
dataplaneService: dataplaneService,
}
h.registerRoutes()
return h
}
func (h *PluginHandler) registerRoutes() {
proxyPath := proxyPathBuilder(h.dataplaneService.Spec.Group, h.dataplaneService.Spec.Version)
for _, service := range h.dataplaneService.Spec.Services {
switch service.Type {
case aggregationv0alpha1.AdmissionControlServiceType:
// TODO: implement in future PR
case aggregationv0alpha1.ConversionServiceType:
// TODO: implement in future PR
case aggregationv0alpha1.DataSourceProxyServiceType:
// TODO: implement in future PR
case aggregationv0alpha1.QueryServiceType:
h.mux.Handle(proxyPath("/namespaces/{namespace}/connections/{uid}/query"), h.QueryDataHandler())
case aggregationv0alpha1.RouteServiceType:
// TODO: implement in future PR
case aggregationv0alpha1.StreamServiceType:
// TODO: implement in future PR
}
}
// fallback to the delegate
h.mux.Handle("/", h.delegate)
}
func (h *PluginHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
h.mux.ServeHTTP(w, req)
}
func proxyPathBuilder(group, version string) func(string) string {
return func(suffix string) string {
return path.Join("/apis", group, version, suffix)
}
}

View File

@ -1,4 +1,4 @@
package apiserver
package plugin
import (
"encoding/json"
@ -8,73 +8,21 @@ import (
"github.com/grafana/grafana-plugin-sdk-go/backend"
data "github.com/grafana/grafana-plugin-sdk-go/experimental/apis/data/v0alpha1"
grafanasemconv "github.com/grafana/grafana/pkg/semconv"
semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
"k8s.io/component-base/tracing"
"k8s.io/klog/v2"
aggregationv0alpha1 "github.com/grafana/grafana/pkg/aggregator/apis/aggregation/v0alpha1"
grafanasemconv "github.com/grafana/grafana/pkg/semconv"
"github.com/grafana/grafana/pkg/aggregator/apiserver/util"
)
type pluginHandler struct {
mux *http.ServeMux
delegate http.Handler
client PluginClient
pluginContextProvider PluginContextProvider
dataplaneService aggregationv0alpha1.DataPlaneService
}
func newPluginHandler(
client PluginClient,
dataplaneService aggregationv0alpha1.DataPlaneService,
pluginContextProvider PluginContextProvider,
delegate http.Handler,
) *pluginHandler {
mux := http.NewServeMux()
h := &pluginHandler{
mux: mux,
delegate: delegate,
client: client,
pluginContextProvider: pluginContextProvider,
dataplaneService: dataplaneService,
}
for _, service := range dataplaneService.Spec.Services {
switch service.Type {
case aggregationv0alpha1.QueryServiceType:
proxyPath := fmt.Sprintf("/apis/%s/%s/namespaces/{namespace}/connections/{uid}/query", dataplaneService.Spec.Group, dataplaneService.Spec.Version)
mux.Handle(proxyPath, h.QueryDataHandler())
case aggregationv0alpha1.StreamServiceType:
// TODO: implement in future PR
case aggregationv0alpha1.AdmissionControlServiceType:
// TODO: implement in future PR
case aggregationv0alpha1.RouteServiceType:
// TODO: implement in future PR
case aggregationv0alpha1.ConversionServiceType:
// TODO: implement in future PR
case aggregationv0alpha1.DataSourceProxyServiceType:
// TODO: implement in future PR
}
}
// fallback to the delegate
mux.Handle("/", delegate)
return h
}
func (h *pluginHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
h.mux.ServeHTTP(w, req)
}
func (h *pluginHandler) QueryDataHandler() http.HandlerFunc {
func (h *PluginHandler) QueryDataHandler() http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
ctx := req.Context()
span := tracing.SpanFromContext(ctx)
span.AddEvent("QueryDataHandler")
responder := &responder{w: w}
responder := &util.Responder{ResponseWriter: w}
dqr := data.QueryDataRequest{}
if err := json.NewDecoder(req.Body).Decode(&dqr); err != nil {
responder.Error(w, req, err)

View File

@ -1,4 +1,4 @@
package apiserver
package plugin
import (
"bytes"
@ -59,7 +59,7 @@ func TestQueryDataHandler(t *testing.T) {
}
delegate := newFakeHTTPHandler(http.StatusNotFound, []byte(`Not Found`))
handler := newPluginHandler(pc, dps, contextProvider, delegate)
handler := NewPluginHandler(pc, dps, contextProvider, delegate)
qdr := datav0alpha1.QueryDataRequest{
TimeRange: datav0alpha1.TimeRange{},

View File

@ -0,0 +1,24 @@
package util
import (
"net/http"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
"k8s.io/component-base/tracing"
)
// Responder implements rest.Responder for assisting a connector in writing objects or errors.
type Responder struct {
ResponseWriter http.ResponseWriter
}
func (r Responder) Object(statusCode int, obj runtime.Object) {
responsewriters.WriteRawJSON(statusCode, obj, r.ResponseWriter)
}
func (r *Responder) Error(_ http.ResponseWriter, req *http.Request, err error) {
tracing.SpanFromContext(req.Context()).RecordError(err)
s := responsewriters.ErrorToAPIStatus(err)
r.Object(http.StatusInternalServerError, s)
}