mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
392 lines
11 KiB
Go
392 lines
11 KiB
Go
package tracing
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"math"
|
|
"net"
|
|
"net/http"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
jaegerpropagator "go.opentelemetry.io/contrib/propagators/jaeger"
|
|
"go.opentelemetry.io/contrib/samplers/jaegerremote"
|
|
"go.opentelemetry.io/otel"
|
|
"go.opentelemetry.io/otel/attribute"
|
|
"go.opentelemetry.io/otel/codes"
|
|
"go.opentelemetry.io/otel/exporters/jaeger"
|
|
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
|
|
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
|
|
"go.opentelemetry.io/otel/propagation"
|
|
"go.opentelemetry.io/otel/sdk/resource"
|
|
tracesdk "go.opentelemetry.io/otel/sdk/trace"
|
|
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
|
|
trace "go.opentelemetry.io/otel/trace"
|
|
"go.opentelemetry.io/otel/trace/noop"
|
|
|
|
"github.com/go-kit/log/level"
|
|
|
|
"github.com/grafana/grafana/pkg/apimachinery/errutil"
|
|
"github.com/grafana/grafana/pkg/infra/log"
|
|
)
|
|
|
|
const (
|
|
envJaegerAgentHost = "JAEGER_AGENT_HOST"
|
|
envJaegerAgentPort = "JAEGER_AGENT_PORT"
|
|
)
|
|
|
|
const (
|
|
jaegerExporter string = "jaeger"
|
|
otlpExporter string = "otlp"
|
|
noopExporter string = "noop"
|
|
|
|
jaegerPropagator string = "jaeger"
|
|
w3cPropagator string = "w3c"
|
|
)
|
|
|
|
type TracingService struct {
|
|
cfg *TracingConfig
|
|
log log.Logger
|
|
|
|
tracerProvider tracerProvider
|
|
trace.Tracer
|
|
}
|
|
|
|
type tracerProvider interface {
|
|
trace.TracerProvider
|
|
|
|
Shutdown(ctx context.Context) error
|
|
}
|
|
|
|
// Tracer defines the service used to create new spans.
|
|
type Tracer interface {
|
|
trace.Tracer
|
|
|
|
// Inject adds identifying information for the span to the
|
|
// headers defined in [http.Header] map (this mutates http.Header).
|
|
//
|
|
// Implementation quirk: Where OpenTelemetry is used, the [Span] is
|
|
// picked up from [context.Context] and for OpenTracing the
|
|
// information passed as [Span] is preferred.
|
|
// Both the context and span must be derived from the same call to
|
|
// [Tracer.Start].
|
|
Inject(context.Context, http.Header, trace.Span)
|
|
}
|
|
|
|
func ProvideService(tracingCfg *TracingConfig) (*TracingService, error) {
|
|
if tracingCfg == nil {
|
|
return nil, fmt.Errorf("tracingCfg cannot be nil")
|
|
}
|
|
|
|
log.RegisterContextualLogProvider(func(ctx context.Context) ([]any, bool) {
|
|
if traceID := TraceIDFromContext(ctx, false); traceID != "" {
|
|
return []any{"traceID", traceID}, true
|
|
}
|
|
|
|
return nil, false
|
|
})
|
|
|
|
ots := &TracingService{
|
|
cfg: tracingCfg,
|
|
log: log.New("tracing"),
|
|
}
|
|
|
|
if err := ots.initOpentelemetryTracer(); err != nil {
|
|
return nil, err
|
|
}
|
|
return ots, nil
|
|
}
|
|
|
|
func (ots *TracingService) GetTracerProvider() tracerProvider {
|
|
return ots.tracerProvider
|
|
}
|
|
|
|
func TraceIDFromContext(ctx context.Context, requireSampled bool) string {
|
|
spanCtx := trace.SpanContextFromContext(ctx)
|
|
if !spanCtx.HasTraceID() || !spanCtx.IsValid() || (requireSampled && !spanCtx.IsSampled()) {
|
|
return ""
|
|
}
|
|
|
|
return spanCtx.TraceID().String()
|
|
}
|
|
|
|
// Error sets the status to error and record the error as an exception in the provided span.
|
|
func Error(span trace.Span, err error) error {
|
|
attr := []attribute.KeyValue{}
|
|
grafanaErr := errutil.Error{}
|
|
if errors.As(err, &grafanaErr) {
|
|
attr = append(attr, attribute.String("message_id", grafanaErr.MessageID))
|
|
}
|
|
|
|
span.SetStatus(codes.Error, err.Error())
|
|
span.RecordError(err, trace.WithAttributes(attr...))
|
|
return err
|
|
}
|
|
|
|
// Errorf wraps fmt.Errorf and also sets the status to error and record the error as an exception in the provided span.
|
|
func Errorf(span trace.Span, format string, args ...any) error {
|
|
err := fmt.Errorf(format, args...)
|
|
return Error(span, err)
|
|
}
|
|
|
|
type noopTracerProvider struct {
|
|
trace.TracerProvider
|
|
}
|
|
|
|
func (noopTracerProvider) Shutdown(ctx context.Context) error {
|
|
return nil
|
|
}
|
|
|
|
func (ots *TracingService) initJaegerTracerProvider() (*tracesdk.TracerProvider, error) {
|
|
var ep jaeger.EndpointOption
|
|
// Create the Jaeger exporter: address can be either agent address (host:port) or collector URL
|
|
if strings.HasPrefix(ots.cfg.Address, "http://") || strings.HasPrefix(ots.cfg.Address, "https://") {
|
|
ots.log.Debug("using jaeger collector", "address", ots.cfg.Address)
|
|
ep = jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(ots.cfg.Address))
|
|
} else if host, port, err := net.SplitHostPort(ots.cfg.Address); err == nil {
|
|
ots.log.Debug("using jaeger agent", "host", host, "port", port)
|
|
ep = jaeger.WithAgentEndpoint(jaeger.WithAgentHost(host), jaeger.WithAgentPort(port), jaeger.WithMaxPacketSize(64000))
|
|
} else {
|
|
return nil, fmt.Errorf("invalid tracer address: %s", ots.cfg.Address)
|
|
}
|
|
exp, err := jaeger.New(ep)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
res, err := resource.New(
|
|
context.Background(),
|
|
resource.WithAttributes(
|
|
// TODO: why are these attributes different from ones added to the
|
|
// OTLP provider?
|
|
semconv.ServiceNameKey.String(ots.cfg.ServiceName),
|
|
attribute.String("environment", "production"),
|
|
),
|
|
resource.WithAttributes(ots.cfg.CustomAttribs...),
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
sampler, err := ots.initSampler()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
tp := tracesdk.NewTracerProvider(
|
|
tracesdk.WithBatcher(exp),
|
|
tracesdk.WithResource(res),
|
|
tracesdk.WithSampler(sampler),
|
|
)
|
|
|
|
return tp, nil
|
|
}
|
|
|
|
func (ots *TracingService) initOTLPTracerProvider() (*tracesdk.TracerProvider, error) {
|
|
client := otlptracegrpc.NewClient(otlptracegrpc.WithEndpoint(ots.cfg.Address), otlptracegrpc.WithInsecure())
|
|
exp, err := otlptrace.New(context.Background(), client)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
sampler, err := ots.initSampler()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return initTracerProvider(exp, ots.cfg.ServiceName, ots.cfg.ServiceVersion, sampler, ots.cfg.CustomAttribs...)
|
|
}
|
|
|
|
func (ots *TracingService) initSampler() (tracesdk.Sampler, error) {
|
|
switch ots.cfg.Sampler {
|
|
case "const", "":
|
|
if ots.cfg.SamplerParam >= 1 {
|
|
return tracesdk.AlwaysSample(), nil
|
|
} else if ots.cfg.SamplerParam <= 0 {
|
|
return tracesdk.NeverSample(), nil
|
|
}
|
|
|
|
return nil, fmt.Errorf("invalid param for const sampler - must be 0 or 1: %f", ots.cfg.SamplerParam)
|
|
case "probabilistic":
|
|
return tracesdk.TraceIDRatioBased(ots.cfg.SamplerParam), nil
|
|
case "rateLimiting":
|
|
return newRateLimiter(ots.cfg.SamplerParam), nil
|
|
case "remote":
|
|
return jaegerremote.New("grafana",
|
|
jaegerremote.WithSamplingServerURL(ots.cfg.SamplerRemoteURL),
|
|
jaegerremote.WithInitialSampler(tracesdk.TraceIDRatioBased(ots.cfg.SamplerParam)),
|
|
), nil
|
|
default:
|
|
return nil, fmt.Errorf("invalid sampler type: %s", ots.cfg.Sampler)
|
|
}
|
|
}
|
|
|
|
func initTracerProvider(exp tracesdk.SpanExporter, serviceName string, serviceVersion string, sampler tracesdk.Sampler, customAttribs ...attribute.KeyValue) (*tracesdk.TracerProvider, error) {
|
|
res, err := resource.New(
|
|
context.Background(),
|
|
resource.WithAttributes(
|
|
semconv.ServiceNameKey.String(serviceName),
|
|
semconv.ServiceVersionKey.String(serviceVersion),
|
|
),
|
|
resource.WithAttributes(customAttribs...),
|
|
resource.WithFromEnv(),
|
|
resource.WithProcessRuntimeDescription(),
|
|
resource.WithTelemetrySDK(),
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
tp := tracesdk.NewTracerProvider(
|
|
tracesdk.WithBatcher(exp),
|
|
tracesdk.WithSampler(tracesdk.ParentBased(sampler)),
|
|
tracesdk.WithResource(res),
|
|
)
|
|
return tp, nil
|
|
}
|
|
|
|
func (ots *TracingService) initNoopTracerProvider() (tracerProvider, error) {
|
|
return &noopTracerProvider{TracerProvider: noop.NewTracerProvider()}, nil
|
|
}
|
|
|
|
func (ots *TracingService) initOpentelemetryTracer() error {
|
|
var tp tracerProvider
|
|
var err error
|
|
switch ots.cfg.enabled {
|
|
case jaegerExporter:
|
|
tp, err = ots.initJaegerTracerProvider()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
case otlpExporter:
|
|
tp, err = ots.initOTLPTracerProvider()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
default:
|
|
tp, err = ots.initNoopTracerProvider()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if ots.cfg.ProfilingIntegration {
|
|
tp = NewProfilingTracerProvider(tp)
|
|
}
|
|
|
|
// Register our TracerProvider as the global so any imported
|
|
// instrumentation in the future will default to using it
|
|
// only if tracing is enabled
|
|
if ots.cfg.enabled != "" {
|
|
otel.SetTracerProvider(tp)
|
|
}
|
|
|
|
propagators := []propagation.TextMapPropagator{}
|
|
for _, p := range strings.Split(ots.cfg.Propagation, ",") {
|
|
switch p {
|
|
case w3cPropagator:
|
|
propagators = append(propagators, propagation.TraceContext{}, propagation.Baggage{})
|
|
case jaegerPropagator:
|
|
propagators = append(propagators, jaegerpropagator.Jaeger{})
|
|
case "":
|
|
default:
|
|
return fmt.Errorf("unsupported OpenTelemetry propagator: %q", p)
|
|
}
|
|
}
|
|
|
|
switch len(propagators) {
|
|
case 0:
|
|
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(
|
|
propagation.TraceContext{}, propagation.Baggage{},
|
|
))
|
|
case 1:
|
|
otel.SetTextMapPropagator(propagators[0])
|
|
default:
|
|
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagators...))
|
|
}
|
|
|
|
if ots.tracerProvider == nil {
|
|
ots.tracerProvider = tp
|
|
}
|
|
|
|
ots.Tracer = otel.GetTracerProvider().Tracer("component-main")
|
|
|
|
return nil
|
|
}
|
|
|
|
func (ots *TracingService) Run(ctx context.Context) error {
|
|
otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) {
|
|
err = level.Error(ots.log).Log("msg", "OpenTelemetry handler returned an error", "err", err)
|
|
if err != nil {
|
|
ots.log.Error("OpenTelemetry log returning error", err)
|
|
}
|
|
}))
|
|
<-ctx.Done()
|
|
|
|
ots.log.Info("Closing tracing")
|
|
if ots.tracerProvider == nil {
|
|
return nil
|
|
}
|
|
ctxShutdown, cancel := context.WithTimeout(ctx, time.Second*5)
|
|
defer cancel()
|
|
|
|
if err := ots.tracerProvider.Shutdown(ctxShutdown); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (ots *TracingService) Inject(ctx context.Context, header http.Header, _ trace.Span) {
|
|
otel.GetTextMapPropagator().Inject(ctx, propagation.HeaderCarrier(header))
|
|
}
|
|
|
|
func (ots *TracingService) OtelTracer() trace.Tracer {
|
|
return ots
|
|
}
|
|
|
|
type rateLimiter struct {
|
|
sync.Mutex
|
|
description string
|
|
rps float64
|
|
balance float64
|
|
maxBalance float64
|
|
lastTick time.Time
|
|
|
|
now func() time.Time
|
|
}
|
|
|
|
func newRateLimiter(rps float64) *rateLimiter {
|
|
return &rateLimiter{
|
|
rps: rps,
|
|
description: fmt.Sprintf("RateLimitingSampler{%g}", rps),
|
|
balance: math.Max(rps, 1),
|
|
maxBalance: math.Max(rps, 1),
|
|
lastTick: time.Now(),
|
|
now: time.Now,
|
|
}
|
|
}
|
|
|
|
func (rl *rateLimiter) ShouldSample(p tracesdk.SamplingParameters) tracesdk.SamplingResult {
|
|
rl.Lock()
|
|
defer rl.Unlock()
|
|
psc := trace.SpanContextFromContext(p.ParentContext)
|
|
if rl.balance >= 1 {
|
|
rl.balance -= 1
|
|
return tracesdk.SamplingResult{Decision: tracesdk.RecordAndSample, Tracestate: psc.TraceState()}
|
|
}
|
|
currentTime := rl.now()
|
|
elapsedTime := currentTime.Sub(rl.lastTick).Seconds()
|
|
rl.lastTick = currentTime
|
|
rl.balance = math.Min(rl.maxBalance, rl.balance+elapsedTime*rl.rps)
|
|
if rl.balance >= 1 {
|
|
rl.balance -= 1
|
|
return tracesdk.SamplingResult{Decision: tracesdk.RecordAndSample, Tracestate: psc.TraceState()}
|
|
}
|
|
return tracesdk.SamplingResult{Decision: tracesdk.Drop, Tracestate: psc.TraceState()}
|
|
}
|
|
|
|
func (rl *rateLimiter) Description() string { return rl.description }
|