Chore: Remove opentracing and use opentelemetry instead (#67200)

* remove opentracing and use otel instead

* add various samplers for jaeger

* remove useless test that is covered in otel now

* we do not need a struct there

* remove old tests

* restore tests that parse various configurations

* check errors in tests

* Update pkg/infra/tracing/tracing_test.go

fix typo

Co-authored-by: Sofia Papagiannaki <1632407+papagian@users.noreply.github.com>

* add test for both legacy and new config formats

* use named constants

---------

Co-authored-by: Sofia Papagiannaki <1632407+papagian@users.noreply.github.com>
This commit is contained in:
Serge Zaitsev 2023-04-27 15:04:43 +02:00 committed by GitHub
parent 69f1116f59
commit 6d8f9c5bf4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 489 additions and 740 deletions

5
go.mod
View File

@ -81,7 +81,7 @@ require (
github.com/mattn/go-sqlite3 v1.14.16
github.com/matttproud/golang_protobuf_extensions v1.0.4
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f
github.com/opentracing/opentracing-go v1.2.0
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 // indirect
github.com/pkg/errors v0.9.1
@ -95,7 +95,7 @@ require (
github.com/stretchr/testify v1.8.2
github.com/teris-io/shortid v0.0.0-20171029131806-771a37caa5cf
github.com/ua-parser/uap-go v0.0.0-20211112212520-00c877edfe0f
github.com/uber/jaeger-client-go v2.29.1+incompatible
github.com/uber/jaeger-client-go v2.29.1+incompatible // indirect
github.com/urfave/cli/v2 v2.24.4
github.com/vectordotdev/go-datemath v0.1.1-0.20220323213446-f3954d0b18ae
github.com/yalue/merged_fs v1.2.2
@ -271,6 +271,7 @@ require (
github.com/redis/go-redis/v9 v9.0.2
github.com/weaveworks/common v0.0.0-20230208133027-16871410fca4
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f
go.opentelemetry.io/contrib/samplers/jaegerremote v0.9.0
)
require (

2
go.sum
View File

@ -2428,6 +2428,8 @@ go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.21.0/go.mod h1:JQAtechjxLEL81EjmbRwxBq/XEzGaHcsPuDHAx54hg4=
go.opentelemetry.io/contrib/propagators/jaeger v1.15.0 h1:xdJjwy5t/8I+TZehMMQ+r2h50HREihH2oMUhimQ+jug=
go.opentelemetry.io/contrib/propagators/jaeger v1.15.0/go.mod h1:tU0nwW4QTvKceNUP60/PQm0FI8zDSwey7gIFt3RR/yw=
go.opentelemetry.io/contrib/samplers/jaegerremote v0.9.0 h1:zRi6a8uX+cJGTPLXRPjFEzN27a26k5R7LiLK87ntXgg=
go.opentelemetry.io/contrib/samplers/jaegerremote v0.9.0/go.mod h1:pzJOLTppaPbiPjPZEwGRf0VWx6G07hhOqznjKXIMkEk=
go.opentelemetry.io/contrib/zpages v0.0.0-20210722161726-7668016acb73/go.mod h1:NAkejuYm41lpyL43Fu1XdnCOYxN5NVV80/MJ03JQ/X8=
go.opentelemetry.io/otel v1.0.0-RC1/go.mod h1:x9tRa9HK4hSSq7jf2TKbqFbtt58/TGk0f9XiEYISI1I=
go.opentelemetry.io/otel v1.0.0/go.mod h1:AjRVh9A5/5DE7S+mZtTR6t8vpKKryam+0lREnfmS4cg=

View File

@ -1,338 +0,0 @@
package tracing
import (
"context"
"fmt"
"net/http"
"strings"
"time"
"github.com/go-kit/log/level"
"go.etcd.io/etcd/api/v3/version"
jaegerpropagator "go.opentelemetry.io/contrib/propagators/jaeger"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/exporters/jaeger"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
tracesdk "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
trace "go.opentelemetry.io/otel/trace"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/setting"
)
// Identifiers for the supported exporters and propagators.
const (
// Exporter kinds stored in Opentelemetry.Enabled.
jaegerExporter string = "jaeger"
otlpExporter string = "otlp"
noopExporter string = "noop"
// Propagator names accepted in the comma-separated Propagation setting.
jaegerPropagator string = "jaeger"
w3cPropagator string = "w3c"
)
// Opentelemetry is the OpenTelemetry-backed tracing service. Its settings are
// populated by parseSettingsOpentelemetry.
type Opentelemetry struct {
// Enabled holds the selected exporter: jaegerExporter, otlpExporter or noopExporter.
Enabled string
// Address is the exporter endpoint read from the jaeger or otlp config section.
Address string
// Propagation is a comma-separated list of propagator names ("w3c", "jaeger").
Propagation string
// customAttribs are extra resource attributes parsed from custom_attributes.
customAttribs []attribute.KeyValue
log log.Logger
// tracerProvider is set by initOpentelemetryTracer and shut down in Run.
tracerProvider tracerProvider
// tracer is used by Start to create spans.
tracer trace.Tracer
Cfg *setting.Cfg
}
// tracerProvider is a trace.TracerProvider that can additionally be shut down.
type tracerProvider interface {
trace.TracerProvider
// Shutdown is called from Run once the service context is done.
Shutdown(ctx context.Context) error
}
// OpentelemetrySpan wraps an OpenTelemetry trace.Span; Start returns it as a Span.
type OpentelemetrySpan struct {
span trace.Span
}
// EventValue is a value attached to a span event by AddEvents.
// Str is emitted when non-empty and Num when non-zero.
type EventValue struct {
Str string
Num int64
}
// otelErrHandler adapts a plain function so it can be passed to
// otel.SetErrorHandler (see Run).
type otelErrHandler func(err error)
// Handle invokes the wrapped function with err.
func (o otelErrHandler) Handle(err error) {
o(err)
}
// noopTracerProvider embeds a trace.TracerProvider and adds a no-op Shutdown
// so it satisfies the tracerProvider interface.
type noopTracerProvider struct {
trace.TracerProvider
}
// Shutdown does nothing and always returns nil.
func (noopTracerProvider) Shutdown(ctx context.Context) error {
return nil
}
// parseSettingsOpentelemetry reads the [tracing.opentelemetry*] config
// sections into ots.
//
// Custom attributes come from [tracing.opentelemetry]. The exporter defaults
// to noop; a non-empty address in [tracing.opentelemetry.jaeger] selects the
// Jaeger exporter, otherwise a non-empty address in
// [tracing.opentelemetry.otlp] selects OTLP. Propagation is taken from the
// last section that was consulted. An error is returned only when the custom
// attributes are malformed.
func (ots *Opentelemetry) parseSettingsOpentelemetry() error {
	raw := ots.Cfg.Raw

	attribs, err := splitCustomAttribs(raw.Section("tracing.opentelemetry").Key("custom_attributes").MustString(""))
	ots.customAttribs = attribs
	if err != nil {
		return err
	}

	ots.Enabled = noopExporter

	// Jaeger takes precedence over OTLP when both addresses are configured.
	jaegerCfg := raw.Section("tracing.opentelemetry.jaeger")
	ots.Address = jaegerCfg.Key("address").MustString("")
	ots.Propagation = jaegerCfg.Key("propagation").MustString("")
	if ots.Address != "" {
		ots.Enabled = jaegerExporter
		return nil
	}

	otlpCfg := raw.Section("tracing.opentelemetry.otlp")
	ots.Address = otlpCfg.Key("address").MustString("")
	ots.Propagation = otlpCfg.Key("propagation").MustString("")
	if ots.Address != "" {
		ots.Enabled = otlpExporter
	}
	return nil
}
// OTelExporterEnabled reports whether the OTLP exporter is the selected exporter.
func (ots *Opentelemetry) OTelExporterEnabled() bool {
return ots.Enabled == otlpExporter
}
// splitCustomAttribs parses a comma-separated list of "key:value" pairs into
// OpenTelemetry string attributes.
//
// Only the first colon of an entry separates key from value, so values may
// themselves contain colons. Whitespace around entries, keys and values is
// trimmed, so "a:1, b:2" yields the keys "a" and "b" rather than "a" and " b".
// Blank entries are skipped; an entry without a colon is an error. The
// returned slice is non-nil even when s is empty.
func splitCustomAttribs(s string) ([]attribute.KeyValue, error) {
	res := []attribute.KeyValue{}
	for _, entry := range strings.Split(s, ",") {
		entry = strings.TrimSpace(entry)
		if entry == "" {
			continue
		}
		parts := strings.SplitN(entry, ":", 2)
		if len(parts) < 2 {
			return nil, fmt.Errorf("custom attribute malformed - must be in 'key:value' form: %q", entry)
		}
		res = append(res, attribute.String(strings.TrimSpace(parts[0]), strings.TrimSpace(parts[1])))
	}
	return res, nil
}
// initJaegerTracerProvider builds a tracer provider that batches spans to a
// Jaeger collector endpoint at ots.Address. The resource carries the service
// name, a fixed "environment" attribute and the configured custom attributes.
func (ots *Opentelemetry) initJaegerTracerProvider() (*tracesdk.TracerProvider, error) {
	// Create the Jaeger exporter
	collector := jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(ots.Address))
	exp, err := jaeger.New(collector)
	if err != nil {
		return nil, err
	}

	// TODO: why are these attributes different from ones added to the
	// OTLP provider?
	baseAttribs := resource.WithAttributes(
		semconv.ServiceNameKey.String("grafana"),
		attribute.String("environment", "production"),
	)
	res, err := resource.New(
		context.Background(),
		baseAttribs,
		resource.WithAttributes(ots.customAttribs...),
	)
	if err != nil {
		return nil, err
	}

	return tracesdk.NewTracerProvider(
		tracesdk.WithBatcher(exp),
		tracesdk.WithResource(res),
	), nil
}
// initOTLPTracerProvider creates an OTLP/gRPC exporter for ots.Address
// (configured with otlptracegrpc.WithInsecure) and wraps it in a tracer
// provider via initTracerProvider.
func (ots *Opentelemetry) initOTLPTracerProvider() (*tracesdk.TracerProvider, error) {
	exp, err := otlptrace.New(
		context.Background(),
		otlptracegrpc.NewClient(
			otlptracegrpc.WithEndpoint(ots.Address),
			otlptracegrpc.WithInsecure(),
		),
	)
	if err != nil {
		return nil, err
	}
	return initTracerProvider(exp, ots.customAttribs...)
}
// initTracerProvider wraps exp in a batching tracer provider that uses a
// parent-based always-on sampler. The resource describes the "grafana"
// service plus customAttribs, process runtime and telemetry SDK details.
func initTracerProvider(exp tracesdk.SpanExporter, customAttribs ...attribute.KeyValue) (*tracesdk.TracerProvider, error) {
	resOpts := []resource.Option{
		resource.WithAttributes(
			semconv.ServiceNameKey.String("grafana"),
			// NOTE(review): version.Version is imported from go.etcd.io/etcd/api/v3/version,
			// so this looks like the etcd API version rather than Grafana's - confirm.
			semconv.ServiceVersionKey.String(version.Version),
		),
		resource.WithAttributes(customAttribs...),
		resource.WithProcessRuntimeDescription(),
		resource.WithTelemetrySDK(),
	}
	res, err := resource.New(context.Background(), resOpts...)
	if err != nil {
		return nil, err
	}

	sampler := tracesdk.ParentBased(tracesdk.AlwaysSample())
	return tracesdk.NewTracerProvider(
		tracesdk.WithBatcher(exp),
		tracesdk.WithSampler(sampler),
		tracesdk.WithResource(res),
	), nil
}
// initNoopTracerProvider returns a provider wrapping trace.NewNoopTracerProvider;
// it never returns an error.
func (ots *Opentelemetry) initNoopTracerProvider() (tracerProvider, error) {
return &noopTracerProvider{TracerProvider: trace.NewNoopTracerProvider()}, nil
}
// initOpentelemetryTracer creates the tracer provider for the configured
// exporter, registers it and the configured propagators globally, and stores
// the resulting tracer on ots.
//
// It returns an error when the exporter cannot be created or when
// ots.Propagation names an unsupported propagator.
func (ots *Opentelemetry) initOpentelemetryTracer() error {
	var (
		tp  tracerProvider
		err error
	)
	switch ots.Enabled {
	case jaegerExporter:
		tp, err = ots.initJaegerTracerProvider()
	case otlpExporter:
		tp, err = ots.initOTLPTracerProvider()
	default:
		tp, err = ots.initNoopTracerProvider()
	}
	if err != nil {
		return err
	}

	// Register our TracerProvider as the global so any imported
	// instrumentation in the future will default to using it
	// only if tracing is enabled
	if ots.Enabled != "" {
		otel.SetTracerProvider(tp)
	}

	propagators := []propagation.TextMapPropagator{}
	for _, name := range strings.Split(ots.Propagation, ",") {
		switch name {
		case "":
			// empty entries are ignored
		case w3cPropagator:
			propagators = append(propagators, propagation.TraceContext{}, propagation.Baggage{})
		case jaegerPropagator:
			propagators = append(propagators, jaegerpropagator.Jaeger{})
		default:
			return fmt.Errorf("unsupported OpenTelemetry propagator: %q", name)
		}
	}

	// With nothing configured, fall back to W3C trace context plus baggage.
	var propagator propagation.TextMapPropagator
	switch len(propagators) {
	case 0:
		propagator = propagation.NewCompositeTextMapPropagator(
			propagation.TraceContext{}, propagation.Baggage{},
		)
	case 1:
		propagator = propagators[0]
	default:
		propagator = propagation.NewCompositeTextMapPropagator(propagators...)
	}
	otel.SetTextMapPropagator(propagator)

	// Keep a provider that was set earlier (e.g. by a test).
	if ots.tracerProvider == nil {
		ots.tracerProvider = tp
	}

	ots.tracer = otel.GetTracerProvider().Tracer("component-main")
	return nil
}
// Run installs the OpenTelemetry error handler, blocks until ctx is done and
// then shuts the tracer provider down, returning any shutdown error.
func (ots *Opentelemetry) Run(ctx context.Context) error {
	otel.SetErrorHandler(otelErrHandler(func(err error) {
		err = level.Error(ots.log).Log("msg", "OpenTelemetry handler returned an error", "err", err)
		if err != nil {
			// Log arguments are key/value pairs, so the error needs a key.
			ots.log.Error("OpenTelemetry log returning error", "err", err)
		}
	}))
	<-ctx.Done()

	ots.log.Info("Closing tracing")
	if ots.tracerProvider == nil {
		return nil
	}

	// ctx is already done at this point, so a context derived from it would be
	// cancelled immediately and Shutdown could not flush pending spans. Use a
	// fresh parent so the provider really gets its grace period.
	ctxShutdown, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()

	return ots.tracerProvider.Shutdown(ctxShutdown)
}
// Start begins a new span named spanName and returns it together with a
// context carrying it. When the span has a valid trace ID, the ID and sampled
// flag are also stored in the context under traceKey.
func (ots *Opentelemetry) Start(ctx context.Context, spanName string, opts ...trace.SpanStartOption) (context.Context, Span) {
	ctx, span := ots.tracer.Start(ctx, spanName, opts...)

	sc := span.SpanContext()
	if id := sc.TraceID(); id.IsValid() {
		ctx = context.WithValue(ctx, traceKey{}, traceValue{id.String(), sc.IsSampled()})
	}

	return ctx, OpentelemetrySpan{span: span}
}
// Inject writes the trace context found in ctx into header using the globally
// registered text-map propagator. The span argument is ignored.
func (ots *Opentelemetry) Inject(ctx context.Context, header http.Header, _ Span) {
otel.GetTextMapPropagator().Inject(ctx, propagation.HeaderCarrier(header))
}
// End ends the wrapped span.
func (s OpentelemetrySpan) End() {
s.span.End()
}
// SetAttributes sets kv on the wrapped span. The key and value parameters are
// not used by this implementation.
func (s OpentelemetrySpan) SetAttributes(key string, value interface{}, kv attribute.KeyValue) {
s.span.SetAttributes(kv)
}
// SetName renames the wrapped span.
func (s OpentelemetrySpan) SetName(name string) {
s.span.SetName(name)
}
// SetStatus forwards the status code and description to the wrapped span.
func (s OpentelemetrySpan) SetStatus(code codes.Code, description string) {
s.span.SetStatus(code, description)
}
// RecordError forwards err and the event options to the wrapped span.
func (s OpentelemetrySpan) RecordError(err error, options ...trace.EventOption) {
s.span.RecordError(err, options...)
}
// AddEvents adds one span event per non-zero field of each value, named after
// the key at the same index and carrying that key/value as its attribute.
//
// keys and values are matched by position. Values without a corresponding key
// are ignored instead of causing an index-out-of-range panic.
func (s OpentelemetrySpan) AddEvents(keys []string, values []EventValue) {
	for i, v := range values {
		if i >= len(keys) {
			break
		}
		key := keys[i]
		if v.Str != "" {
			s.span.AddEvent(key, trace.WithAttributes(attribute.Key(key).String(v.Str)))
		}
		if v.Num != 0 {
			s.span.AddEvent(key, trace.WithAttributes(attribute.Key(key).Int64(v.Num)))
		}
	}
}
// contextWithSpan returns ctx with the wrapped span attached. When the span
// has a valid trace ID, the ID and sampled flag are stored under traceKey as
// well. A nil span leaves ctx untouched.
func (s OpentelemetrySpan) contextWithSpan(ctx context.Context) context.Context {
	if s.span == nil {
		return ctx
	}

	ctx = trace.ContextWithSpan(ctx, s.span)

	// Grafana also keeps its own trace ID in the context, separate from the one
	// OpenTelemetry manages. It is derived from the span, so propagate it too.
	sc := s.span.SpanContext()
	if id := sc.TraceID(); id.IsValid() {
		ctx = context.WithValue(ctx, traceKey{}, traceValue{id.String(), sc.IsSampled()})
	}
	return ctx
}

View File

@ -1,89 +0,0 @@
package tracing
import (
"testing"
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/otel/attribute"
"github.com/grafana/grafana/pkg/setting"
)
// TestSplitCustomAttribs checks that well-formed inputs are parsed into the
// expected attributes without error.
func TestSplitCustomAttribs(t *testing.T) {
tests := []struct {
input string
expected []attribute.KeyValue
}{
{
// Only the first colon separates key from value.
input: "key1:value:1",
expected: []attribute.KeyValue{attribute.String("key1", "value:1")},
},
{
input: "key1:value1,key2:value2",
expected: []attribute.KeyValue{
attribute.String("key1", "value1"),
attribute.String("key2", "value2"),
},
},
{
// Empty input yields an empty, non-nil slice.
input: "",
expected: []attribute.KeyValue{},
},
}
for _, test := range tests {
attribs, err := splitCustomAttribs(test.input)
assert.NoError(t, err)
assert.EqualValues(t, test.expected, attribs)
}
}
func TestSplitCustomAttribs_Malformed(t *testing.T) {
tests := []struct {
input string
expected []attribute.KeyValue
}{
{input: "key1=value1"},
{input: "key1"},
}
for _, test := range tests {
_, err := splitCustomAttribs(test.input)
assert.Error(t, err)
}
}
// TestOptentelemetry_ParseSettingsOpentelemetry walks through successive
// config states and checks what parseSettingsOpentelemetry derives from each.
func TestOptentelemetry_ParseSettingsOpentelemetry(t *testing.T) {
cfg := setting.NewCfg()
otel := &Opentelemetry{Cfg: cfg}
otelsect := cfg.Raw.Section("tracing.opentelemetry")
jaegersect := cfg.Raw.Section("tracing.opentelemetry.jaeger")
otlpsect := cfg.Raw.Section("tracing.opentelemetry.otlp")
// With nothing configured the noop exporter is selected.
assert.NoError(t, otel.parseSettingsOpentelemetry())
assert.Equal(t, noopExporter, otel.Enabled)
// NOTE(review): this only references the key without assigning a value;
// presumably it is meant to cover an empty custom_attributes key - confirm.
otelsect.Key("custom_attributes")
assert.NoError(t, otel.parseSettingsOpentelemetry())
assert.Empty(t, otel.customAttribs)
otelsect.Key("custom_attributes").SetValue("key1:value1,key2:value2")
assert.NoError(t, otel.parseSettingsOpentelemetry())
expected := []attribute.KeyValue{
attribute.String("key1", "value1"),
attribute.String("key2", "value2"),
}
assert.Equal(t, expected, otel.customAttribs)
// A Jaeger address selects the Jaeger exporter.
jaegersect.Key("address").SetValue("somehost:6831")
assert.NoError(t, otel.parseSettingsOpentelemetry())
assert.Equal(t, "somehost:6831", otel.Address)
assert.Equal(t, jaegerExporter, otel.Enabled)
// With the Jaeger address cleared, an OTLP address selects the OTLP exporter.
jaegersect.Key("address").SetValue("")
otlpsect.Key("address").SetValue("somehost:4317")
assert.NoError(t, otel.parseSettingsOpentelemetry())
assert.Equal(t, "somehost:4317", otel.Address)
assert.Equal(t, otlpExporter, otel.Enabled)
}

View File

@ -3,22 +3,30 @@ package tracing
import (
"context"
"fmt"
"io"
"math"
"net"
"net/http"
"os"
"strings"
"sync"
"time"
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
ol "github.com/opentracing/opentracing-go/log"
"github.com/uber/jaeger-client-go"
jaegercfg "github.com/uber/jaeger-client-go/config"
"github.com/uber/jaeger-client-go/zipkin"
"go.etcd.io/etcd/api/v3/version"
jaegerpropagator "go.opentelemetry.io/contrib/propagators/jaeger"
"go.opentelemetry.io/contrib/samplers/jaegerremote"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/exporters/jaeger"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
tracesdk "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
trace "go.opentelemetry.io/otel/trace"
"github.com/grafana/grafana/pkg/cmd/grafana-cli/logger"
"github.com/go-kit/log/level"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/setting"
)
@ -28,6 +36,48 @@ const (
envJaegerAgentPort = "JAEGER_AGENT_PORT"
)
// Identifiers for the supported exporters and propagators.
const (
// Exporter kinds stored in Opentelemetry.enabled.
jaegerExporter string = "jaeger"
otlpExporter string = "otlp"
noopExporter string = "noop"
// Propagator names accepted in the comma-separated Propagation setting.
jaegerPropagator string = "jaeger"
w3cPropagator string = "w3c"
)
// Opentelemetry is the OpenTelemetry-backed tracing service. Its settings are
// populated by parseSettings.
type Opentelemetry struct {
// enabled holds the selected exporter: jaegerExporter, otlpExporter or noopExporter.
enabled string
// Address is the exporter endpoint (new config sections, or the legacy Jaeger address).
Address string
// Propagation is a comma-separated list of propagator names ("w3c", "jaeger").
Propagation string
// customAttribs are extra resource attributes parsed from the config.
customAttribs []attribute.KeyValue
// sampler is the legacy [tracing.jaeger] sampler_type; the Jaeger provider accepts
// "const", "probabilistic", "rateLimiting", "remote" or empty (always sample).
sampler string
// samplerParam is the legacy sampler_param (defaults to 1).
samplerParam float64
// samplerRemoteURL is the legacy sampling_server_url, used by the "remote" sampler.
samplerRemoteURL string
log log.Logger
// tracerProvider is set by initOpentelemetryTracer and shut down in Run.
tracerProvider tracerProvider
// tracer is used by Start to create spans.
tracer trace.Tracer
Cfg *setting.Cfg
}
// tracerProvider is a trace.TracerProvider that can additionally be shut down.
type tracerProvider interface {
trace.TracerProvider
// Shutdown is called from Run once the service context is done.
Shutdown(ctx context.Context) error
}
// OpentelemetrySpan wraps an OpenTelemetry trace.Span; Start returns it as a Span.
type OpentelemetrySpan struct {
span trace.Span
}
// EventValue is a value attached to a span event by AddEvents.
// Str is emitted when non-empty and Num when non-zero.
type EventValue struct {
Str string
Num int64
}
// Tracer defines the service used to create new spans.
type Tracer interface {
// Run implements registry.BackgroundService.
@ -83,7 +133,7 @@ type Span interface {
}
func ProvideService(cfg *setting.Cfg) (Tracer, error) {
ts, ots, err := parseSettings(cfg)
ots, err := ParseSettings(cfg)
if err != nil {
return nil, err
}
@ -95,43 +145,18 @@ func ProvideService(cfg *setting.Cfg) (Tracer, error) {
return nil, false
})
if ts.enabled {
return ts, ts.initJaegerGlobalTracer()
if err := ots.initOpentelemetryTracer(); err != nil {
return nil, err
}
return ots, ots.initOpentelemetryTracer()
return ots, nil
}
func parseSettings(cfg *setting.Cfg) (*Opentracing, *Opentelemetry, error) {
ts, err := parseSettingsOpentracing(cfg)
if err != nil {
return ts, nil, err
}
ots, err := ParseSettingsOpentelemetry(cfg)
return ts, ots, err
}
func parseSettingsOpentracing(cfg *setting.Cfg) (*Opentracing, error) {
ts := &Opentracing{
Cfg: cfg,
log: log.New("tracing"),
}
if err := ts.parseSettings(); err != nil {
return ts, err
}
if ts.enabled {
cfg.Logger.Warn("[Deprecated] the configuration setting 'tracing.jaeger' is deprecated, please use 'tracing.opentelemetry.jaeger' instead")
}
return ts, nil
}
func ParseSettingsOpentelemetry(cfg *setting.Cfg) (*Opentelemetry, error) {
func ParseSettings(cfg *setting.Cfg) (*Opentelemetry, error) {
ots := &Opentelemetry{
Cfg: cfg,
log: log.New("tracing"),
}
err := ots.parseSettingsOpentelemetry()
err := ots.parseSettings()
return ots, err
}
@ -153,10 +178,6 @@ func TraceIDFromContext(c context.Context, requireSampled bool) string {
// SpanFromContext returns the Span previously associated with ctx, or nil, if no such span could be found.
// It is the equivalent of opentracing.SpanFromContext and trace.SpanFromContext.
func SpanFromContext(ctx context.Context) Span {
// Look for both opentracing and opentelemetry spans.
if span := opentracing.SpanFromContext(ctx); span != nil {
return OpentracingSpan{span: span}
}
if span := trace.SpanFromContext(ctx); span != nil {
return OpentelemetrySpan{span: span}
}
@ -173,219 +194,345 @@ func ContextWithSpan(ctx context.Context, span Span) context.Context {
return ctx
}
type Opentracing struct {
enabled bool
address string
customTags map[string]string
samplerType string
samplerParam float64
samplingServerURL string
log log.Logger
closer io.Closer
zipkinPropagation bool
disableSharedZipkinSpans bool
Cfg *setting.Cfg
type noopTracerProvider struct {
trace.TracerProvider
}
type OpentracingSpan struct {
span opentracing.Span
}
func (ts *Opentracing) parseSettings() error {
var section, err = ts.Cfg.Raw.GetSection("tracing.jaeger")
if err != nil {
return err
}
ts.address = section.Key("address").MustString("")
if ts.address == "" {
host := os.Getenv(envJaegerAgentHost)
port := os.Getenv(envJaegerAgentPort)
if host != "" || port != "" {
ts.address = fmt.Sprintf("%s:%s", host, port)
}
}
if ts.address != "" {
ts.enabled = true
}
ts.customTags = splitTagSettings(section.Key("always_included_tag").MustString(""))
ts.samplerType = section.Key("sampler_type").MustString("")
ts.samplerParam = section.Key("sampler_param").MustFloat64(1)
ts.zipkinPropagation = section.Key("zipkin_propagation").MustBool(false)
ts.disableSharedZipkinSpans = section.Key("disable_shared_zipkin_spans").MustBool(false)
ts.samplingServerURL = section.Key("sampling_server_url").MustString("")
func (noopTracerProvider) Shutdown(ctx context.Context) error {
return nil
}
func (ts *Opentracing) initJaegerCfg() (jaegercfg.Configuration, error) {
cfg := jaegercfg.Configuration{
ServiceName: "grafana",
Disabled: !ts.enabled,
Sampler: &jaegercfg.SamplerConfig{
Type: ts.samplerType,
Param: ts.samplerParam,
SamplingServerURL: ts.samplingServerURL,
},
Reporter: &jaegercfg.ReporterConfig{
LogSpans: false,
LocalAgentHostPort: ts.address,
},
}
_, err := cfg.FromEnv()
if err != nil {
return cfg, err
}
return cfg, nil
}
func (ts *Opentracing) initJaegerGlobalTracer() error {
cfg, err := ts.initJaegerCfg()
if err != nil {
return err
}
jLogger := &jaegerLogWrapper{logger: log.New("jaeger")}
options := []jaegercfg.Option{}
options = append(options, jaegercfg.Logger(jLogger))
for tag, value := range ts.customTags {
options = append(options, jaegercfg.Tag(tag, value))
}
if ts.zipkinPropagation {
zipkinPropagator := zipkin.NewZipkinB3HTTPHeaderPropagator()
options = append(options,
jaegercfg.Injector(opentracing.HTTPHeaders, zipkinPropagator),
jaegercfg.Extractor(opentracing.HTTPHeaders, zipkinPropagator),
)
if !ts.disableSharedZipkinSpans {
options = append(options, jaegercfg.ZipkinSharedRPCSpan(true))
func (ots *Opentelemetry) parseSettings() error {
legacyAddress, legacyTags := "", ""
if section, err := ots.Cfg.Raw.GetSection("tracing.jaeger"); err == nil {
legacyAddress = section.Key("address").MustString("")
if legacyAddress == "" {
host, port := os.Getenv(envJaegerAgentHost), os.Getenv(envJaegerAgentPort)
if host != "" || port != "" {
legacyAddress = fmt.Sprintf("%s:%s", host, port)
}
}
legacyTags = section.Key("always_included_tag").MustString("")
ots.sampler = section.Key("sampler_type").MustString("")
ots.samplerParam = section.Key("sampler_param").MustFloat64(1)
ots.samplerRemoteURL = section.Key("sampling_server_url").MustString("")
}
tracer, closer, err := cfg.NewTracer(options...)
section := ots.Cfg.Raw.Section("tracing.opentelemetry")
var err error
// we default to legacy tag set (attributes) if the new config format is absent
ots.customAttribs, err = splitCustomAttribs(section.Key("custom_attributes").MustString(legacyTags))
if err != nil {
return err
}
opentracing.SetGlobalTracer(tracer)
section = ots.Cfg.Raw.Section("tracing.opentelemetry.jaeger")
ots.enabled = noopExporter
ts.closer = closer
// we default to legacy Jaeger agent address if the new config value is empty
ots.Address = section.Key("address").MustString(legacyAddress)
ots.Propagation = section.Key("propagation").MustString("")
if ots.Address != "" {
ots.enabled = jaegerExporter
return nil
}
section = ots.Cfg.Raw.Section("tracing.opentelemetry.otlp")
ots.Address = section.Key("address").MustString("")
if ots.Address != "" {
ots.enabled = otlpExporter
}
ots.Propagation = section.Key("propagation").MustString("")
return nil
}
func (ts *Opentracing) Run(ctx context.Context) error {
// OTelExporterEnabled reports whether the OTLP exporter is the selected exporter.
func (ots *Opentelemetry) OTelExporterEnabled() bool {
return ots.enabled == otlpExporter
}
// splitCustomAttribs parses a comma-separated list of "key:value" pairs into
// OpenTelemetry string attributes.
//
// Only the first colon of an entry separates key from value, so values may
// themselves contain colons. Whitespace around entries, keys and values is
// trimmed, so "a:1, b:2" yields the keys "a" and "b" rather than "a" and " b".
// Blank entries are skipped; an entry without a colon is an error. The
// returned slice is non-nil even when s is empty.
func splitCustomAttribs(s string) ([]attribute.KeyValue, error) {
	res := []attribute.KeyValue{}
	for _, entry := range strings.Split(s, ",") {
		entry = strings.TrimSpace(entry)
		if entry == "" {
			continue
		}
		parts := strings.SplitN(entry, ":", 2)
		if len(parts) < 2 {
			return nil, fmt.Errorf("custom attribute malformed - must be in 'key:value' form: %q", entry)
		}
		res = append(res, attribute.String(strings.TrimSpace(parts[0]), strings.TrimSpace(parts[1])))
	}
	return res, nil
}
// initJaegerTracerProvider builds a tracer provider that batches spans to
// Jaeger. ots.Address is treated as an agent address when it parses as
// host:port and as a collector URL otherwise. The sampler is chosen from
// ots.sampler; an unknown sampler type is an error.
func (ots *Opentelemetry) initJaegerTracerProvider() (*tracesdk.TracerProvider, error) {
	// Create the Jaeger exporter: address can be either agent address (host:port) or collector URL
	var ep jaeger.EndpointOption
	host, port, splitErr := net.SplitHostPort(ots.Address)
	if splitErr == nil {
		ep = jaeger.WithAgentEndpoint(jaeger.WithAgentHost(host), jaeger.WithAgentPort(port))
	} else {
		ep = jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(ots.Address))
	}
	exp, err := jaeger.New(ep)
	if err != nil {
		return nil, err
	}

	// TODO: why are these attributes different from ones added to the
	// OTLP provider?
	baseAttribs := resource.WithAttributes(
		semconv.ServiceNameKey.String("grafana"),
		attribute.String("environment", "production"),
	)
	res, err := resource.New(
		context.Background(),
		baseAttribs,
		resource.WithAttributes(ots.customAttribs...),
	)
	if err != nil {
		return nil, err
	}

	var sampler tracesdk.Sampler
	switch ots.sampler {
	case "":
		sampler = tracesdk.AlwaysSample()
	case "const", "probabilistic":
		sampler = tracesdk.TraceIDRatioBased(ots.samplerParam)
	case "rateLimiting":
		sampler = newRateLimiter(ots.samplerParam)
	case "remote":
		// samplerParam seeds the sampler used until the remote strategy is fetched.
		sampler = jaegerremote.New("grafana", jaegerremote.WithSamplingServerURL(ots.samplerRemoteURL),
			jaegerremote.WithInitialSampler(tracesdk.TraceIDRatioBased(ots.samplerParam)))
	default:
		return nil, fmt.Errorf("invalid sampler type: %s", ots.sampler)
	}

	return tracesdk.NewTracerProvider(
		tracesdk.WithBatcher(exp),
		tracesdk.WithResource(res),
		tracesdk.WithSampler(sampler),
	), nil
}
// initOTLPTracerProvider creates an OTLP/gRPC exporter for ots.Address
// (configured with otlptracegrpc.WithInsecure) and wraps it in a tracer
// provider via initTracerProvider.
func (ots *Opentelemetry) initOTLPTracerProvider() (*tracesdk.TracerProvider, error) {
	exp, err := otlptrace.New(
		context.Background(),
		otlptracegrpc.NewClient(
			otlptracegrpc.WithEndpoint(ots.Address),
			otlptracegrpc.WithInsecure(),
		),
	)
	if err != nil {
		return nil, err
	}
	return initTracerProvider(exp, ots.customAttribs...)
}
// initTracerProvider wraps exp in a batching tracer provider that uses a
// parent-based always-on sampler. The resource describes the "grafana"
// service plus customAttribs, process runtime and telemetry SDK details.
func initTracerProvider(exp tracesdk.SpanExporter, customAttribs ...attribute.KeyValue) (*tracesdk.TracerProvider, error) {
	resOpts := []resource.Option{
		resource.WithAttributes(
			semconv.ServiceNameKey.String("grafana"),
			// NOTE(review): version.Version is imported from go.etcd.io/etcd/api/v3/version,
			// so this looks like the etcd API version rather than Grafana's - confirm.
			semconv.ServiceVersionKey.String(version.Version),
		),
		resource.WithAttributes(customAttribs...),
		resource.WithProcessRuntimeDescription(),
		resource.WithTelemetrySDK(),
	}
	res, err := resource.New(context.Background(), resOpts...)
	if err != nil {
		return nil, err
	}

	sampler := tracesdk.ParentBased(tracesdk.AlwaysSample())
	return tracesdk.NewTracerProvider(
		tracesdk.WithBatcher(exp),
		tracesdk.WithSampler(sampler),
		tracesdk.WithResource(res),
	), nil
}
// initNoopTracerProvider returns a provider wrapping trace.NewNoopTracerProvider;
// it never returns an error.
func (ots *Opentelemetry) initNoopTracerProvider() (tracerProvider, error) {
return &noopTracerProvider{TracerProvider: trace.NewNoopTracerProvider()}, nil
}
// initOpentelemetryTracer creates the tracer provider for the configured
// exporter, registers it and the configured propagators globally, and stores
// the resulting tracer on ots.
//
// It returns an error when the exporter cannot be created or when
// ots.Propagation names an unsupported propagator.
func (ots *Opentelemetry) initOpentelemetryTracer() error {
	var (
		tp  tracerProvider
		err error
	)
	switch ots.enabled {
	case jaegerExporter:
		tp, err = ots.initJaegerTracerProvider()
	case otlpExporter:
		tp, err = ots.initOTLPTracerProvider()
	default:
		tp, err = ots.initNoopTracerProvider()
	}
	if err != nil {
		return err
	}

	// Register our TracerProvider as the global so any imported
	// instrumentation in the future will default to using it
	// only if tracing is enabled
	if ots.enabled != "" {
		otel.SetTracerProvider(tp)
	}

	propagators := []propagation.TextMapPropagator{}
	for _, name := range strings.Split(ots.Propagation, ",") {
		switch name {
		case "":
			// empty entries are ignored
		case w3cPropagator:
			propagators = append(propagators, propagation.TraceContext{}, propagation.Baggage{})
		case jaegerPropagator:
			propagators = append(propagators, jaegerpropagator.Jaeger{})
		default:
			return fmt.Errorf("unsupported OpenTelemetry propagator: %q", name)
		}
	}

	// With nothing configured, fall back to W3C trace context plus baggage.
	var propagator propagation.TextMapPropagator
	switch len(propagators) {
	case 0:
		propagator = propagation.NewCompositeTextMapPropagator(
			propagation.TraceContext{}, propagation.Baggage{},
		)
	case 1:
		propagator = propagators[0]
	default:
		propagator = propagation.NewCompositeTextMapPropagator(propagators...)
	}
	otel.SetTextMapPropagator(propagator)

	// Keep a provider that was set earlier (e.g. by a test).
	if ots.tracerProvider == nil {
		ots.tracerProvider = tp
	}

	ots.tracer = otel.GetTracerProvider().Tracer("component-main")
	return nil
}
func (ots *Opentelemetry) Run(ctx context.Context) error {
otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) {
err = level.Error(ots.log).Log("msg", "OpenTelemetry handler returned an error", "err", err)
if err != nil {
ots.log.Error("OpenTelemetry log returning error", err)
}
}))
<-ctx.Done()
if ts.closer != nil {
ts.log.Info("Closing tracing")
return ts.closer.Close()
ots.log.Info("Closing tracing")
if ots.tracerProvider == nil {
return nil
}
ctxShutdown, cancel := context.WithTimeout(ctx, time.Second*5)
defer cancel()
if err := ots.tracerProvider.Shutdown(ctxShutdown); err != nil {
return err
}
return nil
}
func (ts *Opentracing) Start(ctx context.Context, spanName string, opts ...trace.SpanStartOption) (context.Context, Span) {
span, ctx := opentracing.StartSpanFromContext(ctx, spanName)
opentracingSpan := OpentracingSpan{span: span}
if sctx, ok := span.Context().(jaeger.SpanContext); ok {
ctx = context.WithValue(ctx, traceKey{}, traceValue{sctx.TraceID().String(), sctx.IsSampled()})
func (ots *Opentelemetry) Start(ctx context.Context, spanName string, opts ...trace.SpanStartOption) (context.Context, Span) {
ctx, span := ots.tracer.Start(ctx, spanName, opts...)
opentelemetrySpan := OpentelemetrySpan{
span: span,
}
return ctx, opentracingSpan
}
func (ts *Opentracing) Inject(ctx context.Context, header http.Header, span Span) {
opentracingSpan, ok := span.(OpentracingSpan)
if !ok {
logger.Error("Failed to cast opentracing span")
if traceID := span.SpanContext().TraceID(); traceID.IsValid() {
ctx = context.WithValue(ctx, traceKey{}, traceValue{traceID.String(), span.SpanContext().IsSampled()})
}
err := opentracing.GlobalTracer().Inject(
opentracingSpan.span.Context(),
opentracing.HTTPHeaders,
opentracing.HTTPHeadersCarrier(header))
if err != nil {
logger.Error("Failed to inject span context instance", "err", err)
}
return ctx, opentelemetrySpan
}
func (s OpentracingSpan) End() {
s.span.Finish()
func (ots *Opentelemetry) Inject(ctx context.Context, header http.Header, _ Span) {
otel.GetTextMapPropagator().Inject(ctx, propagation.HeaderCarrier(header))
}
func (s OpentracingSpan) SetAttributes(key string, value interface{}, kv attribute.KeyValue) {
s.span.SetTag(key, value)
func (s OpentelemetrySpan) End() {
s.span.End()
}
func (s OpentracingSpan) SetName(name string) {
s.span.SetOperationName(name)
func (s OpentelemetrySpan) SetAttributes(key string, value interface{}, kv attribute.KeyValue) {
s.span.SetAttributes(kv)
}
func (s OpentracingSpan) SetStatus(code codes.Code, description string) {
if code == codes.Error {
ext.Error.Set(s.span, true)
}
func (s OpentelemetrySpan) SetName(name string) {
s.span.SetName(name)
}
func (s OpentracingSpan) RecordError(err error, options ...trace.EventOption) {
ext.Error.Set(s.span, true)
func (s OpentelemetrySpan) SetStatus(code codes.Code, description string) {
s.span.SetStatus(code, description)
}
func (s OpentracingSpan) AddEvents(keys []string, values []EventValue) {
fields := []ol.Field{}
func (s OpentelemetrySpan) RecordError(err error, options ...trace.EventOption) {
s.span.RecordError(err, options...)
}
func (s OpentelemetrySpan) AddEvents(keys []string, values []EventValue) {
for i, v := range values {
if v.Str != "" {
field := ol.String(keys[i], v.Str)
fields = append(fields, field)
s.span.AddEvent(keys[i], trace.WithAttributes(attribute.Key(keys[i]).String(v.Str)))
}
if v.Num != 0 {
field := ol.Int64(keys[i], v.Num)
fields = append(fields, field)
s.span.AddEvent(keys[i], trace.WithAttributes(attribute.Key(keys[i]).Int64(v.Num)))
}
}
s.span.LogFields(fields...)
}
func (s OpentracingSpan) contextWithSpan(ctx context.Context) context.Context {
func (s OpentelemetrySpan) contextWithSpan(ctx context.Context) context.Context {
if s.span != nil {
ctx = opentracing.ContextWithSpan(ctx, s.span)
ctx = trace.ContextWithSpan(ctx, s.span)
// Grafana also manages its own separate traceID in the context in addition to what opentracing handles.
// It's derived from the span. Ensure that we propagate this too.
if sctx, ok := s.span.Context().(jaeger.SpanContext); ok {
ctx = context.WithValue(ctx, traceKey{}, traceValue{sctx.TraceID().String(), sctx.IsSampled()})
if traceID := s.span.SpanContext().TraceID(); traceID.IsValid() {
ctx = context.WithValue(ctx, traceKey{}, traceValue{traceID.String(), s.span.SpanContext().IsSampled()})
}
}
return ctx
}
func splitTagSettings(input string) map[string]string {
res := map[string]string{}
type rateLimiter struct {
sync.Mutex
rps float64
balance float64
maxBalance float64
lastTick time.Time
tags := strings.Split(input, ",")
for _, v := range tags {
kv := strings.Split(v, ":")
if len(kv) > 1 {
res[kv[0]] = kv[1]
}
now func() time.Time
}
func newRateLimiter(rps float64) *rateLimiter {
return &rateLimiter{
rps: rps,
balance: math.Max(rps, 1),
maxBalance: math.Max(rps, 1),
lastTick: time.Now(),
now: time.Now,
}
return res
}
type jaegerLogWrapper struct {
logger log.Logger
// ShouldSample makes the sampling decision for the rate limiter. Each sampled
// span costs one unit of balance; when the balance is below one it is topped
// up by elapsed-seconds * rps (capped at maxBalance) before deciding. The
// parent span's trace state is passed through unchanged in every result.
func (rl *rateLimiter) ShouldSample(p tracesdk.SamplingParameters) tracesdk.SamplingResult {
// Serialize access to balance and lastTick.
rl.Lock()
defer rl.Unlock()
psc := trace.SpanContextFromContext(p.ParentContext)
// Fast path: spend existing balance without consulting the clock.
if rl.balance >= 1 {
rl.balance -= 1
return tracesdk.SamplingResult{Decision: tracesdk.RecordAndSample, Tracestate: psc.TraceState()}
}
// Out of balance: credit the time elapsed since the last refill.
currentTime := rl.now()
elapsedTime := currentTime.Sub(rl.lastTick).Seconds()
rl.lastTick = currentTime
rl.balance = math.Min(rl.maxBalance, rl.balance+elapsedTime*rl.rps)
if rl.balance >= 1 {
rl.balance -= 1
return tracesdk.SamplingResult{Decision: tracesdk.RecordAndSample, Tracestate: psc.TraceState()}
}
// Still below one unit after the refill: drop the span.
return tracesdk.SamplingResult{Decision: tracesdk.Drop, Tracestate: psc.TraceState()}
}
// Error forwards msg to the wrapped logger's Error method.
func (jlw *jaegerLogWrapper) Error(msg string) {
jlw.logger.Error(msg)
}
// Infof renders format and args with fmt.Sprintf and forwards the resulting
// message to the wrapped logger's Info method.
func (jlw *jaegerLogWrapper) Infof(format string, args ...interface{}) {
	jlw.logger.Info(fmt.Sprintf(format, args...))
}
// Description returns the constant string "RateLimitingSampler".
func (rl *rateLimiter) Description() string { return "RateLimitingSampler" }

View File

@ -5,137 +5,163 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/attribute"
"github.com/grafana/grafana/pkg/setting"
)
// Table-driven test: every case's input string is split and the result is
// compared against the case's expected value.
func TestGroupSplit(t *testing.T) {
// TODO(zserge) Add proper tests for opentelemetry
func TestSplitCustomAttribs(t *testing.T) {
tests := []struct {
input string
expected map[string]string
expected []attribute.KeyValue
}{
{
input: "tag1:value1,tag2:value2",
expected: map[string]string{
"tag1": "value1",
"tag2": "value2",
// A value may itself contain ':'; the expected attribute keeps "value:1" intact.
input: "key1:value:1",
expected: []attribute.KeyValue{attribute.String("key1", "value:1")},
},
{
input: "key1:value1,key2:value2",
expected: []attribute.KeyValue{
attribute.String("key1", "value1"),
attribute.String("key2", "value2"),
},
},
{
input: "",
expected: map[string]string{},
},
{
input: "tag1",
expected: map[string]string{},
expected: []attribute.KeyValue{},
},
}
for _, test := range tests {
tags := splitTagSettings(test.input)
for k, v := range test.expected {
value, exists := tags[k]
assert.Truef(t, exists, "Tag %q not found for input %q", k, test.input)
assert.Equalf(t, v, value, "Tag %q has wrong value for input %q", k, test.input)
}
attribs, err := splitCustomAttribs(test.input)
assert.NoError(t, err)
assert.EqualValues(t, test.expected, attribs)
}
}
// TestInitJaegerCfg_Default checks that a zero-value Opentracing yields a
// disabled Jaeger config.
func TestInitJaegerCfg_Default(t *testing.T) {
ts := &Opentracing{}
cfg, err := ts.initJaegerCfg()
require.NoError(t, err)
// TestSplitCustomAttribs_Malformed checks that splitCustomAttribs returns an
// error for inputs that do not contain a ':' separator.
func TestSplitCustomAttribs_Malformed(t *testing.T) {
tests := []struct {
input string
}{
{input: "key1=value1"},
{input: "key1"},
}
assert.True(t, cfg.Disabled)
for _, test := range tests {
_, err := splitCustomAttribs(test.input)
assert.Error(t, err)
}
}
// TestInitJaegerCfg_Enabled checks that an Opentracing with enabled set
// produces a Jaeger config that is not disabled and whose
// Reporter.LocalAgentHostPort is "localhost:6831".
func TestInitJaegerCfg_Enabled(t *testing.T) {
	tracer := &Opentracing{enabled: true}

	jaegerCfg, err := tracer.initJaegerCfg()

	require.NoError(t, err)
	assert.False(t, jaegerCfg.Disabled)
	assert.Equal(t, "localhost:6831", jaegerCfg.Reporter.LocalAgentHostPort)
}
// TestInitJaegerCfg_DisabledViaEnv checks that JAEGER_DISABLED=true yields a
// disabled Jaeger config even when the tracer itself has enabled set.
func TestInitJaegerCfg_DisabledViaEnv(t *testing.T) {
	// t.Setenv puts the variable back to its previous state when the test
	// finishes, rather than unconditionally unsetting it.
	t.Setenv("JAEGER_DISABLED", "true")

	ts := &Opentracing{enabled: true}
	cfg, err := ts.initJaegerCfg()
	require.NoError(t, err)

	assert.True(t, cfg.Disabled)
}
// TestInitJaegerCfg_EnabledViaEnv checks that JAEGER_DISABLED=false yields a
// non-disabled Jaeger config even when the tracer itself has enabled unset.
func TestInitJaegerCfg_EnabledViaEnv(t *testing.T) {
	// t.Setenv puts the variable back to its previous state when the test
	// finishes, rather than unconditionally unsetting it.
	t.Setenv("JAEGER_DISABLED", "false")

	ts := &Opentracing{enabled: false}
	cfg, err := ts.initJaegerCfg()
	require.NoError(t, err)

	assert.False(t, cfg.Disabled)
}
// TestInitJaegerCfg_InvalidEnvVar checks that an unparsable JAEGER_DISABLED
// value makes initJaegerCfg fail with the exact parse error message.
func TestInitJaegerCfg_InvalidEnvVar(t *testing.T) {
	// t.Setenv puts the variable back to its previous state when the test
	// finishes, rather than unconditionally unsetting it.
	t.Setenv("JAEGER_DISABLED", "totallybogus")

	ts := &Opentracing{}
	_, err := ts.initJaegerCfg()
	require.EqualError(t, err, "cannot parse env var JAEGER_DISABLED=totallybogus: strconv.ParseBool: parsing \"totallybogus\": invalid syntax")
}
// TestInitJaegerCfg_EnabledViaHost checks that, with an empty [tracing.jaeger]
// section, setting JAEGER_AGENT_HOST yields a non-disabled Jaeger config whose
// Reporter.LocalAgentHostPort is that host on port 6831.
func TestInitJaegerCfg_EnabledViaHost(t *testing.T) {
	// t.Setenv puts the variable back to its previous state when the test
	// finishes, rather than unconditionally unsetting it.
	t.Setenv("JAEGER_AGENT_HOST", "example.com")

	cfg := setting.NewCfg()
	ts := &Opentracing{Cfg: cfg}
	_, err := ts.Cfg.Raw.NewSection("tracing.jaeger")
	require.NoError(t, err)
	require.NoError(t, ts.parseSettings())

	jaegerCfg, err := ts.initJaegerCfg()
	require.NoError(t, err)

	assert.False(t, jaegerCfg.Disabled)
	assert.Equal(t, "example.com:6831", jaegerCfg.Reporter.LocalAgentHostPort)
}
// TestInitJaegerCfg_EnabledViaHostPort checks that, with an empty
// [tracing.jaeger] section, JAEGER_AGENT_HOST and JAEGER_AGENT_PORT together
// determine Reporter.LocalAgentHostPort of a non-disabled Jaeger config.
func TestInitJaegerCfg_EnabledViaHostPort(t *testing.T) {
require.NoError(t, os.Setenv("JAEGER_AGENT_HOST", "example.com"))
require.NoError(t, os.Setenv("JAEGER_AGENT_PORT", "12345"))
// Remove both variables again once the test returns.
defer func() {
require.NoError(t, os.Unsetenv("JAEGER_AGENT_HOST"))
require.NoError(t, os.Unsetenv("JAEGER_AGENT_PORT"))
}()
cfg := setting.NewCfg()
ts := &Opentracing{Cfg: cfg}
_, err := ts.Cfg.Raw.NewSection("tracing.jaeger")
require.NoError(t, err)
require.NoError(t, ts.parseSettings())
jaegerCfg, err := ts.initJaegerCfg()
require.NoError(t, err)
assert.False(t, jaegerCfg.Disabled)
assert.Equal(t, "example.com:12345", jaegerCfg.Reporter.LocalAgentHostPort)
// TestTracingConfig feeds a set of INI snippets (and, optionally, environment
// variables) through ProvideService and checks the exporter, address,
// propagation and custom attributes of the resulting *Opentelemetry tracer.
// It covers the default config, the opentelemetry sections, the legacy
// [tracing.jaeger] section and legacy JAEGER_* environment variables.
func TestTracingConfig(t *testing.T) {
	for _, test := range []struct {
		Name               string
		Cfg                string
		Env                map[string]string
		ExpectedExporter   string
		ExpectedAddress    string
		ExpectedPropagator string
		ExpectedAttrs      []attribute.KeyValue
	}{
		{
			Name:             "default config uses noop exporter",
			Cfg:              "",
			ExpectedExporter: noopExporter,
			ExpectedAttrs:    []attribute.KeyValue{},
		},
		{
			Name: "custom attributes are parsed",
			Cfg: `
[tracing.opentelemetry]
custom_attributes = key1:value1,key2:value2
`,
			ExpectedExporter: noopExporter,
			ExpectedAttrs:    []attribute.KeyValue{attribute.String("key1", "value1"), attribute.String("key2", "value2")},
		},
		{
			Name: "jaeger address is parsed",
			Cfg: `
[tracing.opentelemetry.jaeger]
address = jaeger.example.com:6831
`,
			ExpectedExporter: jaegerExporter,
			ExpectedAddress:  "jaeger.example.com:6831",
			ExpectedAttrs:    []attribute.KeyValue{},
		},
		{
			Name: "OTLP address is parsed",
			Cfg: `
[tracing.opentelemetry.otlp]
address = otlp.example.com:4317
`,
			ExpectedExporter: otlpExporter,
			ExpectedAddress:  "otlp.example.com:4317",
			ExpectedAttrs:    []attribute.KeyValue{},
		},
		{
			Name: "legacy config format is supported",
			Cfg: `
[tracing.jaeger]
address = jaeger.example.com:6831
`,
			ExpectedExporter: jaegerExporter,
			ExpectedAddress:  "jaeger.example.com:6831",
			ExpectedAttrs:    []attribute.KeyValue{},
		},
		{
			Name: "legacy env variables are supproted",
			Cfg:  `[tracing.jaeger]`,
			Env: map[string]string{
				"JAEGER_AGENT_HOST": "example.com",
				"JAEGER_AGENT_PORT": "12345",
			},
			ExpectedExporter: jaegerExporter,
			ExpectedAddress:  "example.com:12345",
			ExpectedAttrs:    []attribute.KeyValue{},
		},
		{
			Name: "opentelemetry config format is prioritised over legacy jaeger",
			Cfg: `
[tracing.jaeger]
address = foo.com:6831
custom_tags = a:b
[tracing.opentelemetry]
custom_attributes = c:d
[tracing.opentelemetry.jaeger]
address = bar.com:6831
`,
			ExpectedExporter: jaegerExporter,
			ExpectedAddress:  "bar.com:6831",
			ExpectedAttrs:    []attribute.KeyValue{attribute.String("c", "d")},
		},
	} {
		t.Run(test.Name, func(t *testing.T) {
			// Register the cleanup before exporting anything, so a failure
			// part-way through still unsets the variables already exported.
			defer func() {
				for k := range test.Env {
					assert.NoError(t, os.Unsetenv(k))
				}
			}()

			// export environment variables (ranging over a nil map is a no-op)
			for k, v := range test.Env {
				require.NoError(t, os.Setenv(k, v))
			}

			// parse config sections
			cfg := setting.NewCfg()
			require.NoError(t, cfg.Raw.Append([]byte(test.Cfg)))

			// create tracer; stop on failure, because asserting the concrete
			// type of a nil tracer below would panic
			tracer, err := ProvideService(cfg)
			require.NoError(t, err)

			// make sure tracer is properly configured
			otel, ok := tracer.(*Opentelemetry)
			require.Truef(t, ok, "unexpected tracer type %T", tracer)
			assert.Equal(t, test.ExpectedExporter, otel.enabled)
			assert.Equal(t, test.ExpectedAddress, otel.Address)
			assert.Equal(t, test.ExpectedPropagator, otel.Propagation)
			assert.Equal(t, test.ExpectedAttrs, otel.customAttribs)
		})
	}
}

View File

@ -11,7 +11,7 @@ import (
// newTracingCfg creates a plugins tracing configuration based on the provided Grafana tracing config.
// If OpenTelemetry (OTLP) is disabled, a zero-value OpenTelemetryCfg is returned.
func newTracingCfg(grafanaCfg *setting.Cfg) (pCfg.Tracing, error) {
ots, err := tracing.ParseSettingsOpentelemetry(grafanaCfg)
ots, err := tracing.ParseSettings(grafanaCfg)
if err != nil {
return pCfg.Tracing{}, fmt.Errorf("parse settings: %w", err)
}