Tempo: Implement PDC for gRPC client (#85867)

This commit is contained in:
lean.dev 2024-05-15 09:39:02 -03:00 committed by GitHub
parent 1cf1c5f03a
commit 639ab6cca7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 41 additions and 5 deletions

View File

@ -5,6 +5,7 @@ import (
"crypto/tls" "crypto/tls"
"encoding/base64" "encoding/base64"
"fmt" "fmt"
"net"
"net/url" "net/url"
"strings" "strings"
@ -26,7 +27,7 @@ var logger = backend.NewLoggerWith("logger", "tsdb.tempo")
// standard HTTP requests. // standard HTTP requests.
// Using other library like connect-go isn't possible right now because Tempo uses non-standard proto compiler which // Using other library like connect-go isn't possible right now because Tempo uses non-standard proto compiler which
// makes generating different client difficult. See https://github.com/grafana/grafana/pull/81683 // makes generating different client difficult. See https://github.com/grafana/grafana/pull/81683
func newGrpcClient(settings backend.DataSourceInstanceSettings, opts httpclient.Options) (tempopb.StreamingQuerierClient, error) { func newGrpcClient(ctx context.Context, settings backend.DataSourceInstanceSettings, opts httpclient.Options) (tempopb.StreamingQuerierClient, error) {
parsedUrl, err := url.Parse(settings.URL) parsedUrl, err := url.Parse(settings.URL)
if err != nil { if err != nil {
logger.Error("Error parsing URL for gRPC client", "error", err, "URL", settings.URL, "function", logEntrypoint()) logger.Error("Error parsing URL for gRPC client", "error", err, "URL", settings.URL, "function", logEntrypoint())
@ -43,18 +44,23 @@ func newGrpcClient(settings backend.DataSourceInstanceSettings, opts httpclient.
} }
} }
clientConn, err := grpc.Dial(onlyHost, getDialOpts(settings, opts)...) dialOpts, err := getDialOpts(ctx, settings, opts)
if err != nil {
return nil, fmt.Errorf("error getting dial options: %w", err)
}
clientConn, err := grpc.Dial(onlyHost, dialOpts...)
if err != nil { if err != nil {
logger.Error("Error dialing gRPC client", "error", err, "URL", settings.URL, "function", logEntrypoint()) logger.Error("Error dialing gRPC client", "error", err, "URL", settings.URL, "function", logEntrypoint())
return nil, err return nil, err
} }
logger.Debug("Instantiating new gRPC client")
return tempopb.NewStreamingQuerierClient(clientConn), nil return tempopb.NewStreamingQuerierClient(clientConn), nil
} }
// getDialOpts creates options and interceptors (middleware) this should roughly match what we do in // getDialOpts creates options and interceptors (middleware) this should roughly match what we do in
// http_client_provider.go for standard http requests. // http_client_provider.go for standard http requests.
func getDialOpts(settings backend.DataSourceInstanceSettings, opts httpclient.Options) []grpc.DialOption { func getDialOpts(ctx context.Context, settings backend.DataSourceInstanceSettings, opts httpclient.Options) ([]grpc.DialOption, error) {
// TODO: Missing middleware TracingMiddleware, DataSourceMetricsMiddleware, ContextualMiddleware, // TODO: Missing middleware TracingMiddleware, DataSourceMetricsMiddleware, ContextualMiddleware,
// ResponseLimitMiddleware RedirectLimitMiddleware. // ResponseLimitMiddleware RedirectLimitMiddleware.
// Also User agent but that is set before each rpc call as for decoupled DS we have to get it from request context // Also User agent but that is set before each rpc call as for decoupled DS we have to get it from request context
@ -74,7 +80,37 @@ func getDialOpts(settings backend.DataSourceInstanceSettings, opts httpclient.Op
dialOps = append(dialOps, grpc.WithTransportCredentials(insecure.NewCredentials())) dialOps = append(dialOps, grpc.WithTransportCredentials(insecure.NewCredentials()))
} }
return dialOps // The following code is required to make gRPC work with Grafana Cloud PDC
// (https://grafana.com/docs/grafana-cloud/connect-externally-hosted/private-data-source-connect/)
proxyClient, err := settings.ProxyClient(ctx)
if err != nil {
return nil, fmt.Errorf("proxy client cannot be retrieved, it is not possible to check if secure socks proxy is enabled: %w", err)
}
if proxyClient.SecureSocksProxyEnabled() { // secure socks proxy is behind a feature flag
dialer, err := proxyClient.NewSecureSocksProxyContextDialer()
if err != nil {
return nil, fmt.Errorf("failure in creating dialer: %w", err)
}
logger.Debug("gRPC dialer instantiated. Appending gRPC dialer to dial options")
dialOps = append(dialOps, grpc.WithContextDialer(func(ctx context.Context, host string) (net.Conn, error) {
logger.Debug("Dialing secure socks proxy", "host", host)
conn, err := dialer.Dial("tcp", host)
if err != nil {
return nil, fmt.Errorf("not possible to dial secure socks proxy: %w", err)
}
select {
case <-ctx.Done():
logger.Debug("Context canceled")
// We return `conn` anyway since we need to better test how context cancellation works
return conn, fmt.Errorf("context canceled: %w", err)
default:
return conn, nil
}
}))
}
logger.Debug("Returning dial options")
return dialOps, nil
} }
// CustomHeadersStreamInterceptor adds custom headers to the outgoing context for each RPC call. Should work similar // CustomHeadersStreamInterceptor adds custom headers to the outgoing context for each RPC call. Should work similar

View File

@ -66,7 +66,7 @@ func newInstanceSettings(httpClientProvider *httpclient.Provider) datasource.Ins
return nil, err return nil, err
} }
streamingClient, err := newGrpcClient(settings, opts) streamingClient, err := newGrpcClient(ctx, settings, opts)
if err != nil { if err != nil {
ctxLogger.Error("Failed to get gRPC client", "error", err, "function", logEntrypoint()) ctxLogger.Error("Failed to get gRPC client", "error", err, "function", logEntrypoint())
return nil, err return nil, err