diff --git a/pkg/storage/unified/sql/backend.go b/pkg/storage/unified/sql/backend.go index a5777e26189..499931fce80 100644 --- a/pkg/storage/unified/sql/backend.go +++ b/pkg/storage/unified/sql/backend.go @@ -10,6 +10,8 @@ import ( "time" "github.com/google/uuid" + "go.opentelemetry.io/otel/attribute" + semconv "go.opentelemetry.io/otel/semconv/v1.26.0" "go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace/noop" "google.golang.org/protobuf/proto" @@ -164,7 +166,7 @@ func (b *backend) create(ctx context.Context, event resource.WriteEvent) (int64, // 3. TODO: Rebuild the whole folder tree structure if we're creating a folder // 4. Atomically increment resource version for this kind - rv, err := resourceVersionAtomicInc(ctx, tx, b.dialect, event.Key) + rv, err := b.resourceVersionAtomicInc(ctx, tx, event.Key) if err != nil { return fmt.Errorf("increment resource version: %w", err) } @@ -222,7 +224,7 @@ func (b *backend) update(ctx context.Context, event resource.WriteEvent) (int64, // 3. TODO: Rebuild the whole folder tree structure if we're creating a folder // 4. Atomically increment resource version for this kind - rv, err := resourceVersionAtomicInc(ctx, tx, b.dialect, event.Key) + rv, err := b.resourceVersionAtomicInc(ctx, tx, event.Key) if err != nil { return fmt.Errorf("increment resource version: %w", err) } @@ -282,7 +284,7 @@ func (b *backend) delete(ctx context.Context, event resource.WriteEvent) (int64, // 3. TODO: Rebuild the whole folder tree structure if we're creating a folder // 4. Atomically increment resource version for this kind - rv, err := resourceVersionAtomicInc(ctx, tx, b.dialect, event.Key) + rv, err := b.resourceVersionAtomicInc(ctx, tx, event.Key) if err != nil { return fmt.Errorf("increment resource version: %w", err) } @@ -661,10 +663,18 @@ func (b *backend) poll(ctx context.Context, grp string, res string, since int64, // TODO: Ideally we should attempt to update the RV in the resource and resource_history tables // in a single roundtrip. This would reduce the latency of the operation, and also increase the // throughput of the system. This is a good candidate for a future optimization. -func resourceVersionAtomicInc(ctx context.Context, x db.ContextExecer, d sqltemplate.Dialect, key *resource.ResourceKey) (newVersion int64, err error) { +func (b *backend) resourceVersionAtomicInc(ctx context.Context, x db.ContextExecer, key *resource.ResourceKey) (newVersion int64, err error) { + ctx, span := b.tracer.Start(ctx, "resourceVersionAtomicInc", trace.WithAttributes( + semconv.K8SNamespaceName(key.Namespace), + // TODO: the following attributes could use some standardization. + attribute.String("k8s.resource.group", key.Group), + attribute.String("k8s.resource.type", key.Resource), + )) + defer span.End() + // 1. Lock to row and prevent concurrent updates until the transaction is committed. res, err := dbutil.QueryRow(ctx, x, sqlResourceVersionGet, sqlResourceVersionGetRequest{ - SQLTemplate: sqltemplate.New(d), + SQLTemplate: sqltemplate.New(b.dialect), Group: key.Group, Resource: key.Resource, @@ -674,14 +684,14 @@ func resourceVersionAtomicInc(ctx context.Context, x db.ContextExecer, d sqltemp if errors.Is(err, sql.ErrNoRows) { // if there wasn't a row associated with the given resource, then we create it. if _, err = dbutil.Exec(ctx, x, sqlResourceVersionInsert, sqlResourceVersionUpsertRequest{ - SQLTemplate: sqltemplate.New(d), + SQLTemplate: sqltemplate.New(b.dialect), Group: key.Group, Resource: key.Resource, }); err != nil { return 0, fmt.Errorf("insert into resource_version: %w", err) } res, err = dbutil.QueryRow(ctx, x, sqlResourceVersionGet, sqlResourceVersionGetRequest{ - SQLTemplate: sqltemplate.New(d), + SQLTemplate: sqltemplate.New(b.dialect), Group: key.Group, Resource: key.Resource, Response: new(resourceVersionResponse), @@ -702,7 +712,7 @@ func resourceVersionAtomicInc(ctx context.Context, x db.ContextExecer, d sqltemp nextRV := max(res.CurrentEpoch, res.ResourceVersion+1) _, err = dbutil.Exec(ctx, x, sqlResourceVersionUpdate, sqlResourceVersionUpsertRequest{ - SQLTemplate: sqltemplate.New(d), + SQLTemplate: sqltemplate.New(b.dialect), Group: key.Group, Resource: key.Resource, ResourceVersion: nextRV, diff --git a/pkg/storage/unified/sql/backend_test.go b/pkg/storage/unified/sql/backend_test.go index 0d96d73896d..367c5b6941a 100644 --- a/pkg/storage/unified/sql/backend_test.go +++ b/pkg/storage/unified/sql/backend_test.go @@ -11,7 +11,6 @@ import ( "github.com/grafana/grafana/pkg/storage/unified/resource" "github.com/grafana/grafana/pkg/storage/unified/sql/db/dbimpl" - "github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate" "github.com/grafana/grafana/pkg/storage/unified/sql/test" "github.com/grafana/grafana/pkg/util/testutil" ) @@ -217,15 +216,13 @@ func expectUnsuccessfulResourceVersionAtomicInc(t *testing.T, b testBackend, err func TestResourceVersionAtomicInc(t *testing.T) { t.Parallel() - dialect := sqltemplate.MySQL - t.Run("happy path - insert new row", func(t *testing.T) { t.Parallel() b, ctx := setupBackendTest(t) expectSuccessfulResourceVersionAtomicInc(t, b) // returns RV=1 - v, err := resourceVersionAtomicInc(ctx, b.DB, dialect, resKey) + v, err := b.resourceVersionAtomicInc(ctx, b.DB, resKey) require.NoError(t, err) require.Equal(t, int64(23456), v) }) @@ -238,7 +235,7 @@ func TestResourceVersionAtomicInc(t *testing.T) { b.QueryWithResult("select resource_version for update", 2, Rows{{12345, 23456}}) b.ExecWithResult("update resource_version", 0, 1) - v, err := resourceVersionAtomicInc(ctx, b.DB, dialect, resKey) + v, err := b.resourceVersionAtomicInc(ctx, b.DB, resKey) require.NoError(t, err) require.Equal(t, int64(23456), v) }) @@ -248,7 +245,7 @@ func TestResourceVersionAtomicInc(t *testing.T) { b, ctx := setupBackendTest(t) b.QueryWithErr("select resource_version for update", errTest) - v, err := resourceVersionAtomicInc(ctx, b.DB, dialect, resKey) + v, err := b.resourceVersionAtomicInc(ctx, b.DB, resKey) require.Zero(t, v) require.Error(t, err) require.ErrorContains(t, err, "lock the resource version") @@ -262,7 +259,7 @@ func TestResourceVersionAtomicInc(t *testing.T) { b.QueryWithResult("select resource_version", 0, Rows{}) b.ExecWithErr("insert resource_version", errTest) - v, err := resourceVersionAtomicInc(ctx, b.DB, dialect, resKey) + v, err := b.resourceVersionAtomicInc(ctx, b.DB, resKey) require.Zero(t, v) require.Error(t, err) require.ErrorContains(t, err, "insert into resource_version") @@ -275,7 +272,7 @@ func TestResourceVersionAtomicInc(t *testing.T) { b.QueryWithResult("select resource_version for update", 2, Rows{{12345, 23456}}) b.ExecWithErr("update resource_version", errTest) - v, err := resourceVersionAtomicInc(ctx, b.DB, dialect, resKey) + v, err := b.resourceVersionAtomicInc(ctx, b.DB, resKey) require.Zero(t, v) require.Error(t, err) require.ErrorContains(t, err, "increase resource version") diff --git a/pkg/storage/unified/sql/db/dbimpl/dbimpl.go b/pkg/storage/unified/sql/db/dbimpl/dbimpl.go index 7c171e3a1ca..13e6b7fbe42 100644 --- a/pkg/storage/unified/sql/db/dbimpl/dbimpl.go +++ b/pkg/storage/unified/sql/db/dbimpl/dbimpl.go @@ -7,9 +7,9 @@ import ( "sync" "github.com/dlmiddlecote/sqlstats" - "github.com/grafana/grafana/pkg/infra/tracing" "github.com/prometheus/client_golang/prometheus" "go.opentelemetry.io/otel/trace" + "go.opentelemetry.io/otel/trace/noop" "xorm.io/xorm" infraDB "github.com/grafana/grafana/pkg/infra/db" @@ -17,6 +17,7 @@ import ( "github.com/grafana/grafana/pkg/setting" "github.com/grafana/grafana/pkg/storage/unified/sql/db" "github.com/grafana/grafana/pkg/storage/unified/sql/db/migrations" + "github.com/grafana/grafana/pkg/storage/unified/sql/db/otel" ) const ( @@ -33,7 +34,10 @@ var errGrafanaDBInstrumentedNotSupported = errors.New("the Resource API is " + grafanaDBInstrumentQueriesKey + "` is enabled in [database], and that" + " setup is currently unsupported. Please, consider disabling that flag") -func ProvideResourceDB(grafanaDB infraDB.DB, cfg *setting.Cfg, tracer tracing.Tracer) (db.DBProvider, error) { +func ProvideResourceDB(grafanaDB infraDB.DB, cfg *setting.Cfg, tracer trace.Tracer) (db.DBProvider, error) { + if tracer == nil { + tracer = noop.NewTracerProvider().Tracer("test-tracer") + } p, err := newResourceDBProvider(grafanaDB, cfg, tracer) if err != nil { return nil, fmt.Errorf("provide Resource DB: %w", err) @@ -148,6 +152,7 @@ func (p *resourceDBProvider) init(ctx context.Context) (db.DB, error) { } d := NewDB(p.engine.DB().DB, p.engine.Dialect().DriverName()) + d = otel.NewInstrumentedDB(d, p.tracer) return d, nil } diff --git a/pkg/storage/unified/sql/db/dbimpl/regression_incident_2144_test.go b/pkg/storage/unified/sql/db/dbimpl/regression_incident_2144_test.go index ad29caf8cd2..38b2f8dc89e 100644 --- a/pkg/storage/unified/sql/db/dbimpl/regression_incident_2144_test.go +++ b/pkg/storage/unified/sql/db/dbimpl/regression_incident_2144_test.go @@ -75,7 +75,7 @@ func TestReproIncident2144UsingGrafanaDB(t *testing.T) { cfg := newCfgFromIniMap(t, cfgMap) setupDBForGrafana(t, ctx, cfgMap) grafanaDB := newTestInfraDB(t, cfgMap) - resourceDB, err := ProvideResourceDB(grafanaDB, cfg, testGrafanaTracer{}) + resourceDB, err := ProvideResourceDB(grafanaDB, cfg, nil) require.NotNil(t, resourceDB) require.NoError(t, err) }) @@ -105,7 +105,7 @@ func TestReproIncident2144UsingGrafanaDB(t *testing.T) { t.Run("Resource API provides a reasonable error for this case", func(t *testing.T) { t.Parallel() cfg := newCfgFromIniMap(t, cfgMap) - resourceDB, err := ProvideResourceDB(grafanaDB, cfg, testGrafanaTracer{}) + resourceDB, err := ProvideResourceDB(grafanaDB, cfg, nil) require.Nil(t, resourceDB) require.Error(t, err) require.ErrorIs(t, err, errGrafanaDBInstrumentedNotSupported) diff --git a/pkg/storage/unified/sql/db/otel/otel.go b/pkg/storage/unified/sql/db/otel/otel.go new file mode 100644 index 00000000000..2a872f8db54 --- /dev/null +++ b/pkg/storage/unified/sql/db/otel/otel.go @@ -0,0 +1,416 @@ +package otel + +import ( + "context" + "database/sql" + "sync" + "time" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" + + "github.com/grafana/grafana/pkg/storage/unified/sql/db" +) + +const ( + dbVersionDefaultSQL = `SELECT VERSION()` + dbVersionSQLiteSQL = `SELECT SQLITE_VERSION()` + + dbTracePrefix = "sql.db." + dbTraceExecContext = dbTracePrefix + "exec_context" + dbTraceQueryContext = dbTracePrefix + "query_context" + dbTraceQueryRowContext = dbTracePrefix + "query_row_context" + dbTraceBeginTx = dbTracePrefix + "begin_tx" + dbTraceWithTx = dbTracePrefix + "with_tx" + dbTracePingContext = dbTracePrefix + "ping_context" + dbTraceTx = dbTracePrefix + "transaction" + + txTracePrefix = "sql.db.tx." + txTraceExecContext = txTracePrefix + "exec_context" + txTraceQueryContext = txTracePrefix + "query_context" + txTraceQueryRowContext = txTracePrefix + "query_row_context" + txTraceCommit = txTracePrefix + "commit" + txTraceRollback = txTracePrefix + "rollback" + + attrDriverName = "driver_name" + attrServerVersion = "server_version" + attrIsolationLevel = "isolation_level" + attrReadOnly = "read_only" + attrTxTerminationOp = "termination_op" + + attrValTxTerminationOpBegin = "begin" + attrValTxTerminationOpCommit = "commit" + attrValTxTerminationOpRollback = "rollback" +) + +var ( + spanOptKindClient = trace.WithSpanKind(trace.SpanKindClient) + defaultTxOpts = new(sql.TxOptions) +) + +type ctxKey struct{} + +// SetAttributes returns a context with one-time use tracing attributes that +// will be attached to the next database operation span. The prupose of this is +// to trace specific SQL queries and operations. +// Example: +// +// // the following QueryContext operation will have an extra attribute +// ctx = SetAttributes(attribute.String("query", "get user by id")) +// res, err := myTracedTx.QueryContext(ctx, getUserByIDSQL, userID) +// +// // the following ExecContext operation will have a different extra attribute +// ctx = SetAttributes(attribute.String("query", "disable user")) +// err = myTracedTx.ExecContext(ctx, disableUserSQL, userID) +// +// // the following Commit operation will NOT have any extra attribute +// err = myTracedTx.Commit(ctx) +// +// NOTE: if you want several database operations to share the same set of +// attributes, consider using a span for them instead. +func SetAttributes(ctx context.Context, attrs ...attribute.KeyValue) context.Context { + // we will use a pointer so that we can set that pointer to `emptyAttrsList` + // after the first use. This will prevent the same attributes to be reused, + // even if the same context is used later. Additionally, if a context that + // was already used for this purpose is passed, we don't need to derive a + // new context + val, _ := ctx.Value(ctxKey{}).(*[]attribute.KeyValue) + if val == nil { + ctx = context.WithValue(ctx, ctxKey{}, &attrs) + } else { + *val = attrs + } + + return ctx +} + +func consumeAttributes(ctx context.Context) []attribute.KeyValue { + val, _ := ctx.Value(ctxKey{}).(*[]attribute.KeyValue) + if val == nil { + return nil + } + ret := *val + *val = nil + return ret +} + +type otelDB struct { + db.DB + withTxFunc db.WithTxFunc + tracer trace.Tracer + driverName string + + initOnce sync.Once + dbServerVersion string +} + +// NewInstrumentedDB wraps the given db.DB, instrumenting it to provide +// OTEL-based observability. +func NewInstrumentedDB(d db.DB, tracer trace.Tracer) db.DB { + // TODO: periodically report metrics for stats returned by `db.Stats` + + ret := &otelDB{ + DB: d, + tracer: tracer, + driverName: d.DriverName(), + dbServerVersion: "unknown", + } + ret.withTxFunc = db.NewWithTxFunc(ret.BeginTx) + return ret +} + +func (x *otelDB) init(ctx context.Context) { + x.initOnce.Do(func() { + // there is a chance that the context of the first operation run on the + // `*otelDB` has a very soon deadline, is cancelled by the client while + // we use it, or is even already done. This would cause this operation, + // which is run only once, to fail and we would no longer be able to + // know the database details. Thus, we impose a long timeout that we + // know is very likely to succeed, since getting the server version + // should be nearly a nop. We could instead create a new context with + // timeout from context.Background(), but we opt to derive one from the + // provided context just in case any value in it would be meaningful to + // the downstream implementation in `x.DB`. This is also to future-proof + // this implementation for the case that we might have use another + // decorator wrapping the actual implementation. + const timeout = 5 * time.Second + ctx = context.WithoutCancel(ctx) + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + row := x.DB.QueryRowContext(ctx, dbVersionDefaultSQL) + if row.Err() != nil { + row = x.DB.QueryRowContext(ctx, dbVersionSQLiteSQL) + if row.Err() != nil { + return + } + } + + var dbServerVersion string + if err := row.Scan(&dbServerVersion); err == nil { + x.dbServerVersion = dbServerVersion + } + }) +} + +func (x *otelDB) startSpan(ctx context.Context, name string) (context.Context, trace.Span) { + x.init(ctx) + + attrs := append(consumeAttributes(ctx), + attribute.String(attrDriverName, x.driverName), + attribute.String(attrServerVersion, x.dbServerVersion), + ) + + ctx, span := x.tracer.Start(ctx, name, spanOptKindClient, + trace.WithAttributes(attrs...)) + + return ctx, span +} + +func (x *otelDB) ExecContext(ctx context.Context, query string, args ...any) (db.Result, error) { + ctx, span := x.startSpan(ctx, dbTraceExecContext) + defer span.End() + res, err := x.DB.ExecContext(ctx, query, args...) + setSpanOutcome(span, err) + return res, err +} + +func (x *otelDB) QueryContext(ctx context.Context, query string, args ...any) (db.Rows, error) { + ctx, span := x.startSpan(ctx, dbTraceQueryContext) + defer span.End() + rows, err := x.DB.QueryContext(ctx, query, args...) + setSpanOutcome(span, err) + return rows, err +} + +func (x *otelDB) QueryRowContext(ctx context.Context, query string, args ...any) db.Row { + ctx, span := x.startSpan(ctx, dbTraceQueryRowContext) + defer span.End() + row := x.DB.QueryRowContext(ctx, query, args...) + setSpanOutcome(span, row.Err()) + return row +} + +func (x *otelDB) BeginTx(ctx context.Context, opts *sql.TxOptions) (db.Tx, error) { + parentSpanID := trace.SpanFromContext(ctx).SpanContext().SpanID().String() + + // create a new span that will encompass the whole transaction as a single + // operation. This span will be ended if we fail in BEGIN, or when COMMIT or + // ROLLBACK are called on the returned instrumented db.Tx + txCtx, txSpan := x.startSpan(ctx, dbTraceTx) + setSpanTxOpts(txSpan, opts) + + // make sure to end the transaction span if BeginTx later fails + var err error + defer func() { + if err != nil { + txSpan.SetAttributes(attribute.String(attrTxTerminationOp, + attrValTxTerminationOpBegin)) + setSpanOutcome(txSpan, err) + txSpan.End() + } + }() + + ret := otelTx{ + // we only miss defining `tx` here, which is defined later + span: txSpan, // will only be used during COMMIT/ROLLBACK + tracerStartFunc: func(n string, o ...trace.SpanStartOption) trace.Span { + _, span := x.tracer.Start(txCtx, n, o...) + return span + }, + parentSpanID: parentSpanID, + } + + // start the span for BEGIN as a regular child span of the transaction span + ctx, span := ret.startSpan(ctx, dbTraceBeginTx) + defer span.End() + ret.tx, err = x.DB.BeginTx(ctx, opts) // set ret.tx that we were missing + setSpanOutcome(span, err) + if err != nil { + return nil, err + } + + return ret, nil +} + +func (x *otelDB) WithTx(ctx context.Context, opts *sql.TxOptions, f db.TxFunc) error { + return x.withTxFunc(ctx, opts, f) +} + +func (x *otelDB) PingContext(ctx context.Context) error { + ctx, span := x.startSpan(ctx, dbTracePingContext) + defer span.End() + err := x.DB.PingContext(ctx) + setSpanOutcome(span, err) + return err +} + +type otelTx struct { + tx db.Tx + span trace.Span + tracerStartFunc func(string, ...trace.SpanStartOption) trace.Span + parentSpanID string +} + +// startSpan returns a new span, and optionally a context.Context if +// `optionalCtx` is not nil. It will be nil in the cases of the `Commit` and +// `Rollback` methods, since they do not have a context.Context either. The +// returned span will be a child span of the transaction span started at the +// beginning of the transaction. If `optionalCtx` is not nil, then the returned +// span will also be linked to the span found in that context (if it differs from +// the one used to create the transaction). +// +// Example operation with additional spans with current implementation: +// +// parentSpan +// | +// +------------------+ +// | | +// | v +// | exampleSubSpan +// | | +// | +-------------+ +// | : | +// v : v +// transactionSpan (spanLink) nonTxQuerySpan +// | : +// +------------------+-----------------------------+ +// | | | +// v v v +// beginTxSpan execSpan commitSpan +// +// Note that understanding what is being executed in the same transaction is +// very clear because you just need to follow the vertical solid lines, which +// denote a parent-child relationship. We also know that the `execSpan` was +// originated in `exampleSubSpan` because of the spanLink (with a vertical +// dotted line), but there is no chance that we get confused and think that +// `nonTxQuerySpan` was part of the transaction (it is not related in any +// manner). What we do here is to make any transactional db operation a child of +// the transaction span, which aligns with the OTEL concept of a span being 'a +// unit of work'. In this sense, a transaction is our unit of work, which also +// aligns semantically with the database concept of transaction. This allows us +// to clearly visualize important database semantics within the OTEL framework. +// In OTEL, span links exist to associate one span with one or more spans, +// implying a causal relationship. In our case, we can see that while `execSpan` +// is part of the unit of work called "database transaction", it actually has a +// different cause: the `exampleSubSpan` for `execSpan`. +// +// For comparison, consider the following naïve alternative just passing the +// context around: +// +// parentSpan commitSpan +// | +// +------------------+ +// | | +// v v +// beginTxSpan exampleSubSpan +// | +// +-------------+ +// | | +// v v +// execSpan nonTxQuerySpan +// +// In this case, it is not straightforward to know what operations are part of +// the transaction. When looking at the traces, it will be very easy to be +// confused and think that `nonTxQuerySpan` was part of the transaction. +// Additionally, note that `commitSpan` is a root span, since the `Commit` +// method doesn't receive a context (same as `Rollback`), hence all such +// operations would end up being a scattered root span, making it hard to +// correlate. +func (x otelTx) startSpan(optionalCtx context.Context, name string) (context.Context, trace.Span) { + // minimum number of options for the span + startOpts := make([]trace.SpanStartOption, 0, 2) + startOpts = append(startOpts, spanOptKindClient) + + if optionalCtx != nil { + attrs := consumeAttributes(optionalCtx) + spanLink := trace.LinkFromContext(optionalCtx, attrs...) + if spanLink.SpanContext.SpanID().String() != x.parentSpanID { + // it only makes sense to create a link to the span in `optionalCtx` + // if it's not the same as the one used during BEGIN + startOpts = append(startOpts, trace.WithLinks(spanLink)) + } else if len(attrs) > 0 { + // otherwise, we just add the extra attributes added with + // `SetAttributes`. + startOpts = append(startOpts, trace.WithAttributes(attrs...)) + } + } + + span := x.tracerStartFunc(name, startOpts...) + + if optionalCtx != nil { + // we preserve the original intended context, we only override the span + // with the one we created + optionalCtx = trace.ContextWithSpan(optionalCtx, span) + } + + return optionalCtx, span +} + +func (x otelTx) ExecContext(ctx context.Context, query string, args ...any) (db.Result, error) { + ctx, span := x.startSpan(ctx, txTraceExecContext) + defer span.End() + res, err := x.tx.ExecContext(ctx, query, args...) + setSpanOutcome(span, err) + + return res, err +} + +func (x otelTx) QueryContext(ctx context.Context, query string, args ...any) (db.Rows, error) { + ctx, span := x.startSpan(ctx, txTraceQueryContext) + defer span.End() + rows, err := x.tx.QueryContext(ctx, query, args...) + setSpanOutcome(span, err) + + return rows, err +} + +func (x otelTx) QueryRowContext(ctx context.Context, query string, args ...any) db.Row { + ctx, span := x.startSpan(ctx, txTraceQueryRowContext) + defer span.End() + row := x.tx.QueryRowContext(ctx, query, args...) + setSpanOutcome(span, row.Err()) + return row +} + +func (x otelTx) Commit() error { + x.span.SetAttributes(attribute.String(attrTxTerminationOp, + attrValTxTerminationOpCommit)) + defer x.span.End() + _, span := x.startSpan(nil, txTraceCommit) //nolint:staticcheck + defer span.End() + err := x.tx.Commit() + setSpanOutcome(span, err) + setSpanOutcome(x.span, err) + return err +} + +func (x otelTx) Rollback() error { + x.span.SetAttributes(attribute.String(attrTxTerminationOp, + attrValTxTerminationOpRollback)) + defer x.span.End() + _, span := x.startSpan(nil, txTraceRollback) //nolint:staticcheck + defer span.End() + err := x.tx.Rollback() + setSpanOutcome(span, err) + setSpanOutcome(x.span, err) + return err +} + +func setSpanTxOpts(span trace.Span, opts *sql.TxOptions) { + if opts == nil { + opts = defaultTxOpts + } + span.SetAttributes( + attribute.String(attrIsolationLevel, opts.Isolation.String()), + attribute.Bool(attrReadOnly, opts.ReadOnly), + ) +} + +func setSpanOutcome(span trace.Span, err error) { + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, "") + } +} diff --git a/pkg/storage/unified/sql/db/otel/otel_test.go b/pkg/storage/unified/sql/db/otel/otel_test.go new file mode 100644 index 00000000000..77d39085f38 --- /dev/null +++ b/pkg/storage/unified/sql/db/otel/otel_test.go @@ -0,0 +1,396 @@ +package otel + +import ( + "context" + "database/sql" + "errors" + "testing" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/attribute" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + sdktracetest "go.opentelemetry.io/otel/sdk/trace/tracetest" + "go.opentelemetry.io/otel/trace" + + "github.com/grafana/grafana/pkg/storage/unified/sql/db" + dbmocks "github.com/grafana/grafana/pkg/storage/unified/sql/db/mocks" + "github.com/grafana/grafana/pkg/util/testutil" +) + +var errTest = errors.New("because of reasons") + +func TestContextAttributes(t *testing.T) { + t.Parallel() + + // test context. Note that it's perfectly safe to use context.Background() + // and there is no risk of the test blocking at any point because we will + // not use the deadline or signal cancellation features of the context + ctx := context.Background() + + // test attributes + attr1 := attribute.String("the key", "the value") + attr2 := attribute.String("the other key", "the other value") + attr3 := attribute.String("why not", "have another value") + attr4 := attribute.String("it's free", "they say") + + // the subtests are not Parallel because we define this test as a storyline, + // since we are interested in testing state changes in the context + + t.Run("consumeAttributes returns nil if SetAttributes was never called", + func(t *testing.T) { + attrs := consumeAttributes(ctx) + require.Nil(t, attrs) + }) + + t.Run("setting and getting attributes", func(t *testing.T) { + ctx = SetAttributes(ctx, attr1, attr2) + attrs := consumeAttributes(ctx) + require.Len(t, attrs, 2) + require.Equal(t, attr1, attrs[0]) + require.Equal(t, attr2, attrs[1]) + }) + + t.Run("attributes are now cleared", func(t *testing.T) { + attrs := consumeAttributes(ctx) + require.Len(t, attrs, 0) + }) + + t.Run("SetAttributes overwrites previous attributes", func(t *testing.T) { + ctx = SetAttributes(ctx, attr1, attr2) + ctx = SetAttributes(ctx, attr3, attr4) + attrs := consumeAttributes(ctx) + require.Len(t, attrs, 2) + require.Equal(t, attr3, attrs[0]) + require.Equal(t, attr4, attrs[1]) + }) + + t.Run("attributes are now cleared again", func(t *testing.T) { + attrs := consumeAttributes(ctx) + require.Len(t, attrs, 0) + }) +} + +func TestOTelTransactions(t *testing.T) { + t.Parallel() + const ( + rootSpanName = "root of the operation" + internalSpanName = "sub-operation" + ) + ctx := context.Context(testutil.NewDefaultTestContext(t)) + d := newTestInstrumentedDB(ctx, t) + mTx := dbmocks.NewTx(t) + txOpts := &sql.TxOptions{ + Isolation: sql.LevelReadCommitted, + } + + // create the root span + ctx, rootSpan := d.tracer.Start(ctx, rootSpanName) + + // begin + d.mock.EXPECT().BeginTx(mock.Anything, txOpts).Return(mTx, nil) + tx, err := d.BeginTx(ctx, txOpts) + require.NoError(t, err) + + // create a new span for the new operations + ctx, internalSpan := d.tracer.Start(ctx, internalSpanName) + + // execute an operation within the transaction + mTx.EXPECT().ExecContext(mock.Anything, mock.Anything). + Return(dbmocks.NewResult(t), nil) + res, err := tx.ExecContext(ctx, `DELETE FROM users; -- :)`) + require.NoError(t, err) + require.NotNil(t, res) + + // run a query concurrently outside of the transaction, but while the + // transaction is still open + d.mock.EXPECT().QueryContext(mock.Anything, mock.Anything). + Return(dbmocks.NewRows(t), nil) + rows, err := d.QueryContext(ctx, `SELECT * FROM users;`) + require.NoError(t, err) + require.NotNil(t, rows) + + internalSpan.End() + + // commit + mTx.EXPECT().Commit().Return(nil) + err = tx.Commit() + require.NoError(t, err) + + rootSpan.End() + + // assert spans + spanm := newSpanMap(d.tracer.Spans(ctx)) + require.Len(t, spanm, 7) + + // span creation order + strictPartialOrder(t, spanm, + rootSpanName, + dbTraceTx, + dbTraceBeginTx, + internalSpanName, + txTraceExecContext, + dbTraceQueryContext, + txTraceCommit, + ) + + // parent-child hierarchy relationships + root(t, spanm, rootSpanName) + directChildren(t, spanm, rootSpanName, + dbTraceTx, + internalSpanName, + ) + directChildren(t, spanm, dbTraceTx, + dbTraceBeginTx, + txTraceExecContext, + txTraceCommit, + ) + directChildren(t, spanm, internalSpanName, + dbTraceQueryContext, + ) + + // link relationships + links(t, spanm, internalSpanName, + txTraceExecContext, + ) +} + +func TestOTelDB_PingContext(t *testing.T) { + t.Parallel() + + t.Run("happy path - default DB version", func(t *testing.T) { + t.Parallel() + ctx := testutil.NewDefaultTestContext(t) + db := newTestInstrumentedDBWithVersionSQL(ctx, t, dbVersionDefaultSQL) + + db.mock.EXPECT().PingContext(mock.Anything).Return(nil) + + err := db.PingContext(ctx) + require.NoError(t, err) + + spans := db.tracer.Spans(ctx) + require.Len(t, spans, 1) + require.Equal(t, dbTracePingContext, spans[0].Name) + + v := getAttr(spans[0], attrServerVersion) + require.Equal(t, attribute.StringValue(testDefaultDBVersion), v) + }) + + t.Run("happy path - SQLite DB version", func(t *testing.T) { + t.Parallel() + ctx := testutil.NewDefaultTestContext(t) + db := newTestInstrumentedDBWithVersionSQL(ctx, t, dbVersionSQLiteSQL) + + db.mock.EXPECT().PingContext(mock.Anything).Return(nil) + + err := db.PingContext(ctx) + require.NoError(t, err) + + spans := db.tracer.Spans(ctx) + require.Len(t, spans, 1) + require.Equal(t, dbTracePingContext, spans[0].Name) + + v := getAttr(spans[0], attrServerVersion) + require.Equal(t, attribute.StringValue(testSQLiteDBVersion), v) + }) + + t.Run("happy path - unknown DB version", func(t *testing.T) { + t.Parallel() + ctx := testutil.NewDefaultTestContext(t) + db := newTestInstrumentedDBWithVersionSQL(ctx, t, "") + + db.mock.EXPECT().PingContext(mock.Anything).Return(nil) + + err := db.PingContext(ctx) + require.NoError(t, err) + + spans := db.tracer.Spans(ctx) + require.Len(t, spans, 1) + require.Equal(t, dbTracePingContext, spans[0].Name) + + v := getAttr(spans[0], attrServerVersion) + require.Equal(t, attribute.StringValue("unknown"), v) + }) + + t.Run("fail making ping", func(t *testing.T) { + t.Parallel() + ctx := testutil.NewDefaultTestContext(t) + db := newTestInstrumentedDBWithVersionSQL(ctx, t, "") + + db.mock.EXPECT().PingContext(mock.Anything).Return(errTest) + + err := db.PingContext(ctx) + require.Error(t, err) + require.ErrorIs(t, err, errTest) + + spans := db.tracer.Spans(ctx) + require.Len(t, spans, 1) + require.Equal(t, dbTracePingContext, spans[0].Name) + + v := getAttr(spans[0], attrServerVersion) + require.Equal(t, attribute.StringValue("unknown"), v) + }) +} + +const ( + testDriverName = "mysql" + testDefaultDBVersion = "8.0.39" // e.g. MySQL + testSQLiteDBVersion = "3.45.1" +) + +type testOTelDB struct { + mock *dbmocks.DB + tracer otelTestTracer + db.DB +} + +func newTestInstrumentedDB(ctx context.Context, t *testing.T) testOTelDB { + return newTestInstrumentedDBWithVersionSQL(ctx, t, dbVersionDefaultSQL) +} + +func newTestInstrumentedDBWithVersionSQL(ctx context.Context, t *testing.T, dbVersionSQL string) testOTelDB { + tr := newTestOTelTracer(ctx, t) + mDB := dbmocks.NewDB(t) + row := dbmocks.NewRow(t) + + mDB.EXPECT().DriverName().Return(testDriverName).Once() + mDB.EXPECT().QueryRowContext(mock.Anything, dbVersionDefaultSQL).Return(row) + if dbVersionSQL == dbVersionDefaultSQL { + row.EXPECT().Err().Return(nil) + dbmocks.ExpectRowValues(t, row, testDefaultDBVersion) + } else { + row.EXPECT().Err().Return(errTest) + row := dbmocks.NewRow(t) + mDB.EXPECT().QueryRowContext(mock.Anything, dbVersionSQLiteSQL).Return(row) + if dbVersionSQL == dbVersionSQLiteSQL { + row.EXPECT().Err().Return(nil) + dbmocks.ExpectRowValues(t, row, testSQLiteDBVersion) + } else { + row.EXPECT().Err().Return(errTest) + } + } + + return testOTelDB{ + mock: mDB, + tracer: tr, + DB: NewInstrumentedDB(mDB, tr), + } +} + +// otelTestTracer is a valid test trace.Tracer that records all spans as stubs. +// It has an additional method `Spans` that returns these stubs so that you can +// assert the correct behaviour of your instrumentation code. +type otelTestTracer struct { + t *testing.T + exporter *sdktracetest.InMemoryExporter + provider *sdktrace.TracerProvider + trace.Tracer +} + +// newTestOTelTracer returns a new otelTestTracer. The provided context will be +// used to automatically shutdown the trace.TracerProvider implmentation when +// the test exits. +func newTestOTelTracer(ctx context.Context, t *testing.T) otelTestTracer { + exporter := sdktracetest.NewInMemoryExporter() + provider := sdktrace.NewTracerProvider( + sdktrace.WithSyncer(exporter), + sdktrace.WithSampler(sdktrace.AlwaysSample()), + ) + t.Cleanup(func() { + err := provider.Shutdown(ctx) + require.NoError(t, err) + }) + return otelTestTracer{ + t: t, + exporter: exporter, + provider: provider, + Tracer: provider.Tracer("testtracer"), + } +} + +// Spans returns all the stubs recorded so far. The provided context is used to +// first flush all spans. +func (t otelTestTracer) Spans(ctx context.Context) sdktracetest.SpanStubs { + err := t.provider.ForceFlush(ctx) + require.NoError(t.t, err) + return t.exporter.GetSpans() +} + +// getAttr returns the value of the first attribute in a span with the given +// key. Returns an invalid attribute if it was not found. +func getAttr(s sdktracetest.SpanStub, key string) attribute.Value { + for _, attr := range s.Attributes { + if attr.Key == attribute.Key(key) { + return attr.Value + } + } + return attribute.Value{} // of type attribute.INVALID +} + +type spanMap = map[string]sdktracetest.SpanStub + +func newSpanMap(spans sdktracetest.SpanStubs) spanMap { + ret := make(map[string]sdktracetest.SpanStub, len(spans)) + for _, span := range spans { + ret[span.Name] = span + } + return ret +} + +func strictPartialOrder(t *testing.T, m spanMap, spanNames ...string) { + t.Helper() + visited := make(map[string]struct{}, len(spanNames)) + for i := 1; i < len(spanNames); i++ { + curName, nextName := spanNames[i-1], spanNames[i] + visited[curName] = struct{}{} + visited[nextName] = struct{}{} + + cur, ok := m[curName] + require.True(t, ok, "span %q not found", curName) + next, ok := m[nextName] + require.True(t, ok, "span %q not found", nextName) + require.True(t, !next.StartTime.Before(cur.StartTime), "span with "+ + "name %q did not happen before %q", curName, nextName) + } + + for spanName := range m { + if _, ok := visited[spanName]; !ok { + t.Errorf("untested span %q", spanName) + } + } +} + +func root(t *testing.T, m spanMap, rootSpanNames ...string) { + for _, rootSpanName := range rootSpanNames { + rootSpan, ok := m[rootSpanName] + require.True(t, ok, "root span %q not found", rootSpanName) + require.False(t, rootSpan.Parent.IsValid(), "%q is not a root span", + rootSpanName) + } +} + +func directChildren(t *testing.T, m spanMap, parentName string, childrenNames ...string) { + parent, ok := m[parentName] + require.True(t, ok, "parent span %q not found", parentName) + for _, childName := range childrenNames { + child, ok := m[childName] + require.True(t, ok, "child span %q not found", child) + require.True(t, parent.SpanContext.Equal(child.Parent), + "%q is not a child of %q", childName, parentName) + } +} + +func links(t *testing.T, m spanMap, linkToName string, linkFromNames ...string) { + linkTo, ok := m[linkToName] + require.True(t, ok, "LinkTo span %q not found", linkToName) + for _, linkFromName := range linkFromNames { + linkFrom, ok := m[linkFromName] + require.True(t, ok, "LinkFrom span %q not found", linkFromName) + var found bool + for i := 0; i < len(linkFrom.Links) && !found; i++ { + found = linkFrom.Links[i].SpanContext.Equal(linkTo.SpanContext) + } + require.True(t, found, "%q is not linked to %q", linkFromName, + linkToName) + } +} diff --git a/pkg/storage/unified/sql/dbutil/dbutil.go b/pkg/storage/unified/sql/dbutil/dbutil.go index 7f51dd623e5..03e6fbe0be7 100644 --- a/pkg/storage/unified/sql/dbutil/dbutil.go +++ b/pkg/storage/unified/sql/dbutil/dbutil.go @@ -10,11 +10,13 @@ import ( "strings" "text/template" + "go.opentelemetry.io/otel/attribute" + "github.com/grafana/grafana/pkg/storage/unified/sql/db" + "github.com/grafana/grafana/pkg/storage/unified/sql/db/otel" "github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate" ) -//nolint:unused const ( otelAttrBaseKey = "dbutil_" otelAttrTemplateNameKey = otelAttrBaseKey + "template" @@ -22,7 +24,10 @@ const ( ) func withOtelAttrs(ctx context.Context, tmplName, dialectName string) context.Context { - return ctx // TODO: in next PR + return otel.SetAttributes(ctx, + attribute.String(otelAttrTemplateNameKey, tmplName), + attribute.String(otelAttrDialectKey, dialectName), + ) } // SQLError is an error returned by the database, which includes additionally