grafana/pkg/tsdb/tempo/grpc.go
Taylor Dean d5b21f77aa
Tempo: Fix grpc streaming support over pdc-agent (#89883)
* Tempo: Fix grpc streaming support over pdc-agent

* Fix a spelling error and formatting

* Ignore lint issue for reasons listed in source code comment

---------

Co-authored-by: Joey Tawadrous <joey.tawadrous@grafana.com>
2024-07-04 14:09:34 +01:00

170 lines
7.2 KiB
Go

package tempo
import (
"context"
"crypto/tls"
"encoding/base64"
"fmt"
"net"
"net/url"
"strings"
"google.golang.org/grpc/metadata"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/httpclient"
"github.com/grafana/tempo/pkg/tempopb"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
)
// logger is the package-level logger used by the gRPC client helpers in this file.
// It is constructed through backend.NewLoggerWith with the arguments "logger" and "tsdb.tempo".
var logger = backend.NewLoggerWith("logger", "tsdb.tempo")
// newGrpcClient creates a new gRPC client to connect to a streaming query service.
// This uses the default google.golang.org/grpc library. One caveat to that is that it does not allow passing the
// default httpClient to the gRPC client. This means that we cannot use the same middleware that we use for
// standard HTTP requests.
// Using another library such as connect-go isn't possible right now because Tempo uses a non-standard proto compiler
// which makes generating a different client difficult. See https://github.com/grafana/grafana/pull/81683
//
// The dial target is derived from settings.URL: the host part is used as-is when it carries a port, otherwise a
// default port is appended (80 for the "http" scheme, 443 for everything else). ctx, settings and opts are forwarded
// to getDialOpts to build the dial options.
//
// It returns an error when the URL cannot be parsed, when the dial options cannot be built, or when grpc.Dial fails.
func newGrpcClient(ctx context.Context, settings backend.DataSourceInstanceSettings, opts httpclient.Options) (tempopb.StreamingQuerierClient, error) {
	parsedURL, err := url.Parse(settings.URL)
	if err != nil {
		logger.Error("Error parsing URL for gRPC client", "error", err, "URL", settings.URL, "function", logEntrypoint())
		return nil, err
	}

	// Make sure we have some default port if none is set. This is required for gRPC to work.
	// URL.Port() is used instead of searching the host for ":" because a bracketed IPv6 literal such as "[::1]"
	// contains colons without carrying a port. A dangling colon ("example.com:") is dropped first so that the
	// default port is not appended after a second colon.
	target := strings.TrimSuffix(parsedURL.Host, ":")
	if parsedURL.Port() == "" {
		if parsedURL.Scheme == "http" {
			target += ":80"
		} else {
			target += ":443"
		}
	}

	dialOpts, err := getDialOpts(ctx, settings, opts)
	if err != nil {
		return nil, fmt.Errorf("error getting dial options: %w", err)
	}

	// grpc.Dial() is deprecated in favor of grpc.NewClient(), but grpc.NewClient() changed the default resolver to dns from passthrough.
	// This is a problem because the getDialOpts() function appends a custom dialer to the dial options to support Grafana Cloud PDC.
	//
	// See the following quote from the grpc package documentation:
	//     One subtle difference between NewClient and Dial and DialContext is that the
	//     former uses "dns" as the default name resolver, while the latter use
	//     "passthrough" for backward compatibility.  This distinction should not matter
	//     to most users, but could matter to legacy users that specify a custom dialer
	//     and expect it to receive the target string directly.
	// https://github.com/grpc/grpc-go/blob/fa274d77904729c2893111ac292048d56dcf0bb1/clientconn.go#L209
	//
	// Unfortunately, the passthrough resolver isn't exported by the grpc package, so we can't use it.
	// The options are to continue using grpc.Dial() or implement a custom resolver.
	// Since the go-grpc package maintainers intend to continue supporting grpc.Dial() through the 1.x series,
	// we'll continue using grpc.Dial() until we have a compelling reason or bandwidth to implement the custom resolver.
	// Reference: https://github.com/grpc/grpc-go/blob/f199062ef31ddda54152e1ca5e3d15fb63903dc3/clientconn.go#L204
	//
	// See this issue for more information: https://github.com/grpc/grpc-go/issues/7091
	// Ignore the lint check as this fails the build and for the reasons above.
	// nolint:staticcheck
	clientConn, err := grpc.Dial(target, dialOpts...)
	if err != nil {
		logger.Error("Error dialing gRPC client", "error", err, "URL", settings.URL, "function", logEntrypoint())
		return nil, err
	}

	logger.Debug("Instantiating new gRPC client")
	return tempopb.NewStreamingQuerierClient(clientConn), nil
}
// getDialOpts creates options and interceptors (middleware) this should roughly match what we do in
// http_client_provider.go for standard http requests.
//
// The returned options always contain the custom-headers stream interceptor and exactly one transport credentials
// option: TLS plus a per-RPC basic auth header when settings.BasicAuthEnabled is set, insecure credentials
// otherwise. When the secure socks proxy is enabled for the data source, a context dialer routing connections
// through that proxy is appended as well.
//
// It returns an error when basic auth is enabled without basic auth options, when the proxy client cannot be
// retrieved, or when the secure socks proxy dialer cannot be created.
func getDialOpts(ctx context.Context, settings backend.DataSourceInstanceSettings, opts httpclient.Options) ([]grpc.DialOption, error) {
	// TODO: Missing middleware TracingMiddleware, DataSourceMetricsMiddleware, ContextualMiddleware,
	//  ResponseLimitMiddleware RedirectLimitMiddleware.
	// Also User agent but that is set before each rpc call as for decoupled DS we have to get it from request context
	//  and cannot add it to client here.

	var dialOpts []grpc.DialOption

	dialOpts = append(dialOpts, grpc.WithChainStreamInterceptor(CustomHeadersStreamInterceptor(opts)))
	if settings.BasicAuthEnabled {
		// Guard against a nil pointer dereference: the flag lives on settings while the credentials live on opts,
		// so the two can disagree.
		if opts.BasicAuth == nil {
			return nil, fmt.Errorf("basic auth is enabled but no basic auth options were provided")
		}
		// If basic authentication is enabled, it uses TLS transport credentials and sets the basic authentication header for each RPC call.
		dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{})))
		dialOpts = append(dialOpts, grpc.WithPerRPCCredentials(&basicAuth{
			Header: basicHeaderForAuth(opts.BasicAuth.User, opts.BasicAuth.Password),
		}))
	} else {
		// Otherwise, it uses insecure credentials.
		dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))
	}

	// The following code is required to make gRPC work with Grafana Cloud PDC
	// (https://grafana.com/docs/grafana-cloud/connect-externally-hosted/private-data-source-connect/)
	proxyClient, err := settings.ProxyClient(ctx)
	if err != nil {
		return nil, fmt.Errorf("proxy client cannot be retrieved, it is not possible to check if secure socks proxy is enabled: %w", err)
	}
	if proxyClient.SecureSocksProxyEnabled() { // secure socks proxy is behind a feature flag
		dialer, err := proxyClient.NewSecureSocksProxyContextDialer()
		if err != nil {
			return nil, fmt.Errorf("failure in creating dialer: %w", err)
		}
		logger.Debug("gRPC dialer instantiated. Appending gRPC dialer to dial options")
		dialOpts = append(dialOpts, grpc.WithContextDialer(func(ctx context.Context, host string) (net.Conn, error) {
			logger.Debug("Dialing secure socks proxy", "host", host)
			conn, err := dialer.Dial("tcp", host)
			if err != nil {
				return nil, fmt.Errorf("not possible to dial secure socks proxy: %w", err)
			}

			// dialer.Dial does not take the context, so cancellation can only be observed after the fact.
			if ctxErr := ctx.Err(); ctxErr != nil {
				logger.Debug("Context canceled")
				// The caller ignores the connection when an error is returned, so close it here instead of
				// leaking it. The close error is irrelevant because the connection was never used.
				_ = conn.Close()
				return nil, fmt.Errorf("context canceled: %w", ctxErr)
			}
			return conn, nil
		}))
	}

	logger.Debug("Returning dial options")
	return dialOpts, nil
}
// CustomHeadersStreamInterceptor returns a stream client interceptor that copies every header value found in
// httpOpts.Header into the outgoing gRPC metadata of the call before handing over to the next streamer.
// It is meant to mirror the CustomHeadersMiddleware of the HTTP client provider.
func CustomHeadersStreamInterceptor(httpOpts httpclient.Options) grpc.StreamClientInterceptor {
	// withCustomHeaders appends one metadata entry per configured header value. Ranging over an empty (or nil)
	// header map does nothing, in which case the context is returned untouched.
	withCustomHeaders := func(ctx context.Context) context.Context {
		for name, values := range httpOpts.Header {
			for i := range values {
				ctx = metadata.AppendToOutgoingContext(ctx, name, values[i])
			}
		}
		return ctx
	}

	return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
		return streamer(withCustomHeaders(ctx), desc, cc, method, opts...)
	}
}
// basicAuth supplies a fixed Authorization header as per-RPC request metadata.
type basicAuth struct {
	// Header is the complete header value returned under the "Authorization" key.
	Header string
}

// GetRequestMetadata returns a single-entry map carrying the stored header value under the "Authorization" key.
// The context and URI arguments are not used and the returned error is always nil.
func (b *basicAuth) GetRequestMetadata(_ context.Context, _ ...string) (map[string]string, error) {
	md := make(map[string]string, 1)
	md["Authorization"] = b.Header
	return md, nil
}

// RequireTransportSecurity always reports true.
func (b *basicAuth) RequireTransportSecurity() bool {
	return true
}
// basicHeaderForAuth builds the value of a basic authentication header: the literal "Basic " followed by the
// standard base64 encoding of "username:password".
func basicHeaderForAuth(username, password string) string {
	userPass := fmt.Sprintf("%s:%s", username, password)
	encoded := base64.StdEncoding.EncodeToString([]byte(userPass))
	return "Basic " + encoded
}