From de4171862c45e30a973f194cd3af68a3dbd58703 Mon Sep 17 00:00:00 2001 From: Andrej Ocenas Date: Fri, 2 Feb 2024 16:49:51 +0100 Subject: [PATCH] Tempo: Add custom headers middleware for grpc client (#81693) --- pkg/tsdb/tempo/grpc.go | 65 +++++++++++++++++++++++++-------- pkg/tsdb/tempo/search_stream.go | 7 ++++ 2 files changed, 56 insertions(+), 16 deletions(-) diff --git a/pkg/tsdb/tempo/grpc.go b/pkg/tsdb/tempo/grpc.go index 474502af75f..061713f989e 100644 --- a/pkg/tsdb/tempo/grpc.go +++ b/pkg/tsdb/tempo/grpc.go @@ -8,6 +8,8 @@ import ( "net/url" "strings" + "google.golang.org/grpc/metadata" + "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/backend/httpclient" "github.com/grafana/tempo/pkg/tempopb" @@ -18,11 +20,12 @@ import ( var logger = backend.NewLoggerWith("logger", "tsdb.tempo") -// This function creates a new gRPC client to connect to a streaming query service. -// It starts by parsing the URL from the data source settings and extracting the host, since that's what the gRPC connection expects. -// If the URL does not contain a port number, it adds a default port based on the scheme (80 for HTTP and 443 for HTTPS). -// If basic authentication is enabled, it uses TLS transport credentials and sets the basic authentication header for each RPC call. -// Otherwise, it uses insecure credentials. +// newGrpcClient creates a new gRPC client to connect to a streaming query service. +// This uses the default google.golang.org/grpc library. One caveat to that is that it does not allow passing the +// default httpClient to the gRPC client. This means that we cannot use the same middleware that we use for +// standard HTTP requests. +// Using other library like connect-go isn't possible right now because Tempo uses non-standard proto compiler which +// makes generating different client difficult. See https://github.com/grafana/grafana/pull/81683 func newGrpcClient(settings backend.DataSourceInstanceSettings, opts httpclient.Options) (tempopb.StreamingQuerierClient, error) { parsedUrl, err := url.Parse(settings.URL) if err != nil { @@ -30,6 +33,7 @@ func newGrpcClient(settings backend.DataSourceInstanceSettings, opts httpclient. return nil, err } + // Make sure we have some default port if none is set. This is required for gRPC to work. onlyHost := parsedUrl.Host if !strings.Contains(onlyHost, ":") { if parsedUrl.Scheme == "http" { @@ -39,17 +43,7 @@ func newGrpcClient(settings backend.DataSourceInstanceSettings, opts httpclient. } } - var dialOps []grpc.DialOption - if settings.BasicAuthEnabled { - dialOps = append(dialOps, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{}))) - dialOps = append(dialOps, grpc.WithPerRPCCredentials(&basicAuth{ - Header: basicHeaderForAuth(opts.BasicAuth.User, opts.BasicAuth.Password), - })) - } else { - dialOps = append(dialOps, grpc.WithTransportCredentials(insecure.NewCredentials())) - } - - clientConn, err := grpc.Dial(onlyHost, dialOps...) + clientConn, err := grpc.Dial(onlyHost, getDialOpts(settings, opts)...) if err != nil { logger.Error("Error dialing gRPC client", "error", err, "URL", settings.URL, "function", logEntrypoint()) return nil, err @@ -58,6 +52,45 @@ func newGrpcClient(settings backend.DataSourceInstanceSettings, opts httpclient. return tempopb.NewStreamingQuerierClient(clientConn), nil } +// getDialOpts creates options and interceptors (middleware) this should roughly match what we do in +// http_client_provider.go for standard http requests. +func getDialOpts(settings backend.DataSourceInstanceSettings, opts httpclient.Options) []grpc.DialOption { + // TODO: Missing middleware TracingMiddleware, DataSourceMetricsMiddleware, ContextualMiddleware, + // ResponseLimitMiddleware RedirectLimitMiddleware. + // Also User agent but that is set before each rpc call as for decoupled DS we have to get it from request context + // and cannot add it to client here. + + var dialOps []grpc.DialOption + + dialOps = append(dialOps, grpc.WithChainStreamInterceptor(CustomHeadersStreamInterceptor(opts))) + if settings.BasicAuthEnabled { + // If basic authentication is enabled, it uses TLS transport credentials and sets the basic authentication header for each RPC call. + dialOps = append(dialOps, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{}))) + dialOps = append(dialOps, grpc.WithPerRPCCredentials(&basicAuth{ + Header: basicHeaderForAuth(opts.BasicAuth.User, opts.BasicAuth.Password), + })) + } else { + // Otherwise, it uses insecure credentials. + dialOps = append(dialOps, grpc.WithTransportCredentials(insecure.NewCredentials())) + } + + return dialOps +} + +// CustomHeadersStreamInterceptor adds custom headers to the outgoing context for each RPC call. Should work similar +// to the CustomHeadersMiddleware in the HTTP client provider. +func CustomHeadersStreamInterceptor(httpOpts httpclient.Options) grpc.StreamClientInterceptor { + return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { + if len(httpOpts.Headers) != 0 { + for key, value := range httpOpts.Headers { + ctx = metadata.AppendToOutgoingContext(ctx, key, value) + } + } + + return streamer(ctx, desc, cc, method, opts...) + } +} + type basicAuth struct { Header string } diff --git a/pkg/tsdb/tempo/search_stream.go b/pkg/tsdb/tempo/search_stream.go index d2ef5a1d8ae..d96fec7a216 100644 --- a/pkg/tsdb/tempo/search_stream.go +++ b/pkg/tsdb/tempo/search_stream.go @@ -7,6 +7,8 @@ import ( "fmt" "io" + "google.golang.org/grpc/metadata" + "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/backend/tracing" "github.com/grafana/grafana-plugin-sdk-go/data" @@ -60,6 +62,11 @@ func (s *Service) runSearchStream(ctx context.Context, req *backend.RunStreamReq sr.Start = uint32(backendQuery.TimeRange.From.Unix()) sr.End = uint32(backendQuery.TimeRange.To.Unix()) + // Setting the user agent for the gRPC call. When DS is decoupled we don't recreate instance when grafana config + // changes or updates, so we have to get it from context. + // Ideally this would be pushed higher, so it's set once for all rpc calls, but we have only one now. + ctx = metadata.AppendToOutgoingContext(ctx, "User-Agent", backend.UserAgentFromContext(ctx).String()) + stream, err := datasource.StreamingClient.Search(ctx, sr) if err != nil { span.RecordError(err)