mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
APIServer: Propagate a new context with limited information (#100374)
* APIServer: Propagate a new context with limited information * APIServer: Remove error return * APIServer: Test that context propagation does fork * APIServer: Fix golangci-lint lints * chore: make update-workspace
This commit is contained in:
parent
aca024bcbb
commit
a0701a42f1
@ -2,13 +2,19 @@ package responsewriter
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/grafana/grafana-app-sdk/logging"
|
||||
"github.com/grafana/grafana/pkg/apimachinery/identity"
|
||||
"k8s.io/apiserver/pkg/authentication/user"
|
||||
"k8s.io/apiserver/pkg/endpoints/request"
|
||||
"k8s.io/apiserver/pkg/endpoints/responsewriter"
|
||||
"k8s.io/component-base/tracing"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
@ -27,6 +33,13 @@ func WrapHandler(handler http.Handler) func(req *http.Request) (*http.Response,
|
||||
// so the client will be responsible for closing the response body.
|
||||
//nolint:bodyclose
|
||||
return func(req *http.Request) (*http.Response, error) {
|
||||
ctx, cancel, err := createLimitedContext(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer cancel()
|
||||
req = req.WithContext(ctx) // returns a shallow copy, so we can't do it as part of the adapter.
|
||||
|
||||
w := NewAdapter(req)
|
||||
go func() {
|
||||
handler.ServeHTTP(w, req)
|
||||
@ -39,6 +52,74 @@ func WrapHandler(handler http.Handler) func(req *http.Request) (*http.Response,
|
||||
}
|
||||
}
|
||||
|
||||
// createLimitedContext creates a new context based on the the req's.
|
||||
// It contains vital information such as a logger for the driver of the request, a user for auth, tracing, and deadlines. It propagates the parent's cancellation.
|
||||
func createLimitedContext(req *http.Request) (context.Context, context.CancelFunc, error) {
|
||||
refCtx := req.Context()
|
||||
newCtx := context.Background()
|
||||
|
||||
if ns, ok := request.NamespaceFrom(refCtx); ok {
|
||||
newCtx = request.WithNamespace(newCtx, ns)
|
||||
}
|
||||
if signal := request.ServerShutdownSignalFrom(refCtx); signal != nil {
|
||||
newCtx = request.WithServerShutdownSignal(newCtx, signal)
|
||||
}
|
||||
|
||||
requester, _ := identity.GetRequester(refCtx)
|
||||
if requester != nil {
|
||||
newCtx = identity.WithRequester(newCtx, requester)
|
||||
}
|
||||
|
||||
usr, ok := request.UserFrom(refCtx)
|
||||
if !ok && requester != nil {
|
||||
// add in k8s user if not there yet
|
||||
var ok bool
|
||||
usr, ok = requester.(user.Info)
|
||||
if !ok {
|
||||
return nil, nil, fmt.Errorf("could not convert user to Kubernetes user")
|
||||
}
|
||||
}
|
||||
if ok {
|
||||
newCtx = request.WithUser(newCtx, usr)
|
||||
}
|
||||
|
||||
// App SDK logger
|
||||
appLogger := logging.FromContext(refCtx)
|
||||
newCtx = logging.Context(newCtx, appLogger)
|
||||
// Klog logger
|
||||
klogger := klog.FromContext(refCtx)
|
||||
if klogger.Enabled() {
|
||||
newCtx = klog.NewContext(newCtx, klogger)
|
||||
}
|
||||
|
||||
// The tracing package deals with both k8s trace and otel.
|
||||
if span := tracing.SpanFromContext(refCtx); span != nil && *span != (tracing.Span{}) {
|
||||
newCtx = tracing.ContextWithSpan(newCtx, span)
|
||||
}
|
||||
|
||||
deadlineCancel := context.CancelFunc(func() {})
|
||||
if deadline, ok := refCtx.Deadline(); ok {
|
||||
newCtx, deadlineCancel = context.WithDeadline(newCtx, deadline)
|
||||
}
|
||||
|
||||
newCtx, cancel := context.WithCancelCause(newCtx)
|
||||
// We intentionally do not defer a cancel(nil) here. It wouldn't make sense to cancel until (*ResponseAdapter).Close() is called.
|
||||
go func() { // Even context's own impls do goroutines for this type of pattern.
|
||||
select {
|
||||
case <-newCtx.Done():
|
||||
// We don't have to do anything!
|
||||
case <-refCtx.Done():
|
||||
cancel(context.Cause(refCtx))
|
||||
}
|
||||
deadlineCancel()
|
||||
}()
|
||||
|
||||
return newCtx, context.CancelFunc(func() {
|
||||
cancel(nil)
|
||||
deadlineCancel()
|
||||
}), nil
|
||||
}
|
||||
|
||||
// ResponseAdapter is an implementation of [http.ResponseWriter] that allows conversion to a [http.Response].
|
||||
type ResponseAdapter struct {
|
||||
req *http.Request
|
||||
|
@ -10,6 +10,8 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"k8s.io/apiserver/pkg/authentication/user"
|
||||
"k8s.io/apiserver/pkg/endpoints/request"
|
||||
|
||||
grafanaresponsewriter "github.com/grafana/grafana/pkg/apiserver/endpoints/responsewriter"
|
||||
)
|
||||
@ -158,6 +160,44 @@ func TestResponseAdapter(t *testing.T) {
|
||||
}
|
||||
wg.Wait()
|
||||
})
|
||||
|
||||
t.Run("should fork the context", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
type K int
|
||||
var key K
|
||||
baseCtx := context.Background()
|
||||
baseCtx = context.WithValue(baseCtx, key, "hello, world!") // we expect this one not to be sent to the inner handler.
|
||||
|
||||
expectedUsr := &user.DefaultInfo{Name: "hello, world!"}
|
||||
baseCtx = request.WithUser(baseCtx, expectedUsr)
|
||||
// There are more keys to consider, but this should be sufficient to decide that we do actually propagate select data across.
|
||||
|
||||
client := &http.Client{
|
||||
Transport: &roundTripperFunc{
|
||||
ready: make(chan struct{}),
|
||||
// ignore the lint error because the response is passed directly to the client,
|
||||
// so the client will be responsible for closing the response body.
|
||||
//nolint:bodyclose
|
||||
fn: grafanaresponsewriter.WrapHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
require.Nil(t, r.Context().Value(key), "inner handler should not have a value for key of type K")
|
||||
usr, ok := request.UserFrom(r.Context())
|
||||
require.True(t, ok, "no user found in request context")
|
||||
require.Equal(t, expectedUsr.Name, usr.GetName(), "user data was not propagated through request context")
|
||||
|
||||
_, err := w.Write([]byte("OK"))
|
||||
require.NoError(t, err)
|
||||
})),
|
||||
},
|
||||
}
|
||||
|
||||
req, err := http.NewRequestWithContext(baseCtx, http.MethodGet, "/", nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
resp, err := client.Do(req)
|
||||
require.NoError(t, err, "request should not fail")
|
||||
require.NoError(t, resp.Body.Close())
|
||||
})
|
||||
}
|
||||
|
||||
func syncHandler(w http.ResponseWriter, r *http.Request) {
|
||||
|
@ -1,10 +1,13 @@
|
||||
module github.com/grafana/grafana/pkg/apiserver
|
||||
|
||||
go 1.23.1
|
||||
go 1.23.4
|
||||
|
||||
toolchain go1.23.6
|
||||
|
||||
require (
|
||||
github.com/google/go-cmp v0.6.0
|
||||
github.com/grafana/authlib/types v0.0.0-20250120145936-5f0e28e7a87c
|
||||
github.com/grafana/grafana-app-sdk/logging v0.30.0
|
||||
github.com/grafana/grafana/pkg/apimachinery v0.0.0-20240701135906-559738ce6ae1
|
||||
github.com/prometheus/client_golang v1.20.5
|
||||
github.com/stretchr/testify v1.10.0
|
||||
|
@ -81,6 +81,8 @@ github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aN
|
||||
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
|
||||
github.com/grafana/authlib/types v0.0.0-20250120145936-5f0e28e7a87c h1:b0sPDtt33uFdmvUJjSCld3kwE2E49dUvevuUDSJsEuo=
|
||||
github.com/grafana/authlib/types v0.0.0-20250120145936-5f0e28e7a87c/go.mod h1:qYjSd1tmJiuVoSICp7Py9/zD54O9uQQA3wuM6Gg4DFM=
|
||||
github.com/grafana/grafana-app-sdk/logging v0.30.0 h1:K/P/bm7Cp7Di4tqIJ3EQz2+842JozQGRaz62r95ApME=
|
||||
github.com/grafana/grafana-app-sdk/logging v0.30.0/go.mod h1:xy6ZyVXl50Z3DBDLybvBPphbykPhuVNed/VNmen9DQM=
|
||||
github.com/grafana/grafana/pkg/apimachinery v0.0.0-20240701135906-559738ce6ae1 h1:ItDcDxUjVLPKja+hogpqgW/kj8LxUL2qscelXIsN1Bs=
|
||||
github.com/grafana/grafana/pkg/apimachinery v0.0.0-20240701135906-559738ce6ae1/go.mod h1:DkxMin+qOh1Fgkxfbt+CUfBqqsCQJMG9op8Os/irBPA=
|
||||
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 h1:UH//fgunKIs4JdUbpDl1VZCDaL56wXCB/5+wF6uHfaI=
|
||||
|
Loading…
Reference in New Issue
Block a user