K8s: Add Aggregation to Backend Service (#81591)

Co-authored-by: Charandas Batra <charandas.batra@grafana.com>
This commit is contained in:
Todd Treece
2024-02-12 15:59:35 -05:00
committed by GitHub
parent 6d5211e172
commit d6e6298103
20 changed files with 2267 additions and 1856 deletions

View File

@@ -4,7 +4,6 @@ import (
"context"
"fmt"
"net/http"
"net/http/httptest"
"path"
"github.com/grafana/dskit/services"
@@ -26,8 +25,10 @@ import (
"github.com/grafana/grafana/pkg/middleware"
"github.com/grafana/grafana/pkg/modules"
"github.com/grafana/grafana/pkg/registry"
"github.com/grafana/grafana/pkg/services/apiserver/aggregator"
"github.com/grafana/grafana/pkg/services/apiserver/auth/authorizer"
"github.com/grafana/grafana/pkg/services/apiserver/builder"
grafanaresponsewriter "github.com/grafana/grafana/pkg/services/apiserver/endpoints/responsewriter"
grafanaapiserveroptions "github.com/grafana/grafana/pkg/services/apiserver/options"
entitystorage "github.com/grafana/grafana/pkg/services/apiserver/storage/entity"
filestorage "github.com/grafana/grafana/pkg/services/apiserver/storage/file"
@@ -189,12 +190,17 @@ func (s *service) start(ctx context.Context) error {
groupVersions := make([]schema.GroupVersion, 0, len(builders))
// Install schemas
for _, b := range builders {
for i, b := range builders {
groupVersions = append(groupVersions, b.GetGroupVersion())
if err := b.InstallSchema(Scheme); err != nil {
return err
}
if s.features.IsEnabledGlobally(featuremgmt.FlagKubernetesAggregator) {
// set the priority for the group+version
aggregator.APIVersionPriorities[b.GetGroupVersion()] = aggregator.Priority{Group: 15000, Version: int32(i + 1)}
}
auth := b.GetAuthorizer()
if auth != nil {
s.authorizer.Register(b.GetGroupVersion(), auth)
@@ -216,7 +222,7 @@ func (s *service) start(ctx context.Context) error {
serverConfig.Authorization.Authorizer = s.authorizer
serverConfig.TracerProvider = s.tracing.GetTracerProvider()
// setup loopback transport
// setup loopback transport for the aggregator server
transport := &roundTripperFunc{ready: make(chan struct{})}
serverConfig.LoopbackClientConfig.Transport = transport
serverConfig.LoopbackClientConfig.TLSClientConfig = clientrest.TLSClientConfig{}
@@ -283,41 +289,100 @@ func (s *service) start(ctx context.Context) error {
return err
}
// dual writing is only enabled when the storage type is not legacy.
// this is needed to support setting a default RESTOptionsGetter for new APIs that don't
// support the legacy storage type.
dualWriteEnabled := o.StorageOptions.StorageType != grafanaapiserveroptions.StorageTypeLegacy
// Install the API Group+version
// Install the API group+version
err = builder.InstallAPIs(Scheme, Codecs, server, serverConfig.RESTOptionsGetter, builders, dualWriteEnabled)
if err != nil {
return err
}
// set the transport function and signal that it's ready
transport.fn = func(req *http.Request) (*http.Response, error) {
w := newWrappedResponseWriter()
resp := responsewriter.WrapForHTTP1Or2(w)
server.Handler.ServeHTTP(resp, req)
return w.Result(), nil
}
close(transport.ready)
// stash the options for later use
s.options = o
// only write kubeconfig in dev mode
if o.ExtraOptions.DevMode {
if err := ensureKubeConfig(server.LoopbackClientConfig, o.StorageOptions.DataPath); err != nil {
var runningServer *genericapiserver.GenericAPIServer
if s.features.IsEnabledGlobally(featuremgmt.FlagKubernetesAggregator) {
runningServer, err = s.startAggregator(transport, serverConfig, server)
if err != nil {
return err
}
} else {
runningServer, err = s.startCoreServer(transport, serverConfig, server)
if err != nil {
return err
}
}
// Used by the proxy wrapper registered in ProvideService
s.handler = server.Handler
s.restConfig = server.LoopbackClientConfig
s.options = o
// only write kubeconfig in dev mode
if o.ExtraOptions.DevMode {
if err := ensureKubeConfig(runningServer.LoopbackClientConfig, o.StorageOptions.DataPath); err != nil {
return err
}
}
// used by the proxy wrapper registered in ProvideService
s.handler = runningServer.Handler
// used by local clients to make requests to the server
s.restConfig = runningServer.LoopbackClientConfig
return nil
}
func (s *service) startCoreServer(
transport *roundTripperFunc,
serverConfig *genericapiserver.RecommendedConfig,
server *genericapiserver.GenericAPIServer,
) (*genericapiserver.GenericAPIServer, error) {
// setup the loopback transport and signal that it's ready.
// ignore the lint error because the response is passed directly to the client,
// so the client will be responsible for closing the response body.
// nolint:bodyclose
transport.fn = grafanaresponsewriter.WrapHandler(server.Handler)
close(transport.ready)
prepared := server.PrepareRun()
go func() {
s.stoppedCh <- prepared.Run(s.stopCh)
}()
return server, nil
}
func (s *service) startAggregator(
transport *roundTripperFunc,
serverConfig *genericapiserver.RecommendedConfig,
server *genericapiserver.GenericAPIServer,
) (*genericapiserver.GenericAPIServer, error) {
aggregatorConfig, aggregatorInformers, err := aggregator.CreateAggregatorConfig(s.options, *serverConfig)
if err != nil {
return nil, err
}
aggregatorServer, err := aggregator.CreateAggregatorServer(aggregatorConfig, aggregatorInformers, server)
if err != nil {
return nil, err
}
// setup the loopback transport for the aggregator server and signal that it's ready
// ignore the lint error because the response is passed directly to the client,
// so the client will be responsible for closing the response body.
// nolint:bodyclose
transport.fn = grafanaresponsewriter.WrapHandler(aggregatorServer.GenericAPIServer.Handler)
close(transport.ready)
prepared, err := aggregatorServer.PrepareRun()
if err != nil {
return nil, err
}
go func() {
s.stoppedCh <- prepared.Run(s.stopCh)
}()
return nil
return aggregatorServer.GenericAPIServer, nil
}
func (s *service) GetDirectRestConfig(c *contextmodel.ReqContext) *clientrest.Config {
@@ -325,9 +390,8 @@ func (s *service) GetDirectRestConfig(c *contextmodel.ReqContext) *clientrest.Co
Transport: &roundTripperFunc{
fn: func(req *http.Request) (*http.Response, error) {
ctx := appcontext.WithUser(req.Context(), c.SignedInUser)
w := httptest.NewRecorder()
s.handler.ServeHTTP(w, req.WithContext(ctx))
return w.Result(), nil
wrapped := grafanaresponsewriter.WrapHandler(s.handler)
return wrapped(req.WithContext(ctx))
},
},
}
@@ -367,24 +431,3 @@ func (f *roundTripperFunc) RoundTrip(req *http.Request) (*http.Response, error)
}
return f.fn(req)
}
var _ http.ResponseWriter = (*wrappedResponseWriter)(nil)
var _ responsewriter.UserProvidedDecorator = (*wrappedResponseWriter)(nil)
type wrappedResponseWriter struct {
*httptest.ResponseRecorder
}
func newWrappedResponseWriter() *wrappedResponseWriter {
w := httptest.NewRecorder()
return &wrappedResponseWriter{w}
}
func (w *wrappedResponseWriter) Unwrap() http.ResponseWriter {
return w.ResponseRecorder
}
func (w *wrappedResponseWriter) CloseNotify() <-chan bool {
// TODO: this is probably not the right thing to do here
return make(<-chan bool)
}