diff --git a/go.mod b/go.mod index 5ec41b905c9..6a91e3d4a8a 100644 --- a/go.mod +++ b/go.mod @@ -81,7 +81,7 @@ require ( github.com/mattn/go-sqlite3 v1.14.16 github.com/matttproud/golang_protobuf_extensions v1.0.4 github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f - github.com/opentracing/opentracing-go v1.2.0 + github.com/opentracing/opentracing-go v1.2.0 // indirect github.com/patrickmn/go-cache v2.1.0+incompatible github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 // indirect github.com/pkg/errors v0.9.1 @@ -95,7 +95,7 @@ require ( github.com/stretchr/testify v1.8.2 github.com/teris-io/shortid v0.0.0-20171029131806-771a37caa5cf github.com/ua-parser/uap-go v0.0.0-20211112212520-00c877edfe0f - github.com/uber/jaeger-client-go v2.29.1+incompatible + github.com/uber/jaeger-client-go v2.29.1+incompatible // indirect github.com/urfave/cli/v2 v2.24.4 github.com/vectordotdev/go-datemath v0.1.1-0.20220323213446-f3954d0b18ae github.com/yalue/merged_fs v1.2.2 @@ -271,6 +271,7 @@ require ( github.com/redis/go-redis/v9 v9.0.2 github.com/weaveworks/common v0.0.0-20230208133027-16871410fca4 github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f + go.opentelemetry.io/contrib/samplers/jaegerremote v0.9.0 ) require ( diff --git a/go.sum b/go.sum index 76dbc1af720..cc84fbc5f8a 100644 --- a/go.sum +++ b/go.sum @@ -2428,6 +2428,8 @@ go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0. go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.21.0/go.mod h1:JQAtechjxLEL81EjmbRwxBq/XEzGaHcsPuDHAx54hg4= go.opentelemetry.io/contrib/propagators/jaeger v1.15.0 h1:xdJjwy5t/8I+TZehMMQ+r2h50HREihH2oMUhimQ+jug= go.opentelemetry.io/contrib/propagators/jaeger v1.15.0/go.mod h1:tU0nwW4QTvKceNUP60/PQm0FI8zDSwey7gIFt3RR/yw= +go.opentelemetry.io/contrib/samplers/jaegerremote v0.9.0 h1:zRi6a8uX+cJGTPLXRPjFEzN27a26k5R7LiLK87ntXgg= +go.opentelemetry.io/contrib/samplers/jaegerremote v0.9.0/go.mod h1:pzJOLTppaPbiPjPZEwGRf0VWx6G07hhOqznjKXIMkEk= go.opentelemetry.io/contrib/zpages v0.0.0-20210722161726-7668016acb73/go.mod h1:NAkejuYm41lpyL43Fu1XdnCOYxN5NVV80/MJ03JQ/X8= go.opentelemetry.io/otel v1.0.0-RC1/go.mod h1:x9tRa9HK4hSSq7jf2TKbqFbtt58/TGk0f9XiEYISI1I= go.opentelemetry.io/otel v1.0.0/go.mod h1:AjRVh9A5/5DE7S+mZtTR6t8vpKKryam+0lREnfmS4cg= diff --git a/pkg/infra/tracing/opentelemetry_tracing.go b/pkg/infra/tracing/opentelemetry_tracing.go deleted file mode 100644 index 8535233b8eb..00000000000 --- a/pkg/infra/tracing/opentelemetry_tracing.go +++ /dev/null @@ -1,338 +0,0 @@ -package tracing - -import ( - "context" - "fmt" - "net/http" - "strings" - "time" - - "github.com/go-kit/log/level" - "go.etcd.io/etcd/api/v3/version" - jaegerpropagator "go.opentelemetry.io/contrib/propagators/jaeger" - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/codes" - "go.opentelemetry.io/otel/exporters/jaeger" - "go.opentelemetry.io/otel/exporters/otlp/otlptrace" - "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" - "go.opentelemetry.io/otel/propagation" - "go.opentelemetry.io/otel/sdk/resource" - tracesdk "go.opentelemetry.io/otel/sdk/trace" - semconv "go.opentelemetry.io/otel/semconv/v1.4.0" - trace "go.opentelemetry.io/otel/trace" - - "github.com/grafana/grafana/pkg/infra/log" - "github.com/grafana/grafana/pkg/setting" -) - -const ( - jaegerExporter string = "jaeger" - otlpExporter string = "otlp" - noopExporter string = "noop" - - jaegerPropagator string = "jaeger" - w3cPropagator string = "w3c" -) - -type Opentelemetry struct { - Enabled string - Address string - Propagation string - customAttribs []attribute.KeyValue - - log log.Logger - - tracerProvider tracerProvider - tracer trace.Tracer - - Cfg *setting.Cfg -} - -type tracerProvider interface { - trace.TracerProvider - - Shutdown(ctx context.Context) error -} - -type OpentelemetrySpan struct { - span trace.Span -} - -type EventValue struct { - Str string - Num int64 -} - -type otelErrHandler func(err error) - -func (o otelErrHandler) Handle(err error) { - o(err) -} - -type noopTracerProvider struct { - trace.TracerProvider -} - -func (noopTracerProvider) Shutdown(ctx context.Context) error { - return nil -} - -func (ots *Opentelemetry) parseSettingsOpentelemetry() error { - section := ots.Cfg.Raw.Section("tracing.opentelemetry") - var err error - ots.customAttribs, err = splitCustomAttribs(section.Key("custom_attributes").MustString("")) - if err != nil { - return err - } - - section = ots.Cfg.Raw.Section("tracing.opentelemetry.jaeger") - ots.Enabled = noopExporter - - ots.Address = section.Key("address").MustString("") - ots.Propagation = section.Key("propagation").MustString("") - if ots.Address != "" { - ots.Enabled = jaegerExporter - return nil - } - - section = ots.Cfg.Raw.Section("tracing.opentelemetry.otlp") - ots.Address = section.Key("address").MustString("") - if ots.Address != "" { - ots.Enabled = otlpExporter - } - ots.Propagation = section.Key("propagation").MustString("") - return nil -} - -func (ots *Opentelemetry) OTelExporterEnabled() bool { - return ots.Enabled == otlpExporter -} - -func splitCustomAttribs(s string) ([]attribute.KeyValue, error) { - res := []attribute.KeyValue{} - - attribs := strings.Split(s, ",") - for _, v := range attribs { - parts := strings.SplitN(v, ":", 2) - if len(parts) > 1 { - res = append(res, attribute.String(parts[0], parts[1])) - } else if v != "" { - return nil, fmt.Errorf("custom attribute malformed - must be in 'key:value' form: %q", v) - } - } - - return res, nil -} - -func (ots *Opentelemetry) initJaegerTracerProvider() (*tracesdk.TracerProvider, error) { - // Create the Jaeger exporter - exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(ots.Address))) - if err != nil { - return nil, err - } - - res, err := resource.New( - context.Background(), - resource.WithAttributes( - // TODO: why are these attributes different from ones added to the - // OTLP provider? - semconv.ServiceNameKey.String("grafana"), - attribute.String("environment", "production"), - ), - resource.WithAttributes(ots.customAttribs...), - ) - if err != nil { - return nil, err - } - - tp := tracesdk.NewTracerProvider( - tracesdk.WithBatcher(exp), - tracesdk.WithResource(res), - ) - - return tp, nil -} - -func (ots *Opentelemetry) initOTLPTracerProvider() (*tracesdk.TracerProvider, error) { - client := otlptracegrpc.NewClient(otlptracegrpc.WithEndpoint(ots.Address), otlptracegrpc.WithInsecure()) - exp, err := otlptrace.New(context.Background(), client) - if err != nil { - return nil, err - } - - return initTracerProvider(exp, ots.customAttribs...) -} - -func initTracerProvider(exp tracesdk.SpanExporter, customAttribs ...attribute.KeyValue) (*tracesdk.TracerProvider, error) { - res, err := resource.New( - context.Background(), - resource.WithAttributes( - semconv.ServiceNameKey.String("grafana"), - semconv.ServiceVersionKey.String(version.Version), - ), - resource.WithAttributes(customAttribs...), - resource.WithProcessRuntimeDescription(), - resource.WithTelemetrySDK(), - ) - if err != nil { - return nil, err - } - - tp := tracesdk.NewTracerProvider( - tracesdk.WithBatcher(exp), - tracesdk.WithSampler(tracesdk.ParentBased( - tracesdk.AlwaysSample(), - )), - tracesdk.WithResource(res), - ) - return tp, nil -} - -func (ots *Opentelemetry) initNoopTracerProvider() (tracerProvider, error) { - return &noopTracerProvider{TracerProvider: trace.NewNoopTracerProvider()}, nil -} - -func (ots *Opentelemetry) initOpentelemetryTracer() error { - var tp tracerProvider - var err error - switch ots.Enabled { - case jaegerExporter: - tp, err = ots.initJaegerTracerProvider() - if err != nil { - return err - } - case otlpExporter: - tp, err = ots.initOTLPTracerProvider() - if err != nil { - return err - } - default: - tp, err = ots.initNoopTracerProvider() - if err != nil { - return err - } - } - - // Register our TracerProvider as the global so any imported - // instrumentation in the future will default to using it - // only if tracing is enabled - if ots.Enabled != "" { - otel.SetTracerProvider(tp) - } - - propagators := []propagation.TextMapPropagator{} - for _, p := range strings.Split(ots.Propagation, ",") { - switch p { - case w3cPropagator: - propagators = append(propagators, propagation.TraceContext{}, propagation.Baggage{}) - case jaegerPropagator: - propagators = append(propagators, jaegerpropagator.Jaeger{}) - case "": - default: - return fmt.Errorf("unsupported OpenTelemetry propagator: %q", p) - } - } - - switch len(propagators) { - case 0: - otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator( - propagation.TraceContext{}, propagation.Baggage{}, - )) - case 1: - otel.SetTextMapPropagator(propagators[0]) - default: - otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagators...)) - } - - if ots.tracerProvider == nil { - ots.tracerProvider = tp - } - - ots.tracer = otel.GetTracerProvider().Tracer("component-main") - - return nil -} - -func (ots *Opentelemetry) Run(ctx context.Context) error { - otel.SetErrorHandler(otelErrHandler(func(err error) { - err = level.Error(ots.log).Log("msg", "OpenTelemetry handler returned an error", "err", err) - if err != nil { - ots.log.Error("OpenTelemetry log returning error", err) - } - })) - <-ctx.Done() - - ots.log.Info("Closing tracing") - if ots.tracerProvider == nil { - return nil - } - ctxShutdown, cancel := context.WithTimeout(ctx, time.Second*5) - defer cancel() - - if err := ots.tracerProvider.Shutdown(ctxShutdown); err != nil { - return err - } - - return nil -} - -func (ots *Opentelemetry) Start(ctx context.Context, spanName string, opts ...trace.SpanStartOption) (context.Context, Span) { - ctx, span := ots.tracer.Start(ctx, spanName, opts...) - opentelemetrySpan := OpentelemetrySpan{ - span: span, - } - - if traceID := span.SpanContext().TraceID(); traceID.IsValid() { - ctx = context.WithValue(ctx, traceKey{}, traceValue{traceID.String(), span.SpanContext().IsSampled()}) - } - - return ctx, opentelemetrySpan -} - -func (ots *Opentelemetry) Inject(ctx context.Context, header http.Header, _ Span) { - otel.GetTextMapPropagator().Inject(ctx, propagation.HeaderCarrier(header)) -} - -func (s OpentelemetrySpan) End() { - s.span.End() -} - -func (s OpentelemetrySpan) SetAttributes(key string, value interface{}, kv attribute.KeyValue) { - s.span.SetAttributes(kv) -} - -func (s OpentelemetrySpan) SetName(name string) { - s.span.SetName(name) -} - -func (s OpentelemetrySpan) SetStatus(code codes.Code, description string) { - s.span.SetStatus(code, description) -} - -func (s OpentelemetrySpan) RecordError(err error, options ...trace.EventOption) { - s.span.RecordError(err, options...) -} - -func (s OpentelemetrySpan) AddEvents(keys []string, values []EventValue) { - for i, v := range values { - if v.Str != "" { - s.span.AddEvent(keys[i], trace.WithAttributes(attribute.Key(keys[i]).String(v.Str))) - } - if v.Num != 0 { - s.span.AddEvent(keys[i], trace.WithAttributes(attribute.Key(keys[i]).Int64(v.Num))) - } - } -} - -func (s OpentelemetrySpan) contextWithSpan(ctx context.Context) context.Context { - if s.span != nil { - ctx = trace.ContextWithSpan(ctx, s.span) - // Grafana also manages its own separate traceID in the context in addition to what opentracing handles. - // It's derived from the span. Ensure that we propagate this too. - if traceID := s.span.SpanContext().TraceID(); traceID.IsValid() { - ctx = context.WithValue(ctx, traceKey{}, traceValue{traceID.String(), s.span.SpanContext().IsSampled()}) - } - } - return ctx -} diff --git a/pkg/infra/tracing/optentelemetry_tracing_test.go b/pkg/infra/tracing/optentelemetry_tracing_test.go deleted file mode 100644 index 292025e300e..00000000000 --- a/pkg/infra/tracing/optentelemetry_tracing_test.go +++ /dev/null @@ -1,89 +0,0 @@ -package tracing - -import ( - "testing" - - "github.com/stretchr/testify/assert" - "go.opentelemetry.io/otel/attribute" - - "github.com/grafana/grafana/pkg/setting" -) - -func TestSplitCustomAttribs(t *testing.T) { - tests := []struct { - input string - expected []attribute.KeyValue - }{ - { - input: "key1:value:1", - expected: []attribute.KeyValue{attribute.String("key1", "value:1")}, - }, - { - input: "key1:value1,key2:value2", - expected: []attribute.KeyValue{ - attribute.String("key1", "value1"), - attribute.String("key2", "value2"), - }, - }, - { - input: "", - expected: []attribute.KeyValue{}, - }, - } - - for _, test := range tests { - attribs, err := splitCustomAttribs(test.input) - assert.NoError(t, err) - assert.EqualValues(t, test.expected, attribs) - } -} - -func TestSplitCustomAttribs_Malformed(t *testing.T) { - tests := []struct { - input string - expected []attribute.KeyValue - }{ - {input: "key1=value1"}, - {input: "key1"}, - } - - for _, test := range tests { - _, err := splitCustomAttribs(test.input) - assert.Error(t, err) - } -} - -func TestOptentelemetry_ParseSettingsOpentelemetry(t *testing.T) { - cfg := setting.NewCfg() - otel := &Opentelemetry{Cfg: cfg} - - otelsect := cfg.Raw.Section("tracing.opentelemetry") - jaegersect := cfg.Raw.Section("tracing.opentelemetry.jaeger") - otlpsect := cfg.Raw.Section("tracing.opentelemetry.otlp") - - assert.NoError(t, otel.parseSettingsOpentelemetry()) - assert.Equal(t, noopExporter, otel.Enabled) - - otelsect.Key("custom_attributes") - assert.NoError(t, otel.parseSettingsOpentelemetry()) - assert.Empty(t, otel.customAttribs) - - otelsect.Key("custom_attributes").SetValue("key1:value1,key2:value2") - assert.NoError(t, otel.parseSettingsOpentelemetry()) - expected := []attribute.KeyValue{ - attribute.String("key1", "value1"), - attribute.String("key2", "value2"), - } - assert.Equal(t, expected, otel.customAttribs) - - jaegersect.Key("address").SetValue("somehost:6831") - assert.NoError(t, otel.parseSettingsOpentelemetry()) - assert.Equal(t, "somehost:6831", otel.Address) - assert.Equal(t, jaegerExporter, otel.Enabled) - - jaegersect.Key("address").SetValue("") - otlpsect.Key("address").SetValue("somehost:4317") - assert.NoError(t, otel.parseSettingsOpentelemetry()) - assert.Equal(t, "somehost:4317", otel.Address) - assert.Equal(t, otlpExporter, otel.Enabled) -} diff --git a/pkg/infra/tracing/tracing.go b/pkg/infra/tracing/tracing.go index 145f540b1a4..ed12e9f7ba5 100644 --- a/pkg/infra/tracing/tracing.go +++ b/pkg/infra/tracing/tracing.go @@ -3,22 +3,30 @@ package tracing import ( "context" "fmt" - "io" + "math" + "net" "net/http" "os" "strings" + "sync" + "time" - opentracing "github.com/opentracing/opentracing-go" - "github.com/opentracing/opentracing-go/ext" - ol "github.com/opentracing/opentracing-go/log" - "github.com/uber/jaeger-client-go" - jaegercfg "github.com/uber/jaeger-client-go/config" - "github.com/uber/jaeger-client-go/zipkin" + "go.etcd.io/etcd/api/v3/version" + jaegerpropagator "go.opentelemetry.io/contrib/propagators/jaeger" + "go.opentelemetry.io/contrib/samplers/jaegerremote" + "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/exporters/jaeger" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/sdk/resource" + tracesdk "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.4.0" trace "go.opentelemetry.io/otel/trace" - "github.com/grafana/grafana/pkg/cmd/grafana-cli/logger" + "github.com/go-kit/log/level" "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/setting" ) @@ -28,6 +36,48 @@ const ( envJaegerAgentPort = "JAEGER_AGENT_PORT" ) +const ( + jaegerExporter string = "jaeger" + otlpExporter string = "otlp" + noopExporter string = "noop" + + jaegerPropagator string = "jaeger" + w3cPropagator string = "w3c" +) + +type Opentelemetry struct { + enabled string + Address string + Propagation string + customAttribs []attribute.KeyValue + + sampler string + samplerParam float64 + samplerRemoteURL string + + log log.Logger + + tracerProvider tracerProvider + tracer trace.Tracer + + Cfg *setting.Cfg +} + +type tracerProvider interface { + trace.TracerProvider + + Shutdown(ctx context.Context) error +} + +type OpentelemetrySpan struct { + span trace.Span +} + +type EventValue struct { + Str string + Num int64 +} + // Tracer defines the service used to create new spans. type Tracer interface { // Run implements registry.BackgroundService. @@ -83,7 +133,7 @@ type Span interface { } func ProvideService(cfg *setting.Cfg) (Tracer, error) { - ts, ots, err := parseSettings(cfg) + ots, err := ParseSettings(cfg) if err != nil { return nil, err } @@ -95,43 +145,18 @@ func ProvideService(cfg *setting.Cfg) (Tracer, error) { return nil, false }) - - if ts.enabled { - return ts, ts.initJaegerGlobalTracer() + if err := ots.initOpentelemetryTracer(); err != nil { + return nil, err } - - return ots, ots.initOpentelemetryTracer() + return ots, nil } -func parseSettings(cfg *setting.Cfg) (*Opentracing, *Opentelemetry, error) { - ts, err := parseSettingsOpentracing(cfg) - if err != nil { - return ts, nil, err - } - ots, err := ParseSettingsOpentelemetry(cfg) - return ts, ots, err -} - -func parseSettingsOpentracing(cfg *setting.Cfg) (*Opentracing, error) { - ts := &Opentracing{ - Cfg: cfg, - log: log.New("tracing"), - } - if err := ts.parseSettings(); err != nil { - return ts, err - } - if ts.enabled { - cfg.Logger.Warn("[Deprecated] the configuration setting 'tracing.jaeger' is deprecated, please use 'tracing.opentelemetry.jaeger' instead") - } - return ts, nil -} - -func ParseSettingsOpentelemetry(cfg *setting.Cfg) (*Opentelemetry, error) { +func ParseSettings(cfg *setting.Cfg) (*Opentelemetry, error) { ots := &Opentelemetry{ Cfg: cfg, log: log.New("tracing"), } - err := ots.parseSettingsOpentelemetry() + err := ots.parseSettings() return ots, err } @@ -153,10 +178,6 @@ func TraceIDFromContext(c context.Context, requireSampled bool) string { // SpanFromContext returns the Span previously associated with ctx, or nil, if no such span could be found. // It is the equivalent of opentracing.SpanFromContext and trace.SpanFromContext. func SpanFromContext(ctx context.Context) Span { - // Look for both opentracing and opentelemetry spans. - if span := opentracing.SpanFromContext(ctx); span != nil { - return OpentracingSpan{span: span} - } if span := trace.SpanFromContext(ctx); span != nil { return OpentelemetrySpan{span: span} } @@ -173,219 +194,345 @@ func ContextWithSpan(ctx context.Context, span Span) context.Context { return ctx } -type Opentracing struct { - enabled bool - address string - customTags map[string]string - samplerType string - samplerParam float64 - samplingServerURL string - log log.Logger - closer io.Closer - zipkinPropagation bool - disableSharedZipkinSpans bool - - Cfg *setting.Cfg +type noopTracerProvider struct { + trace.TracerProvider } -type OpentracingSpan struct { - span opentracing.Span -} - -func (ts *Opentracing) parseSettings() error { - var section, err = ts.Cfg.Raw.GetSection("tracing.jaeger") - if err != nil { - return err - } - - ts.address = section.Key("address").MustString("") - if ts.address == "" { - host := os.Getenv(envJaegerAgentHost) - port := os.Getenv(envJaegerAgentPort) - if host != "" || port != "" { - ts.address = fmt.Sprintf("%s:%s", host, port) - } - } - if ts.address != "" { - ts.enabled = true - } - - ts.customTags = splitTagSettings(section.Key("always_included_tag").MustString("")) - ts.samplerType = section.Key("sampler_type").MustString("") - ts.samplerParam = section.Key("sampler_param").MustFloat64(1) - ts.zipkinPropagation = section.Key("zipkin_propagation").MustBool(false) - ts.disableSharedZipkinSpans = section.Key("disable_shared_zipkin_spans").MustBool(false) - ts.samplingServerURL = section.Key("sampling_server_url").MustString("") +func (noopTracerProvider) Shutdown(ctx context.Context) error { return nil } -func (ts *Opentracing) initJaegerCfg() (jaegercfg.Configuration, error) { - cfg := jaegercfg.Configuration{ - ServiceName: "grafana", - Disabled: !ts.enabled, - Sampler: &jaegercfg.SamplerConfig{ - Type: ts.samplerType, - Param: ts.samplerParam, - SamplingServerURL: ts.samplingServerURL, - }, - Reporter: &jaegercfg.ReporterConfig{ - LogSpans: false, - LocalAgentHostPort: ts.address, - }, - } - - _, err := cfg.FromEnv() - if err != nil { - return cfg, err - } - return cfg, nil -} - -func (ts *Opentracing) initJaegerGlobalTracer() error { - cfg, err := ts.initJaegerCfg() - if err != nil { - return err - } - - jLogger := &jaegerLogWrapper{logger: log.New("jaeger")} - - options := []jaegercfg.Option{} - options = append(options, jaegercfg.Logger(jLogger)) - - for tag, value := range ts.customTags { - options = append(options, jaegercfg.Tag(tag, value)) - } - - if ts.zipkinPropagation { - zipkinPropagator := zipkin.NewZipkinB3HTTPHeaderPropagator() - options = append(options, - jaegercfg.Injector(opentracing.HTTPHeaders, zipkinPropagator), - jaegercfg.Extractor(opentracing.HTTPHeaders, zipkinPropagator), - ) - - if !ts.disableSharedZipkinSpans { - options = append(options, jaegercfg.ZipkinSharedRPCSpan(true)) +func (ots *Opentelemetry) parseSettings() error { + legacyAddress, legacyTags := "", "" + if section, err := ots.Cfg.Raw.GetSection("tracing.jaeger"); err == nil { + legacyAddress = section.Key("address").MustString("") + if legacyAddress == "" { + host, port := os.Getenv(envJaegerAgentHost), os.Getenv(envJaegerAgentPort) + if host != "" || port != "" { + legacyAddress = fmt.Sprintf("%s:%s", host, port) + } } + legacyTags = section.Key("always_included_tag").MustString("") + ots.sampler = section.Key("sampler_type").MustString("") + ots.samplerParam = section.Key("sampler_param").MustFloat64(1) + ots.samplerRemoteURL = section.Key("sampling_server_url").MustString("") } - - tracer, closer, err := cfg.NewTracer(options...) + section := ots.Cfg.Raw.Section("tracing.opentelemetry") + var err error + // we default to legacy tag set (attributes) if the new config format is absent + ots.customAttribs, err = splitCustomAttribs(section.Key("custom_attributes").MustString(legacyTags)) if err != nil { return err } - opentracing.SetGlobalTracer(tracer) + section = ots.Cfg.Raw.Section("tracing.opentelemetry.jaeger") + ots.enabled = noopExporter - ts.closer = closer + // we default to legacy Jaeger agent address if the new config value is empty + ots.Address = section.Key("address").MustString(legacyAddress) + ots.Propagation = section.Key("propagation").MustString("") + if ots.Address != "" { + ots.enabled = jaegerExporter + return nil + } + + section = ots.Cfg.Raw.Section("tracing.opentelemetry.otlp") + ots.Address = section.Key("address").MustString("") + if ots.Address != "" { + ots.enabled = otlpExporter + } + ots.Propagation = section.Key("propagation").MustString("") return nil } -func (ts *Opentracing) Run(ctx context.Context) error { +func (ots *Opentelemetry) OTelExporterEnabled() bool { + return ots.enabled == otlpExporter +} + +func splitCustomAttribs(s string) ([]attribute.KeyValue, error) { + res := []attribute.KeyValue{} + + attribs := strings.Split(s, ",") + for _, v := range attribs { + parts := strings.SplitN(v, ":", 2) + if len(parts) > 1 { + res = append(res, attribute.String(parts[0], parts[1])) + } else if v != "" { + return nil, fmt.Errorf("custom attribute malformed - must be in 'key:value' form: %q", v) + } + } + + return res, nil +} + +func (ots *Opentelemetry) initJaegerTracerProvider() (*tracesdk.TracerProvider, error) { + var ep jaeger.EndpointOption + // Create the Jaeger exporter: address can be either agent address (host:port) or collector URL + if host, port, err := net.SplitHostPort(ots.Address); err == nil { + ep = jaeger.WithAgentEndpoint(jaeger.WithAgentHost(host), jaeger.WithAgentPort(port)) + } else { + ep = jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(ots.Address)) + } + exp, err := jaeger.New(ep) + if err != nil { + return nil, err + } + + res, err := resource.New( + context.Background(), + resource.WithAttributes( + // TODO: why are these attributes different from ones added to the + // OTLP provider? + semconv.ServiceNameKey.String("grafana"), + attribute.String("environment", "production"), + ), + resource.WithAttributes(ots.customAttribs...), + ) + if err != nil { + return nil, err + } + + sampler := tracesdk.AlwaysSample() + if ots.sampler == "const" || ots.sampler == "probabilistic" { + sampler = tracesdk.TraceIDRatioBased(ots.samplerParam) + } else if ots.sampler == "rateLimiting" { + sampler = newRateLimiter(ots.samplerParam) + } else if ots.sampler == "remote" { + sampler = jaegerremote.New("grafana", jaegerremote.WithSamplingServerURL(ots.samplerRemoteURL), + jaegerremote.WithInitialSampler(tracesdk.TraceIDRatioBased(ots.samplerParam))) + } else if ots.sampler != "" { + return nil, fmt.Errorf("invalid sampler type: %s", ots.sampler) + } + + tp := tracesdk.NewTracerProvider( + tracesdk.WithBatcher(exp), + tracesdk.WithResource(res), + tracesdk.WithSampler(sampler), + ) + + return tp, nil +} + +func (ots *Opentelemetry) initOTLPTracerProvider() (*tracesdk.TracerProvider, error) { + client := otlptracegrpc.NewClient(otlptracegrpc.WithEndpoint(ots.Address), otlptracegrpc.WithInsecure()) + exp, err := otlptrace.New(context.Background(), client) + if err != nil { + return nil, err + } + + return initTracerProvider(exp, ots.customAttribs...) +} + +func initTracerProvider(exp tracesdk.SpanExporter, customAttribs ...attribute.KeyValue) (*tracesdk.TracerProvider, error) { + res, err := resource.New( + context.Background(), + resource.WithAttributes( + semconv.ServiceNameKey.String("grafana"), + semconv.ServiceVersionKey.String(version.Version), + ), + resource.WithAttributes(customAttribs...), + resource.WithProcessRuntimeDescription(), + resource.WithTelemetrySDK(), + ) + if err != nil { + return nil, err + } + + tp := tracesdk.NewTracerProvider( + tracesdk.WithBatcher(exp), + tracesdk.WithSampler(tracesdk.ParentBased( + tracesdk.AlwaysSample(), + )), + tracesdk.WithResource(res), + ) + return tp, nil +} + +func (ots *Opentelemetry) initNoopTracerProvider() (tracerProvider, error) { + return &noopTracerProvider{TracerProvider: trace.NewNoopTracerProvider()}, nil +} + +func (ots *Opentelemetry) initOpentelemetryTracer() error { + var tp tracerProvider + var err error + switch ots.enabled { + case jaegerExporter: + tp, err = ots.initJaegerTracerProvider() + if err != nil { + return err + } + case otlpExporter: + tp, err = ots.initOTLPTracerProvider() + if err != nil { + return err + } + default: + tp, err = ots.initNoopTracerProvider() + if err != nil { + return err + } + } + + // Register our TracerProvider as the global so any imported + // instrumentation in the future will default to using it + // only if tracing is enabled + if ots.enabled != "" { + otel.SetTracerProvider(tp) + } + + propagators := []propagation.TextMapPropagator{} + for _, p := range strings.Split(ots.Propagation, ",") { + switch p { + case w3cPropagator: + propagators = append(propagators, propagation.TraceContext{}, propagation.Baggage{}) + case jaegerPropagator: + propagators = append(propagators, jaegerpropagator.Jaeger{}) + case "": + default: + return fmt.Errorf("unsupported OpenTelemetry propagator: %q", p) + } + } + + switch len(propagators) { + case 0: + otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator( + propagation.TraceContext{}, propagation.Baggage{}, + )) + case 1: + otel.SetTextMapPropagator(propagators[0]) + default: + otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagators...)) + } + + if ots.tracerProvider == nil { + ots.tracerProvider = tp + } + + ots.tracer = otel.GetTracerProvider().Tracer("component-main") + + return nil +} + +func (ots *Opentelemetry) Run(ctx context.Context) error { + otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) { + err = level.Error(ots.log).Log("msg", "OpenTelemetry handler returned an error", "err", err) + if err != nil { + ots.log.Error("OpenTelemetry log returning error", err) + } + })) <-ctx.Done() - if ts.closer != nil { - ts.log.Info("Closing tracing") - return ts.closer.Close() + ots.log.Info("Closing tracing") + if ots.tracerProvider == nil { + return nil + } + ctxShutdown, cancel := context.WithTimeout(ctx, time.Second*5) + defer cancel() + + if err := ots.tracerProvider.Shutdown(ctxShutdown); err != nil { + return err } return nil } -func (ts *Opentracing) Start(ctx context.Context, spanName string, opts ...trace.SpanStartOption) (context.Context, Span) { - span, ctx := opentracing.StartSpanFromContext(ctx, spanName) - opentracingSpan := OpentracingSpan{span: span} - if sctx, ok := span.Context().(jaeger.SpanContext); ok { - ctx = context.WithValue(ctx, traceKey{}, traceValue{sctx.TraceID().String(), sctx.IsSampled()}) +func (ots *Opentelemetry) Start(ctx context.Context, spanName string, opts ...trace.SpanStartOption) (context.Context, Span) { + ctx, span := ots.tracer.Start(ctx, spanName, opts...) + opentelemetrySpan := OpentelemetrySpan{ + span: span, } - return ctx, opentracingSpan -} -func (ts *Opentracing) Inject(ctx context.Context, header http.Header, span Span) { - opentracingSpan, ok := span.(OpentracingSpan) - if !ok { - logger.Error("Failed to cast opentracing span") + if traceID := span.SpanContext().TraceID(); traceID.IsValid() { + ctx = context.WithValue(ctx, traceKey{}, traceValue{traceID.String(), span.SpanContext().IsSampled()}) } - err := opentracing.GlobalTracer().Inject( - opentracingSpan.span.Context(), - opentracing.HTTPHeaders, - opentracing.HTTPHeadersCarrier(header)) - if err != nil { - logger.Error("Failed to inject span context instance", "err", err) - } + return ctx, opentelemetrySpan } -func (s OpentracingSpan) End() { - s.span.Finish() +func (ots *Opentelemetry) Inject(ctx context.Context, header http.Header, _ Span) { + otel.GetTextMapPropagator().Inject(ctx, propagation.HeaderCarrier(header)) } -func (s OpentracingSpan) SetAttributes(key string, value interface{}, kv attribute.KeyValue) { - s.span.SetTag(key, value) +func (s OpentelemetrySpan) End() { + s.span.End() } -func (s OpentracingSpan) SetName(name string) { - s.span.SetOperationName(name) +func (s OpentelemetrySpan) SetAttributes(key string, value interface{}, kv attribute.KeyValue) { + s.span.SetAttributes(kv) } -func (s OpentracingSpan) SetStatus(code codes.Code, description string) { - if code == codes.Error { - ext.Error.Set(s.span, true) - } +func (s OpentelemetrySpan) SetName(name string) { + s.span.SetName(name) } -func (s OpentracingSpan) RecordError(err error, options ...trace.EventOption) { - ext.Error.Set(s.span, true) +func (s OpentelemetrySpan) SetStatus(code codes.Code, description string) { + s.span.SetStatus(code, description) } -func (s OpentracingSpan) AddEvents(keys []string, values []EventValue) { - fields := []ol.Field{} +func (s OpentelemetrySpan) RecordError(err error, options ...trace.EventOption) { + s.span.RecordError(err, options...) +} + +func (s OpentelemetrySpan) AddEvents(keys []string, values []EventValue) { for i, v := range values { if v.Str != "" { - field := ol.String(keys[i], v.Str) - fields = append(fields, field) + s.span.AddEvent(keys[i], trace.WithAttributes(attribute.Key(keys[i]).String(v.Str))) } if v.Num != 0 { - field := ol.Int64(keys[i], v.Num) - fields = append(fields, field) + s.span.AddEvent(keys[i], trace.WithAttributes(attribute.Key(keys[i]).Int64(v.Num))) } } - s.span.LogFields(fields...) } -func (s OpentracingSpan) contextWithSpan(ctx context.Context) context.Context { +func (s OpentelemetrySpan) contextWithSpan(ctx context.Context) context.Context { if s.span != nil { - ctx = opentracing.ContextWithSpan(ctx, s.span) + ctx = trace.ContextWithSpan(ctx, s.span) // Grafana also manages its own separate traceID in the context in addition to what opentracing handles. // It's derived from the span. Ensure that we propagate this too. - if sctx, ok := s.span.Context().(jaeger.SpanContext); ok { - ctx = context.WithValue(ctx, traceKey{}, traceValue{sctx.TraceID().String(), sctx.IsSampled()}) + if traceID := s.span.SpanContext().TraceID(); traceID.IsValid() { + ctx = context.WithValue(ctx, traceKey{}, traceValue{traceID.String(), s.span.SpanContext().IsSampled()}) } } return ctx } -func splitTagSettings(input string) map[string]string { - res := map[string]string{} +type rateLimiter struct { + sync.Mutex + rps float64 + balance float64 + maxBalance float64 + lastTick time.Time - tags := strings.Split(input, ",") - for _, v := range tags { - kv := strings.Split(v, ":") - if len(kv) > 1 { - res[kv[0]] = kv[1] - } + now func() time.Time +} + +func newRateLimiter(rps float64) *rateLimiter { + return &rateLimiter{ + rps: rps, + balance: math.Max(rps, 1), + maxBalance: math.Max(rps, 1), + lastTick: time.Now(), + now: time.Now, } - - return res } -type jaegerLogWrapper struct { - logger log.Logger +func (rl *rateLimiter) ShouldSample(p tracesdk.SamplingParameters) tracesdk.SamplingResult { + rl.Lock() + defer rl.Unlock() + psc := trace.SpanContextFromContext(p.ParentContext) + if rl.balance >= 1 { + rl.balance -= 1 + return tracesdk.SamplingResult{Decision: tracesdk.RecordAndSample, Tracestate: psc.TraceState()} + } + currentTime := rl.now() + elapsedTime := currentTime.Sub(rl.lastTick).Seconds() + rl.lastTick = currentTime + rl.balance = math.Min(rl.maxBalance, rl.balance+elapsedTime*rl.rps) + if rl.balance >= 1 { + rl.balance -= 1 + return tracesdk.SamplingResult{Decision: tracesdk.RecordAndSample, Tracestate: psc.TraceState()} + } + return tracesdk.SamplingResult{Decision: tracesdk.Drop, Tracestate: psc.TraceState()} } -func (jlw *jaegerLogWrapper) Error(msg string) { - jlw.logger.Error(msg) -} - -func (jlw *jaegerLogWrapper) Infof(format string, args ...interface{}) { - msg := fmt.Sprintf(format, args...) - jlw.logger.Info(msg) -} +func (rl *rateLimiter) Description() string { return "RateLimitingSampler" } diff --git a/pkg/infra/tracing/tracing_test.go b/pkg/infra/tracing/tracing_test.go index 6c2a01b4c45..17bc34bcc72 100644 --- a/pkg/infra/tracing/tracing_test.go +++ b/pkg/infra/tracing/tracing_test.go @@ -5,137 +5,163 @@ import ( "testing" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/attribute" "github.com/grafana/grafana/pkg/setting" ) -func TestGroupSplit(t *testing.T) { +// TODO(zserge) Add proper tests for opentelemetry + +func TestSplitCustomAttribs(t *testing.T) { tests := []struct { input string - expected map[string]string + expected []attribute.KeyValue }{ { - input: "tag1:value1,tag2:value2", - expected: map[string]string{ - "tag1": "value1", - "tag2": "value2", + input: "key1:value:1", + expected: []attribute.KeyValue{attribute.String("key1", "value:1")}, + }, + { + input: "key1:value1,key2:value2", + expected: []attribute.KeyValue{ + attribute.String("key1", "value1"), + attribute.String("key2", "value2"), }, }, { input: "", - expected: map[string]string{}, - }, - { - input: "tag1", - expected: map[string]string{}, + expected: []attribute.KeyValue{}, }, } for _, test := range tests { - tags := splitTagSettings(test.input) - for k, v := range test.expected { - value, exists := tags[k] - assert.Truef(t, exists, "Tag %q not found for input %q", k, test.input) - assert.Equalf(t, v, value, "Tag %q has wrong value for input %q", k, test.input) - } + attribs, err := splitCustomAttribs(test.input) + assert.NoError(t, err) + assert.EqualValues(t, test.expected, attribs) } } -func TestInitJaegerCfg_Default(t *testing.T) { - ts := &Opentracing{} - cfg, err := ts.initJaegerCfg() - require.NoError(t, err) +func TestSplitCustomAttribs_Malformed(t *testing.T) { + tests := []struct { + input string + }{ + {input: "key1=value1"}, + {input: "key1"}, + } - assert.True(t, cfg.Disabled) + for _, test := range tests { + _, err := splitCustomAttribs(test.input) + assert.Error(t, err) + } } -func TestInitJaegerCfg_Enabled(t *testing.T) { - ts := &Opentracing{enabled: true} - cfg, err := ts.initJaegerCfg() - require.NoError(t, err) - - assert.False(t, cfg.Disabled) - assert.Equal(t, "localhost:6831", cfg.Reporter.LocalAgentHostPort) -} - -func TestInitJaegerCfg_DisabledViaEnv(t *testing.T) { - err := os.Setenv("JAEGER_DISABLED", "true") - require.NoError(t, err) - defer func() { - err := os.Unsetenv("JAEGER_DISABLED") - require.NoError(t, err) - }() - - ts := &Opentracing{enabled: true} - cfg, err := ts.initJaegerCfg() - require.NoError(t, err) - - assert.True(t, cfg.Disabled) -} - -func TestInitJaegerCfg_EnabledViaEnv(t *testing.T) { - err := os.Setenv("JAEGER_DISABLED", "false") - require.NoError(t, err) - defer func() { - err := os.Unsetenv("JAEGER_DISABLED") - require.NoError(t, err) - }() - - ts := &Opentracing{enabled: false} - cfg, err := ts.initJaegerCfg() - require.NoError(t, err) - - assert.False(t, cfg.Disabled) -} - -func TestInitJaegerCfg_InvalidEnvVar(t *testing.T) { - err := os.Setenv("JAEGER_DISABLED", "totallybogus") - require.NoError(t, err) - defer func() { - err := os.Unsetenv("JAEGER_DISABLED") - require.NoError(t, err) - }() - - ts := &Opentracing{} - _, err = ts.initJaegerCfg() - require.EqualError(t, err, "cannot parse env var JAEGER_DISABLED=totallybogus: strconv.ParseBool: parsing \"totallybogus\": invalid syntax") -} - -func TestInitJaegerCfg_EnabledViaHost(t *testing.T) { - require.NoError(t, os.Setenv("JAEGER_AGENT_HOST", "example.com")) - defer func() { - require.NoError(t, os.Unsetenv("JAEGER_AGENT_HOST")) - }() - - cfg := setting.NewCfg() - ts := &Opentracing{Cfg: cfg} - _, err := ts.Cfg.Raw.NewSection("tracing.jaeger") - require.NoError(t, err) - require.NoError(t, ts.parseSettings()) - jaegerCfg, err := ts.initJaegerCfg() - require.NoError(t, err) - - assert.False(t, jaegerCfg.Disabled) - assert.Equal(t, "example.com:6831", jaegerCfg.Reporter.LocalAgentHostPort) -} - -func TestInitJaegerCfg_EnabledViaHostPort(t *testing.T) { - require.NoError(t, os.Setenv("JAEGER_AGENT_HOST", "example.com")) - require.NoError(t, os.Setenv("JAEGER_AGENT_PORT", "12345")) - defer func() { - require.NoError(t, os.Unsetenv("JAEGER_AGENT_HOST")) - require.NoError(t, os.Unsetenv("JAEGER_AGENT_PORT")) - }() - - cfg := setting.NewCfg() - ts := &Opentracing{Cfg: cfg} - _, err := ts.Cfg.Raw.NewSection("tracing.jaeger") - require.NoError(t, err) - require.NoError(t, ts.parseSettings()) - jaegerCfg, err := ts.initJaegerCfg() - require.NoError(t, err) - - assert.False(t, jaegerCfg.Disabled) - assert.Equal(t, "example.com:12345", jaegerCfg.Reporter.LocalAgentHostPort) +func TestTracingConfig(t *testing.T) { + for _, test := range []struct { + Name string + Cfg string + Env map[string]string + ExpectedExporter string + ExpectedAddress string + ExpectedPropagator string + ExpectedAttrs []attribute.KeyValue + }{ + { + Name: "default config uses noop exporter", + Cfg: "", + ExpectedExporter: noopExporter, + ExpectedAttrs: []attribute.KeyValue{}, + }, + { + Name: "custom attributes are parsed", + Cfg: ` + [tracing.opentelemetry] + custom_attributes = key1:value1,key2:value2 + `, + ExpectedExporter: noopExporter, + ExpectedAttrs: []attribute.KeyValue{attribute.String("key1", "value1"), attribute.String("key2", "value2")}, + }, + { + Name: "jaeger address is parsed", + Cfg: ` + [tracing.opentelemetry.jaeger] + address = jaeger.example.com:6831 + `, + ExpectedExporter: jaegerExporter, + ExpectedAddress: "jaeger.example.com:6831", + ExpectedAttrs: []attribute.KeyValue{}, + }, + { + Name: "OTLP address is parsed", + Cfg: ` + [tracing.opentelemetry.otlp] + address = otlp.example.com:4317 + `, + ExpectedExporter: otlpExporter, + ExpectedAddress: "otlp.example.com:4317", + ExpectedAttrs: []attribute.KeyValue{}, + }, + { + Name: "legacy config format is supported", + Cfg: ` + [tracing.jaeger] + address = jaeger.example.com:6831 + `, + ExpectedExporter: jaegerExporter, + ExpectedAddress: "jaeger.example.com:6831", + ExpectedAttrs: []attribute.KeyValue{}, + }, + { + Name: "legacy env variables are supproted", + Cfg: `[tracing.jaeger]`, + Env: map[string]string{ + "JAEGER_AGENT_HOST": "example.com", + "JAEGER_AGENT_PORT": "12345", + }, + ExpectedExporter: jaegerExporter, + ExpectedAddress: "example.com:12345", + ExpectedAttrs: []attribute.KeyValue{}, + }, + { + Name: "opentelemetry config format is prioritised over legacy jaeger", + Cfg: ` + [tracing.jaeger] + address = foo.com:6831 + custom_tags = a:b + [tracing.opentelemetry] + custom_attributes = c:d + [tracing.opentelemetry.jaeger] + address = bar.com:6831 + `, + ExpectedExporter: jaegerExporter, + ExpectedAddress: "bar.com:6831", + ExpectedAttrs: []attribute.KeyValue{attribute.String("c", "d")}, + }, + } { + t.Run(test.Name, func(t *testing.T) { + // export envioronment variables + if test.Env != nil { + for k, v := range test.Env { + assert.NoError(t, os.Setenv(k, v)) + } + defer func() { + for k := range test.Env { + assert.NoError(t, os.Unsetenv(k)) + } + }() + } + // parse config sections + cfg := setting.NewCfg() + err := cfg.Raw.Append([]byte(test.Cfg)) + assert.NoError(t, err) + // create tracer + tracer, err := ProvideService(cfg) + assert.NoError(t, err) + // make sure tracker is properly configured + otel := tracer.(*Opentelemetry) + assert.Equal(t, test.ExpectedExporter, otel.enabled) + assert.Equal(t, test.ExpectedAddress, otel.Address) + assert.Equal(t, test.ExpectedPropagator, otel.Propagation) + assert.Equal(t, test.ExpectedAttrs, otel.customAttribs) + }) + } } diff --git a/pkg/services/pluginsintegration/config/tracing.go b/pkg/services/pluginsintegration/config/tracing.go index 1f5ad963a83..24e3230bd2b 100644 --- a/pkg/services/pluginsintegration/config/tracing.go +++ b/pkg/services/pluginsintegration/config/tracing.go @@ -11,7 +11,7 @@ import ( // newTracingCfg creates a plugins tracing configuration based on the provided Grafana tracing config. // If OpenTelemetry (OTLP) is disabled, a zero-value OpenTelemetryCfg is returned. func newTracingCfg(grafanaCfg *setting.Cfg) (pCfg.Tracing, error) { - ots, err := tracing.ParseSettingsOpentelemetry(grafanaCfg) + ots, err := tracing.ParseSettings(grafanaCfg) if err != nil { return pCfg.Tracing{}, fmt.Errorf("parse settings: %w", err) }