mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Tempo: Add custom headers middleware for grpc client (#81693)
This commit is contained in:
parent
4f1f5636bb
commit
de4171862c
@ -8,6 +8,8 @@ import (
|
|||||||
"net/url"
|
"net/url"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
|
"google.golang.org/grpc/metadata"
|
||||||
|
|
||||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||||
"github.com/grafana/grafana-plugin-sdk-go/backend/httpclient"
|
"github.com/grafana/grafana-plugin-sdk-go/backend/httpclient"
|
||||||
"github.com/grafana/tempo/pkg/tempopb"
|
"github.com/grafana/tempo/pkg/tempopb"
|
||||||
@ -18,11 +20,12 @@ import (
|
|||||||
|
|
||||||
var logger = backend.NewLoggerWith("logger", "tsdb.tempo")
|
var logger = backend.NewLoggerWith("logger", "tsdb.tempo")
|
||||||
|
|
||||||
// This function creates a new gRPC client to connect to a streaming query service.
|
// newGrpcClient creates a new gRPC client to connect to a streaming query service.
|
||||||
// It starts by parsing the URL from the data source settings and extracting the host, since that's what the gRPC connection expects.
|
// This uses the default google.golang.org/grpc library. One caveat to that is that it does not allow passing the
|
||||||
// If the URL does not contain a port number, it adds a default port based on the scheme (80 for HTTP and 443 for HTTPS).
|
// default httpClient to the gRPC client. This means that we cannot use the same middleware that we use for
|
||||||
// If basic authentication is enabled, it uses TLS transport credentials and sets the basic authentication header for each RPC call.
|
// standard HTTP requests.
|
||||||
// Otherwise, it uses insecure credentials.
|
// Using other library like connect-go isn't possible right now because Tempo uses non-standard proto compiler which
|
||||||
|
// makes generating different client difficult. See https://github.com/grafana/grafana/pull/81683
|
||||||
func newGrpcClient(settings backend.DataSourceInstanceSettings, opts httpclient.Options) (tempopb.StreamingQuerierClient, error) {
|
func newGrpcClient(settings backend.DataSourceInstanceSettings, opts httpclient.Options) (tempopb.StreamingQuerierClient, error) {
|
||||||
parsedUrl, err := url.Parse(settings.URL)
|
parsedUrl, err := url.Parse(settings.URL)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -30,6 +33,7 @@ func newGrpcClient(settings backend.DataSourceInstanceSettings, opts httpclient.
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Make sure we have some default port if none is set. This is required for gRPC to work.
|
||||||
onlyHost := parsedUrl.Host
|
onlyHost := parsedUrl.Host
|
||||||
if !strings.Contains(onlyHost, ":") {
|
if !strings.Contains(onlyHost, ":") {
|
||||||
if parsedUrl.Scheme == "http" {
|
if parsedUrl.Scheme == "http" {
|
||||||
@ -39,17 +43,7 @@ func newGrpcClient(settings backend.DataSourceInstanceSettings, opts httpclient.
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var dialOps []grpc.DialOption
|
clientConn, err := grpc.Dial(onlyHost, getDialOpts(settings, opts)...)
|
||||||
if settings.BasicAuthEnabled {
|
|
||||||
dialOps = append(dialOps, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{})))
|
|
||||||
dialOps = append(dialOps, grpc.WithPerRPCCredentials(&basicAuth{
|
|
||||||
Header: basicHeaderForAuth(opts.BasicAuth.User, opts.BasicAuth.Password),
|
|
||||||
}))
|
|
||||||
} else {
|
|
||||||
dialOps = append(dialOps, grpc.WithTransportCredentials(insecure.NewCredentials()))
|
|
||||||
}
|
|
||||||
|
|
||||||
clientConn, err := grpc.Dial(onlyHost, dialOps...)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error("Error dialing gRPC client", "error", err, "URL", settings.URL, "function", logEntrypoint())
|
logger.Error("Error dialing gRPC client", "error", err, "URL", settings.URL, "function", logEntrypoint())
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -58,6 +52,45 @@ func newGrpcClient(settings backend.DataSourceInstanceSettings, opts httpclient.
|
|||||||
return tempopb.NewStreamingQuerierClient(clientConn), nil
|
return tempopb.NewStreamingQuerierClient(clientConn), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// getDialOpts creates options and interceptors (middleware) this should roughly match what we do in
|
||||||
|
// http_client_provider.go for standard http requests.
|
||||||
|
func getDialOpts(settings backend.DataSourceInstanceSettings, opts httpclient.Options) []grpc.DialOption {
|
||||||
|
// TODO: Missing middleware TracingMiddleware, DataSourceMetricsMiddleware, ContextualMiddleware,
|
||||||
|
// ResponseLimitMiddleware RedirectLimitMiddleware.
|
||||||
|
// Also User agent but that is set before each rpc call as for decoupled DS we have to get it from request context
|
||||||
|
// and cannot add it to client here.
|
||||||
|
|
||||||
|
var dialOps []grpc.DialOption
|
||||||
|
|
||||||
|
dialOps = append(dialOps, grpc.WithChainStreamInterceptor(CustomHeadersStreamInterceptor(opts)))
|
||||||
|
if settings.BasicAuthEnabled {
|
||||||
|
// If basic authentication is enabled, it uses TLS transport credentials and sets the basic authentication header for each RPC call.
|
||||||
|
dialOps = append(dialOps, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{})))
|
||||||
|
dialOps = append(dialOps, grpc.WithPerRPCCredentials(&basicAuth{
|
||||||
|
Header: basicHeaderForAuth(opts.BasicAuth.User, opts.BasicAuth.Password),
|
||||||
|
}))
|
||||||
|
} else {
|
||||||
|
// Otherwise, it uses insecure credentials.
|
||||||
|
dialOps = append(dialOps, grpc.WithTransportCredentials(insecure.NewCredentials()))
|
||||||
|
}
|
||||||
|
|
||||||
|
return dialOps
|
||||||
|
}
|
||||||
|
|
||||||
|
// CustomHeadersStreamInterceptor adds custom headers to the outgoing context for each RPC call. Should work similar
|
||||||
|
// to the CustomHeadersMiddleware in the HTTP client provider.
|
||||||
|
func CustomHeadersStreamInterceptor(httpOpts httpclient.Options) grpc.StreamClientInterceptor {
|
||||||
|
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
|
||||||
|
if len(httpOpts.Headers) != 0 {
|
||||||
|
for key, value := range httpOpts.Headers {
|
||||||
|
ctx = metadata.AppendToOutgoingContext(ctx, key, value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return streamer(ctx, desc, cc, method, opts...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
type basicAuth struct {
|
type basicAuth struct {
|
||||||
Header string
|
Header string
|
||||||
}
|
}
|
||||||
|
@ -7,6 +7,8 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
|
||||||
|
"google.golang.org/grpc/metadata"
|
||||||
|
|
||||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||||
"github.com/grafana/grafana-plugin-sdk-go/backend/tracing"
|
"github.com/grafana/grafana-plugin-sdk-go/backend/tracing"
|
||||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||||
@ -60,6 +62,11 @@ func (s *Service) runSearchStream(ctx context.Context, req *backend.RunStreamReq
|
|||||||
sr.Start = uint32(backendQuery.TimeRange.From.Unix())
|
sr.Start = uint32(backendQuery.TimeRange.From.Unix())
|
||||||
sr.End = uint32(backendQuery.TimeRange.To.Unix())
|
sr.End = uint32(backendQuery.TimeRange.To.Unix())
|
||||||
|
|
||||||
|
// Setting the user agent for the gRPC call. When DS is decoupled we don't recreate instance when grafana config
|
||||||
|
// changes or updates, so we have to get it from context.
|
||||||
|
// Ideally this would be pushed higher, so it's set once for all rpc calls, but we have only one now.
|
||||||
|
ctx = metadata.AppendToOutgoingContext(ctx, "User-Agent", backend.UserAgentFromContext(ctx).String())
|
||||||
|
|
||||||
stream, err := datasource.StreamingClient.Search(ctx, sr)
|
stream, err := datasource.StreamingClient.Search(ctx, sr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
span.RecordError(err)
|
span.RecordError(err)
|
||||||
|
Loading…
Reference in New Issue
Block a user