package tracing import ( "context" "fmt" "math" "net" "net/http" "strings" "sync" "time" jaegerpropagator "go.opentelemetry.io/contrib/propagators/jaeger" "go.opentelemetry.io/contrib/samplers/jaegerremote" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/exporters/jaeger" "go.opentelemetry.io/otel/exporters/otlp/otlptrace" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" "go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/sdk/resource" tracesdk "go.opentelemetry.io/otel/sdk/trace" semconv "go.opentelemetry.io/otel/semconv/v1.17.0" trace "go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace/noop" "github.com/go-kit/log/level" "github.com/grafana/grafana/pkg/infra/log" ) const ( envJaegerAgentHost = "JAEGER_AGENT_HOST" envJaegerAgentPort = "JAEGER_AGENT_PORT" ) const ( jaegerExporter string = "jaeger" otlpExporter string = "otlp" noopExporter string = "noop" jaegerPropagator string = "jaeger" w3cPropagator string = "w3c" ) type TracingService struct { cfg *TracingConfig log log.Logger tracerProvider tracerProvider trace.Tracer } type tracerProvider interface { trace.TracerProvider Shutdown(ctx context.Context) error } // Tracer defines the service used to create new spans. type Tracer interface { trace.Tracer // Inject adds identifying information for the span to the // headers defined in [http.Header] map (this mutates http.Header). // // Implementation quirk: Where OpenTelemetry is used, the [Span] is // picked up from [context.Context] and for OpenTracing the // information passed as [Span] is preferred. // Both the context and span must be derived from the same call to // [Tracer.Start]. Inject(context.Context, http.Header, trace.Span) } func ProvideService(tracingCfg *TracingConfig) (*TracingService, error) { if tracingCfg == nil { return nil, fmt.Errorf("tracingCfg cannot be nil") } log.RegisterContextualLogProvider(func(ctx context.Context) ([]any, bool) { if traceID := TraceIDFromContext(ctx, false); traceID != "" { return []any{"traceID", traceID}, true } return nil, false }) ots := &TracingService{ cfg: tracingCfg, log: log.New("tracing"), } if err := ots.initOpentelemetryTracer(); err != nil { return nil, err } return ots, nil } func (ots *TracingService) GetTracerProvider() tracerProvider { return ots.tracerProvider } func TraceIDFromContext(ctx context.Context, requireSampled bool) string { spanCtx := trace.SpanContextFromContext(ctx) if !spanCtx.HasTraceID() || !spanCtx.IsValid() || (requireSampled && !spanCtx.IsSampled()) { return "" } return spanCtx.TraceID().String() } type noopTracerProvider struct { trace.TracerProvider } func (noopTracerProvider) Shutdown(ctx context.Context) error { return nil } func (ots *TracingService) initJaegerTracerProvider() (*tracesdk.TracerProvider, error) { var ep jaeger.EndpointOption // Create the Jaeger exporter: address can be either agent address (host:port) or collector URL if strings.HasPrefix(ots.cfg.Address, "http://") || strings.HasPrefix(ots.cfg.Address, "https://") { ots.log.Debug("using jaeger collector", "address", ots.cfg.Address) ep = jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(ots.cfg.Address)) } else if host, port, err := net.SplitHostPort(ots.cfg.Address); err == nil { ots.log.Debug("using jaeger agent", "host", host, "port", port) ep = jaeger.WithAgentEndpoint(jaeger.WithAgentHost(host), jaeger.WithAgentPort(port), jaeger.WithMaxPacketSize(64000)) } else { return nil, fmt.Errorf("invalid tracer address: %s", ots.cfg.Address) } exp, err := jaeger.New(ep) if err != nil { return nil, err } res, err := resource.New( context.Background(), resource.WithAttributes( // TODO: why are these attributes different from ones added to the // OTLP provider? semconv.ServiceNameKey.String(ots.cfg.ServiceName), attribute.String("environment", "production"), ), resource.WithAttributes(ots.cfg.CustomAttribs...), ) if err != nil { return nil, err } sampler, err := ots.initSampler() if err != nil { return nil, err } tp := tracesdk.NewTracerProvider( tracesdk.WithBatcher(exp), tracesdk.WithResource(res), tracesdk.WithSampler(sampler), ) return tp, nil } func (ots *TracingService) initOTLPTracerProvider() (*tracesdk.TracerProvider, error) { client := otlptracegrpc.NewClient(otlptracegrpc.WithEndpoint(ots.cfg.Address), otlptracegrpc.WithInsecure()) exp, err := otlptrace.New(context.Background(), client) if err != nil { return nil, err } sampler, err := ots.initSampler() if err != nil { return nil, err } return initTracerProvider(exp, ots.cfg.ServiceName, ots.cfg.ServiceVersion, sampler, ots.cfg.CustomAttribs...) } func (ots *TracingService) initSampler() (tracesdk.Sampler, error) { switch ots.cfg.Sampler { case "const", "": if ots.cfg.SamplerParam >= 1 { return tracesdk.AlwaysSample(), nil } else if ots.cfg.SamplerParam <= 0 { return tracesdk.NeverSample(), nil } return nil, fmt.Errorf("invalid param for const sampler - must be 0 or 1: %f", ots.cfg.SamplerParam) case "probabilistic": return tracesdk.TraceIDRatioBased(ots.cfg.SamplerParam), nil case "rateLimiting": return newRateLimiter(ots.cfg.SamplerParam), nil case "remote": return jaegerremote.New("grafana", jaegerremote.WithSamplingServerURL(ots.cfg.SamplerRemoteURL), jaegerremote.WithInitialSampler(tracesdk.TraceIDRatioBased(ots.cfg.SamplerParam)), ), nil default: return nil, fmt.Errorf("invalid sampler type: %s", ots.cfg.Sampler) } } func initTracerProvider(exp tracesdk.SpanExporter, serviceName string, serviceVersion string, sampler tracesdk.Sampler, customAttribs ...attribute.KeyValue) (*tracesdk.TracerProvider, error) { res, err := resource.New( context.Background(), resource.WithAttributes( semconv.ServiceNameKey.String(serviceName), semconv.ServiceVersionKey.String(serviceVersion), ), resource.WithAttributes(customAttribs...), resource.WithFromEnv(), resource.WithProcessRuntimeDescription(), resource.WithTelemetrySDK(), ) if err != nil { return nil, err } tp := tracesdk.NewTracerProvider( tracesdk.WithBatcher(exp), tracesdk.WithSampler(tracesdk.ParentBased(sampler)), tracesdk.WithResource(res), ) return tp, nil } func (ots *TracingService) initNoopTracerProvider() (tracerProvider, error) { return &noopTracerProvider{TracerProvider: noop.NewTracerProvider()}, nil } func (ots *TracingService) initOpentelemetryTracer() error { var tp tracerProvider var err error switch ots.cfg.enabled { case jaegerExporter: tp, err = ots.initJaegerTracerProvider() if err != nil { return err } case otlpExporter: tp, err = ots.initOTLPTracerProvider() if err != nil { return err } default: tp, err = ots.initNoopTracerProvider() if err != nil { return err } } // Register our TracerProvider as the global so any imported // instrumentation in the future will default to using it // only if tracing is enabled if ots.cfg.enabled != "" { otel.SetTracerProvider(tp) } propagators := []propagation.TextMapPropagator{} for _, p := range strings.Split(ots.cfg.Propagation, ",") { switch p { case w3cPropagator: propagators = append(propagators, propagation.TraceContext{}, propagation.Baggage{}) case jaegerPropagator: propagators = append(propagators, jaegerpropagator.Jaeger{}) case "": default: return fmt.Errorf("unsupported OpenTelemetry propagator: %q", p) } } switch len(propagators) { case 0: otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator( propagation.TraceContext{}, propagation.Baggage{}, )) case 1: otel.SetTextMapPropagator(propagators[0]) default: otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagators...)) } if ots.tracerProvider == nil { ots.tracerProvider = tp } ots.Tracer = otel.GetTracerProvider().Tracer("component-main") return nil } func (ots *TracingService) Run(ctx context.Context) error { otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) { err = level.Error(ots.log).Log("msg", "OpenTelemetry handler returned an error", "err", err) if err != nil { ots.log.Error("OpenTelemetry log returning error", err) } })) <-ctx.Done() ots.log.Info("Closing tracing") if ots.tracerProvider == nil { return nil } ctxShutdown, cancel := context.WithTimeout(ctx, time.Second*5) defer cancel() if err := ots.tracerProvider.Shutdown(ctxShutdown); err != nil { return err } return nil } func (ots *TracingService) Inject(ctx context.Context, header http.Header, _ trace.Span) { otel.GetTextMapPropagator().Inject(ctx, propagation.HeaderCarrier(header)) } func (ots *TracingService) OtelTracer() trace.Tracer { return ots } type rateLimiter struct { sync.Mutex description string rps float64 balance float64 maxBalance float64 lastTick time.Time now func() time.Time } func newRateLimiter(rps float64) *rateLimiter { return &rateLimiter{ rps: rps, description: fmt.Sprintf("RateLimitingSampler{%g}", rps), balance: math.Max(rps, 1), maxBalance: math.Max(rps, 1), lastTick: time.Now(), now: time.Now, } } func (rl *rateLimiter) ShouldSample(p tracesdk.SamplingParameters) tracesdk.SamplingResult { rl.Lock() defer rl.Unlock() psc := trace.SpanContextFromContext(p.ParentContext) if rl.balance >= 1 { rl.balance -= 1 return tracesdk.SamplingResult{Decision: tracesdk.RecordAndSample, Tracestate: psc.TraceState()} } currentTime := rl.now() elapsedTime := currentTime.Sub(rl.lastTick).Seconds() rl.lastTick = currentTime rl.balance = math.Min(rl.maxBalance, rl.balance+elapsedTime*rl.rps) if rl.balance >= 1 { rl.balance -= 1 return tracesdk.SamplingResult{Decision: tracesdk.RecordAndSample, Tracestate: psc.TraceState()} } return tracesdk.SamplingResult{Decision: tracesdk.Drop, Tracestate: psc.TraceState()} } func (rl *rateLimiter) Description() string { return rl.description }