Unistore Chore: Add database-level observability (#92266)

* add testing harness

* fix mockery and linters

* WIP

* wip

* fix transactions

* fix transaction tracing; add tracing by default

* rename package

* move WithTx to simplify logic of DB implementations

* fix potential issue with context deadline

* add db instrumentation to dbutil

* add otel tests

* improve naming

* minor fix in semantics and add comprehensive OTel testing

* fix naming

* instrument resourceVersionAtomicInc

* provide a default testing tracer

* fix docs

* fix typo in docs

* add semconv for k8s
This commit is contained in:
Diego Augusto Molina 2024-10-18 00:32:08 -03:00 committed by GitHub
parent 9125f0df20
commit 3e1f5559a6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 851 additions and 22 deletions

View File

@ -10,6 +10,8 @@ import (
"time"
"github.com/google/uuid"
"go.opentelemetry.io/otel/attribute"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/noop"
"google.golang.org/protobuf/proto"
@ -164,7 +166,7 @@ func (b *backend) create(ctx context.Context, event resource.WriteEvent) (int64,
// 3. TODO: Rebuild the whole folder tree structure if we're creating a folder
// 4. Atomically increment resource version for this kind
rv, err := resourceVersionAtomicInc(ctx, tx, b.dialect, event.Key)
rv, err := b.resourceVersionAtomicInc(ctx, tx, event.Key)
if err != nil {
return fmt.Errorf("increment resource version: %w", err)
}
@ -222,7 +224,7 @@ func (b *backend) update(ctx context.Context, event resource.WriteEvent) (int64,
// 3. TODO: Rebuild the whole folder tree structure if we're creating a folder
// 4. Atomically increment resource version for this kind
rv, err := resourceVersionAtomicInc(ctx, tx, b.dialect, event.Key)
rv, err := b.resourceVersionAtomicInc(ctx, tx, event.Key)
if err != nil {
return fmt.Errorf("increment resource version: %w", err)
}
@ -282,7 +284,7 @@ func (b *backend) delete(ctx context.Context, event resource.WriteEvent) (int64,
// 3. TODO: Rebuild the whole folder tree structure if we're creating a folder
// 4. Atomically increment resource version for this kind
rv, err := resourceVersionAtomicInc(ctx, tx, b.dialect, event.Key)
rv, err := b.resourceVersionAtomicInc(ctx, tx, event.Key)
if err != nil {
return fmt.Errorf("increment resource version: %w", err)
}
@ -661,10 +663,18 @@ func (b *backend) poll(ctx context.Context, grp string, res string, since int64,
// TODO: Ideally we should attempt to update the RV in the resource and resource_history tables
// in a single roundtrip. This would reduce the latency of the operation, and also increase the
// throughput of the system. This is a good candidate for a future optimization.
func resourceVersionAtomicInc(ctx context.Context, x db.ContextExecer, d sqltemplate.Dialect, key *resource.ResourceKey) (newVersion int64, err error) {
func (b *backend) resourceVersionAtomicInc(ctx context.Context, x db.ContextExecer, key *resource.ResourceKey) (newVersion int64, err error) {
ctx, span := b.tracer.Start(ctx, "resourceVersionAtomicInc", trace.WithAttributes(
semconv.K8SNamespaceName(key.Namespace),
// TODO: the following attributes could use some standardization.
attribute.String("k8s.resource.group", key.Group),
attribute.String("k8s.resource.type", key.Resource),
))
defer span.End()
// 1. Lock to row and prevent concurrent updates until the transaction is committed.
res, err := dbutil.QueryRow(ctx, x, sqlResourceVersionGet, sqlResourceVersionGetRequest{
SQLTemplate: sqltemplate.New(d),
SQLTemplate: sqltemplate.New(b.dialect),
Group: key.Group,
Resource: key.Resource,
@ -674,14 +684,14 @@ func resourceVersionAtomicInc(ctx context.Context, x db.ContextExecer, d sqltemp
if errors.Is(err, sql.ErrNoRows) {
// if there wasn't a row associated with the given resource, then we create it.
if _, err = dbutil.Exec(ctx, x, sqlResourceVersionInsert, sqlResourceVersionUpsertRequest{
SQLTemplate: sqltemplate.New(d),
SQLTemplate: sqltemplate.New(b.dialect),
Group: key.Group,
Resource: key.Resource,
}); err != nil {
return 0, fmt.Errorf("insert into resource_version: %w", err)
}
res, err = dbutil.QueryRow(ctx, x, sqlResourceVersionGet, sqlResourceVersionGetRequest{
SQLTemplate: sqltemplate.New(d),
SQLTemplate: sqltemplate.New(b.dialect),
Group: key.Group,
Resource: key.Resource,
Response: new(resourceVersionResponse),
@ -702,7 +712,7 @@ func resourceVersionAtomicInc(ctx context.Context, x db.ContextExecer, d sqltemp
nextRV := max(res.CurrentEpoch, res.ResourceVersion+1)
_, err = dbutil.Exec(ctx, x, sqlResourceVersionUpdate, sqlResourceVersionUpsertRequest{
SQLTemplate: sqltemplate.New(d),
SQLTemplate: sqltemplate.New(b.dialect),
Group: key.Group,
Resource: key.Resource,
ResourceVersion: nextRV,

View File

@ -11,7 +11,6 @@ import (
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/sql/db/dbimpl"
"github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate"
"github.com/grafana/grafana/pkg/storage/unified/sql/test"
"github.com/grafana/grafana/pkg/util/testutil"
)
@ -217,15 +216,13 @@ func expectUnsuccessfulResourceVersionAtomicInc(t *testing.T, b testBackend, err
func TestResourceVersionAtomicInc(t *testing.T) {
t.Parallel()
dialect := sqltemplate.MySQL
t.Run("happy path - insert new row", func(t *testing.T) {
t.Parallel()
b, ctx := setupBackendTest(t)
expectSuccessfulResourceVersionAtomicInc(t, b) // returns RV=1
v, err := resourceVersionAtomicInc(ctx, b.DB, dialect, resKey)
v, err := b.resourceVersionAtomicInc(ctx, b.DB, resKey)
require.NoError(t, err)
require.Equal(t, int64(23456), v)
})
@ -238,7 +235,7 @@ func TestResourceVersionAtomicInc(t *testing.T) {
b.QueryWithResult("select resource_version for update", 2, Rows{{12345, 23456}})
b.ExecWithResult("update resource_version", 0, 1)
v, err := resourceVersionAtomicInc(ctx, b.DB, dialect, resKey)
v, err := b.resourceVersionAtomicInc(ctx, b.DB, resKey)
require.NoError(t, err)
require.Equal(t, int64(23456), v)
})
@ -248,7 +245,7 @@ func TestResourceVersionAtomicInc(t *testing.T) {
b, ctx := setupBackendTest(t)
b.QueryWithErr("select resource_version for update", errTest)
v, err := resourceVersionAtomicInc(ctx, b.DB, dialect, resKey)
v, err := b.resourceVersionAtomicInc(ctx, b.DB, resKey)
require.Zero(t, v)
require.Error(t, err)
require.ErrorContains(t, err, "lock the resource version")
@ -262,7 +259,7 @@ func TestResourceVersionAtomicInc(t *testing.T) {
b.QueryWithResult("select resource_version", 0, Rows{})
b.ExecWithErr("insert resource_version", errTest)
v, err := resourceVersionAtomicInc(ctx, b.DB, dialect, resKey)
v, err := b.resourceVersionAtomicInc(ctx, b.DB, resKey)
require.Zero(t, v)
require.Error(t, err)
require.ErrorContains(t, err, "insert into resource_version")
@ -275,7 +272,7 @@ func TestResourceVersionAtomicInc(t *testing.T) {
b.QueryWithResult("select resource_version for update", 2, Rows{{12345, 23456}})
b.ExecWithErr("update resource_version", errTest)
v, err := resourceVersionAtomicInc(ctx, b.DB, dialect, resKey)
v, err := b.resourceVersionAtomicInc(ctx, b.DB, resKey)
require.Zero(t, v)
require.Error(t, err)
require.ErrorContains(t, err, "increase resource version")

View File

@ -7,9 +7,9 @@ import (
"sync"
"github.com/dlmiddlecote/sqlstats"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/noop"
"xorm.io/xorm"
infraDB "github.com/grafana/grafana/pkg/infra/db"
@ -17,6 +17,7 @@ import (
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/storage/unified/sql/db"
"github.com/grafana/grafana/pkg/storage/unified/sql/db/migrations"
"github.com/grafana/grafana/pkg/storage/unified/sql/db/otel"
)
const (
@ -33,7 +34,10 @@ var errGrafanaDBInstrumentedNotSupported = errors.New("the Resource API is " +
grafanaDBInstrumentQueriesKey + "` is enabled in [database], and that" +
" setup is currently unsupported. Please, consider disabling that flag")
func ProvideResourceDB(grafanaDB infraDB.DB, cfg *setting.Cfg, tracer tracing.Tracer) (db.DBProvider, error) {
func ProvideResourceDB(grafanaDB infraDB.DB, cfg *setting.Cfg, tracer trace.Tracer) (db.DBProvider, error) {
if tracer == nil {
tracer = noop.NewTracerProvider().Tracer("test-tracer")
}
p, err := newResourceDBProvider(grafanaDB, cfg, tracer)
if err != nil {
return nil, fmt.Errorf("provide Resource DB: %w", err)
@ -148,6 +152,7 @@ func (p *resourceDBProvider) init(ctx context.Context) (db.DB, error) {
}
d := NewDB(p.engine.DB().DB, p.engine.Dialect().DriverName())
d = otel.NewInstrumentedDB(d, p.tracer)
return d, nil
}

View File

@ -75,7 +75,7 @@ func TestReproIncident2144UsingGrafanaDB(t *testing.T) {
cfg := newCfgFromIniMap(t, cfgMap)
setupDBForGrafana(t, ctx, cfgMap)
grafanaDB := newTestInfraDB(t, cfgMap)
resourceDB, err := ProvideResourceDB(grafanaDB, cfg, testGrafanaTracer{})
resourceDB, err := ProvideResourceDB(grafanaDB, cfg, nil)
require.NotNil(t, resourceDB)
require.NoError(t, err)
})
@ -105,7 +105,7 @@ func TestReproIncident2144UsingGrafanaDB(t *testing.T) {
t.Run("Resource API provides a reasonable error for this case", func(t *testing.T) {
t.Parallel()
cfg := newCfgFromIniMap(t, cfgMap)
resourceDB, err := ProvideResourceDB(grafanaDB, cfg, testGrafanaTracer{})
resourceDB, err := ProvideResourceDB(grafanaDB, cfg, nil)
require.Nil(t, resourceDB)
require.Error(t, err)
require.ErrorIs(t, err, errGrafanaDBInstrumentedNotSupported)

View File

@ -0,0 +1,416 @@
package otel
import (
"context"
"database/sql"
"sync"
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"github.com/grafana/grafana/pkg/storage/unified/sql/db"
)
// Queries, span names and span attribute keys/values used by the
// instrumented database wrappers in this package.
const (
// queries used to discover the database server version; the SQLite variant
// is tried when the default one reports an error
dbVersionDefaultSQL = `SELECT VERSION()`
dbVersionSQLiteSQL = `SELECT SQLITE_VERSION()`
// names of the spans started by the instrumented db.DB
dbTracePrefix = "sql.db."
dbTraceExecContext = dbTracePrefix + "exec_context"
dbTraceQueryContext = dbTracePrefix + "query_context"
dbTraceQueryRowContext = dbTracePrefix + "query_row_context"
dbTraceBeginTx = dbTracePrefix + "begin_tx"
dbTraceWithTx = dbTracePrefix + "with_tx"
dbTracePingContext = dbTracePrefix + "ping_context"
dbTraceTx = dbTracePrefix + "transaction"
// names of the spans started by the instrumented db.Tx
txTracePrefix = "sql.db.tx."
txTraceExecContext = txTracePrefix + "exec_context"
txTraceQueryContext = txTracePrefix + "query_context"
txTraceQueryRowContext = txTracePrefix + "query_row_context"
txTraceCommit = txTracePrefix + "commit"
txTraceRollback = txTracePrefix + "rollback"
// span attribute keys
attrDriverName = "driver_name"
attrServerVersion = "server_version"
attrIsolationLevel = "isolation_level"
attrReadOnly = "read_only"
attrTxTerminationOp = "termination_op"
// values of the attrTxTerminationOp attribute, naming the operation that
// ended the transaction span
attrValTxTerminationOpBegin = "begin"
attrValTxTerminationOpCommit = "commit"
attrValTxTerminationOpRollback = "rollback"
)
var (
// span start option passed when starting database operation spans
spanOptKindClient = trace.WithSpanKind(trace.SpanKindClient)
// zero-valued options that setSpanTxOpts falls back to when given nil
defaultTxOpts = new(sql.TxOptions)
)
// ctxKey is the context key under which SetAttributes stores the pointer to
// the pending span attributes, and from which consumeAttributes reads it.
type ctxKey struct{}
// SetAttributes returns a context with one-time use tracing attributes that
// will be attached to the next database operation span. The purpose of this is
// to trace specific SQL queries and operations.
// Example:
//
//	// the following QueryContext operation will have an extra attribute
//	ctx = SetAttributes(ctx, attribute.String("query", "get user by id"))
//	res, err := myTracedTx.QueryContext(ctx, getUserByIDSQL, userID)
//
//	// the following ExecContext operation will have a different extra attribute
//	ctx = SetAttributes(ctx, attribute.String("query", "disable user"))
//	_, err = myTracedTx.ExecContext(ctx, disableUserSQL, userID)
//
//	// the following Commit operation will NOT have any extra attribute
//	err = myTracedTx.Commit()
//
// NOTE: if you want several database operations to share the same set of
// attributes, consider using a span for them instead.
//
// The attributes are stored behind a pointer held by the context, and no
// synchronization is performed when they are set or consumed. If ctx already
// carries such a pointer (because it was returned by a previous call to
// SetAttributes), the pending attributes are overwritten in place and ctx
// itself is returned.
func SetAttributes(ctx context.Context, attrs ...attribute.KeyValue) context.Context {
	// we use a pointer so that the stored slice can be reset to nil after the
	// first use. This prevents the same attributes from being reused, even if
	// the same context is used later. Additionally, if a context that was
	// already used for this purpose is passed, we don't need to derive a new
	// context
	val, _ := ctx.Value(ctxKey{}).(*[]attribute.KeyValue)
	if val == nil {
		ctx = context.WithValue(ctx, ctxKey{}, &attrs)
	} else {
		*val = attrs
	}

	return ctx
}
// consumeAttributes returns the attributes previously stored in ctx with
// SetAttributes and clears them, so that a later call with the same context
// returns nil. It returns nil when ctx carries no attributes.
func consumeAttributes(ctx context.Context) []attribute.KeyValue {
	slot, ok := ctx.Value(ctxKey{}).(*[]attribute.KeyValue)
	if !ok || slot == nil {
		return nil
	}

	// hand over the pending attributes and leave the slot empty
	pending := *slot
	*slot = nil

	return pending
}
// otelDB decorates a db.DB, starting a span for each database operation.
type otelDB struct {
// wrapped implementation that actually runs the operations
db.DB
// implementation of WithTx, built on top of the instrumented BeginTx
withTxFunc db.WithTxFunc
tracer trace.Tracer
// value of the driver name span attribute
driverName string
// guards the one-time discovery of dbServerVersion performed by init
initOnce sync.Once
// value of the server version span attribute; it stays "unknown" when the
// version cannot be discovered
dbServerVersion string
}
// NewInstrumentedDB decorates d so that every database operation run through
// the returned db.DB is traced with the given OTEL tracer.
func NewInstrumentedDB(d db.DB, tracer trace.Tracer) db.DB {
	// TODO: periodically report metrics for stats returned by `db.Stats`

	instrumented := new(otelDB)
	instrumented.DB = d
	instrumented.tracer = tracer
	instrumented.driverName = d.DriverName()
	// placeholder reported until the server version is discovered on first use
	instrumented.dbServerVersion = "unknown"
	// transactions started by WithTx go through the instrumented BeginTx
	instrumented.withTxFunc = db.NewWithTxFunc(instrumented.BeginTx)

	return instrumented
}
// init discovers the database server version exactly once and stores it in
// x.dbServerVersion. When neither version query succeeds the previous value is
// left untouched.
func (x *otelDB) init(ctx context.Context) {
	x.initOnce.Do(func() {
		// The context of the first operation run on the `*otelDB` may have a
		// deadline that is about to expire, may be cancelled by the client
		// while we use it, or may already be done. Since this runs only once,
		// failing here would leave us without the database details for good.
		// Thus, we detach from the caller's cancellation and impose a generous
		// timeout of our own: getting the server version should be nearly a
		// nop. We derive from the provided context instead of
		// context.Background() in case any value in it is meaningful to the
		// downstream implementation in `x.DB`, which also future-proofs this
		// code should another decorator wrap the actual implementation.
		const probeTimeout = 5 * time.Second
		probeCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), probeTimeout)
		defer cancel()

		versionRow := x.DB.QueryRowContext(probeCtx, dbVersionDefaultSQL)
		if versionRow.Err() != nil {
			// fall back to the SQLite flavour of the query
			versionRow = x.DB.QueryRowContext(probeCtx, dbVersionSQLiteSQL)
			if versionRow.Err() != nil {
				return
			}
		}

		var version string
		if err := versionRow.Scan(&version); err != nil {
			return
		}
		x.dbServerVersion = version
	})
}
// startSpan starts a client span with the given name for an operation run
// directly on the database (i.e. outside of a transaction). The span carries
// the driver name and server version attributes, plus any one-time attributes
// stored in ctx with SetAttributes, which are consumed by this call. It returns
// the context holding the new span together with the span itself.
func (x *otelDB) startSpan(ctx context.Context, name string) (context.Context, trace.Span) {
	x.init(ctx)

	// Build a fresh slice instead of appending to the one returned by
	// consumeAttributes: that slice is the one handed to SetAttributes by the
	// caller, and appending to it could overwrite elements of the caller's
	// backing array if it had spare capacity.
	extra := consumeAttributes(ctx)
	attrs := make([]attribute.KeyValue, 0, len(extra)+2)
	attrs = append(attrs, extra...)
	attrs = append(attrs,
		attribute.String(attrDriverName, x.driverName),
		attribute.String(attrServerVersion, x.dbServerVersion),
	)

	ctx, span := x.tracer.Start(ctx, name, spanOptKindClient,
		trace.WithAttributes(attrs...))

	return ctx, span
}
// ExecContext runs the statement on the wrapped db.DB inside its own span,
// recording the returned error (if any) on that span.
func (x *otelDB) ExecContext(ctx context.Context, query string, args ...any) (db.Result, error) {
	spanCtx, span := x.startSpan(ctx, dbTraceExecContext)
	defer span.End()

	result, execErr := x.DB.ExecContext(spanCtx, query, args...)
	setSpanOutcome(span, execErr)

	return result, execErr
}
// QueryContext runs the query on the wrapped db.DB inside its own span,
// recording the returned error (if any) on that span.
func (x *otelDB) QueryContext(ctx context.Context, query string, args ...any) (db.Rows, error) {
	spanCtx, span := x.startSpan(ctx, dbTraceQueryContext)
	defer span.End()

	result, queryErr := x.DB.QueryContext(spanCtx, query, args...)
	setSpanOutcome(span, queryErr)

	return result, queryErr
}
// QueryRowContext runs the single-row query on the wrapped db.DB inside its
// own span. The span outcome reflects the error reported by the returned row
// at the time the query returns.
func (x *otelDB) QueryRowContext(ctx context.Context, query string, args ...any) db.Row {
	spanCtx, span := x.startSpan(ctx, dbTraceQueryRowContext)
	defer span.End()

	result := x.DB.QueryRowContext(spanCtx, query, args...)
	setSpanOutcome(span, result.Err())

	return result
}
// BeginTx starts a transaction on the wrapped db.DB and returns an
// instrumented db.Tx. Two spans are started: one spanning the whole
// transaction, and a child of it for the BEGIN operation itself. On failure it
// returns a nil db.Tx and the error from the wrapped BeginTx.
func (x *otelDB) BeginTx(ctx context.Context, opts *sql.TxOptions) (db.Tx, error) {
// remember which span was active when the transaction was started, so that
// operations later run from a different span can be linked to that span
parentSpanID := trace.SpanFromContext(ctx).SpanContext().SpanID().String()
// create a new span that will encompass the whole transaction as a single
// operation. This span will be ended if we fail in BEGIN, or when COMMIT or
// ROLLBACK are called on the returned instrumented db.Tx
txCtx, txSpan := x.startSpan(ctx, dbTraceTx)
setSpanTxOpts(txSpan, opts)
// make sure to end the transaction span if BeginTx later fails
var err error
defer func() {
// err is the variable assigned below by the wrapped BeginTx call
if err != nil {
txSpan.SetAttributes(attribute.String(attrTxTerminationOp,
attrValTxTerminationOpBegin))
setSpanOutcome(txSpan, err)
txSpan.End()
}
}()
ret := otelTx{
// we only miss defining `tx` here, which is defined later
span: txSpan, // will only be used during COMMIT/ROLLBACK
// every span of the transaction is started from txCtx, which makes it a
// child of the transaction span
tracerStartFunc: func(n string, o ...trace.SpanStartOption) trace.Span {
_, span := x.tracer.Start(txCtx, n, o...)
return span
},
parentSpanID: parentSpanID,
}
// start the span for BEGIN as a regular child span of the transaction span
ctx, span := ret.startSpan(ctx, dbTraceBeginTx)
// deferred after the closure above, hence it runs first: the BEGIN span is
// ended before the transaction span is
defer span.End()
ret.tx, err = x.DB.BeginTx(ctx, opts) // set ret.tx that we were missing
setSpanOutcome(span, err)
if err != nil {
return nil, err
}
return ret, nil
}
// WithTx delegates to the WithTxFunc built in NewInstrumentedDB on top of the
// instrumented BeginTx.
func (x *otelDB) WithTx(ctx context.Context, opts *sql.TxOptions, f db.TxFunc) error {
	err := x.withTxFunc(ctx, opts, f)
	return err
}
// PingContext pings the wrapped db.DB inside its own span, recording the
// returned error (if any) on that span.
func (x *otelDB) PingContext(ctx context.Context) error {
	spanCtx, span := x.startSpan(ctx, dbTracePingContext)
	defer span.End()

	pingErr := x.DB.PingContext(spanCtx)
	setSpanOutcome(span, pingErr)

	return pingErr
}
// otelTx decorates a db.Tx, starting a span for each operation run within the
// transaction.
type otelTx struct {
// wrapped transaction that actually runs the operations
tx db.Tx
// span covering the whole transaction; ended by Commit or Rollback
span trace.Span
// starts a new span with the given name and options
tracerStartFunc func(string, ...trace.SpanStartOption) trace.Span
// string form of the ID of the span found in the context passed to BeginTx
parentSpanID string
}
// startSpan returns a new span, and optionally a context.Context if
// `optionalCtx` is not nil. It will be nil in the cases of the `Commit` and
// `Rollback` methods, since they do not have a context.Context either. The
// returned span will be a child span of the transaction span started at the
// beginning of the transaction. If `optionalCtx` is not nil, then the returned
// span will also be linked to the span found in that context (if it differs from
// the one used to create the transaction).
//
// Example operation with additional spans with current implementation:
//
// parentSpan
// |
// +------------------+
// | |
// | v
// | exampleSubSpan
// | |
// | +-------------+
// | : |
// v : v
// transactionSpan (spanLink) nonTxQuerySpan
// | :
// +------------------+-----------------------------+
// | | |
// v v v
// beginTxSpan execSpan commitSpan
//
// Note that understanding what is being executed in the same transaction is
// very clear because you just need to follow the vertical solid lines, which
// denote a parent-child relationship. We also know that the `execSpan` was
// originated in `exampleSubSpan` because of the spanLink (with a vertical
// dotted line), but there is no chance that we get confused and think that
// `nonTxQuerySpan` was part of the transaction (it is not related in any
// manner). What we do here is to make any transactional db operation a child of
// the transaction span, which aligns with the OTEL concept of a span being 'a
// unit of work'. In this sense, a transaction is our unit of work, which also
// aligns semantically with the database concept of transaction. This allows us
// to clearly visualize important database semantics within the OTEL framework.
// In OTEL, span links exist to associate one span with one or more spans,
// implying a causal relationship. In our case, we can see that while `execSpan`
// is part of the unit of work called "database transaction", it actually has a
// different cause: the `exampleSubSpan` for `execSpan`.
//
// For comparison, consider the following naïve alternative just passing the
// context around:
//
// parentSpan commitSpan
// |
// +------------------+
// | |
// v v
// beginTxSpan exampleSubSpan
// |
// +-------------+
// | |
// v v
// execSpan nonTxQuerySpan
//
// In this case, it is not straightforward to know what operations are part of
// the transaction. When looking at the traces, it will be very easy to be
// confused and think that `nonTxQuerySpan` was part of the transaction.
// Additionally, note that `commitSpan` is a root span, since the `Commit`
// method doesn't receive a context (same as `Rollback`), hence all such
// operations would end up being a scattered root span, making it hard to
// correlate.
func (x otelTx) startSpan(optionalCtx context.Context, name string) (context.Context, trace.Span) {
	// without a context there is nothing to link to nor attributes to consume:
	// start the span with the minimum set of options and return no context
	if optionalCtx == nil {
		return nil, x.tracerStartFunc(name, spanOptKindClient)
	}

	opts := []trace.SpanStartOption{spanOptKindClient}

	extraAttrs := consumeAttributes(optionalCtx)
	link := trace.LinkFromContext(optionalCtx, extraAttrs...)
	switch {
	case link.SpanContext.SpanID().String() != x.parentSpanID:
		// it only makes sense to create a link to the span in `optionalCtx`
		// if it's not the same as the one used during BEGIN
		opts = append(opts, trace.WithLinks(link))

	case len(extraAttrs) > 0:
		// otherwise, we just add the extra attributes added with
		// `SetAttributes`.
		opts = append(opts, trace.WithAttributes(extraAttrs...))
	}

	span := x.tracerStartFunc(name, opts...)

	// we preserve the original intended context, we only override the span
	// with the one we created
	return trace.ContextWithSpan(optionalCtx, span), span
}
// ExecContext runs the statement within the wrapped transaction inside its own
// span, recording the returned error (if any) on that span.
func (x otelTx) ExecContext(ctx context.Context, query string, args ...any) (db.Result, error) {
	spanCtx, span := x.startSpan(ctx, txTraceExecContext)
	defer span.End()

	result, execErr := x.tx.ExecContext(spanCtx, query, args...)
	setSpanOutcome(span, execErr)

	return result, execErr
}
// QueryContext runs the query within the wrapped transaction inside its own
// span, recording the returned error (if any) on that span.
func (x otelTx) QueryContext(ctx context.Context, query string, args ...any) (db.Rows, error) {
	spanCtx, span := x.startSpan(ctx, txTraceQueryContext)
	defer span.End()

	result, queryErr := x.tx.QueryContext(spanCtx, query, args...)
	setSpanOutcome(span, queryErr)

	return result, queryErr
}
// QueryRowContext runs the single-row query within the wrapped transaction
// inside its own span. The span outcome reflects the error reported by the
// returned row at the time the query returns.
func (x otelTx) QueryRowContext(ctx context.Context, query string, args ...any) db.Row {
	spanCtx, span := x.startSpan(ctx, txTraceQueryRowContext)
	defer span.End()

	result := x.tx.QueryRowContext(spanCtx, query, args...)
	setSpanOutcome(span, result.Err())

	return result
}
// Commit commits the wrapped transaction inside its own span and then ends the
// span covering the whole transaction. The outcome of the commit is recorded
// on both spans.
func (x otelTx) Commit() error {
	terminationOp := attribute.String(attrTxTerminationOp, attrValTxTerminationOpCommit)
	x.span.SetAttributes(terminationOp)
	// registered first so that it runs last: the transaction span outlives the
	// span of the commit operation
	defer x.span.End()

	_, commitSpan := x.startSpan(nil, txTraceCommit) //nolint:staticcheck
	defer commitSpan.End()

	commitErr := x.tx.Commit()
	for _, s := range [...]trace.Span{commitSpan, x.span} {
		setSpanOutcome(s, commitErr)
	}

	return commitErr
}
// Rollback rolls back the wrapped transaction inside its own span and then
// ends the span covering the whole transaction. The outcome of the rollback is
// recorded on both spans.
func (x otelTx) Rollback() error {
	terminationOp := attribute.String(attrTxTerminationOp, attrValTxTerminationOpRollback)
	x.span.SetAttributes(terminationOp)
	// registered first so that it runs last: the transaction span outlives the
	// span of the rollback operation
	defer x.span.End()

	_, rollbackSpan := x.startSpan(nil, txTraceRollback) //nolint:staticcheck
	defer rollbackSpan.End()

	rollbackErr := x.tx.Rollback()
	for _, s := range [...]trace.Span{rollbackSpan, x.span} {
		setSpanOutcome(s, rollbackErr)
	}

	return rollbackErr
}
// setSpanTxOpts records the isolation level and read-only flag of the given
// transaction options as span attributes. A nil opts is treated as the
// zero-valued options.
func setSpanTxOpts(span trace.Span, opts *sql.TxOptions) {
	effective := opts
	if effective == nil {
		effective = defaultTxOpts
	}

	isolation := attribute.String(attrIsolationLevel, effective.Isolation.String())
	readOnly := attribute.Bool(attrReadOnly, effective.ReadOnly)
	span.SetAttributes(isolation, readOnly)
}
// setSpanOutcome marks the span as failed when err is not nil: the error is
// recorded as a span event and the span status is set to Error, using the
// error message as the status description. It does nothing when err is nil.
func setSpanOutcome(span trace.Span, err error) {
	if err == nil {
		return
	}

	span.RecordError(err)
	// an empty description would leave the status without any detail about the
	// failure, so we pass the error message along
	span.SetStatus(codes.Error, err.Error())
}

View File

@ -0,0 +1,396 @@
package otel
import (
"context"
"database/sql"
"errors"
"testing"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/attribute"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
sdktracetest "go.opentelemetry.io/otel/sdk/trace/tracetest"
"go.opentelemetry.io/otel/trace"
"github.com/grafana/grafana/pkg/storage/unified/sql/db"
dbmocks "github.com/grafana/grafana/pkg/storage/unified/sql/db/mocks"
"github.com/grafana/grafana/pkg/util/testutil"
)
// errTest is the sentinel error returned by mocks in failure scenarios.
var errTest = errors.New("because of reasons")
// TestContextAttributes verifies that attributes stored with SetAttributes are
// returned by consumeAttributes exactly once, and that a later SetAttributes
// call replaces the attributes set by a previous one.
func TestContextAttributes(t *testing.T) {
t.Parallel()
// test context. Note that it's perfectly safe to use context.Background()
// and there is no risk of the test blocking at any point because we will
// not use the deadline or signal cancellation features of the context
ctx := context.Background()
// test attributes
attr1 := attribute.String("the key", "the value")
attr2 := attribute.String("the other key", "the other value")
attr3 := attribute.String("why not", "have another value")
attr4 := attribute.String("it's free", "they say")
// the subtests are not Parallel because we define this test as a storyline,
// since we are interested in testing state changes in the context
t.Run("consumeAttributes returns nil if SetAttributes was never called",
func(t *testing.T) {
attrs := consumeAttributes(ctx)
require.Nil(t, attrs)
})
t.Run("setting and getting attributes", func(t *testing.T) {
// ctx is reassigned here, so the following subtests observe this context
ctx = SetAttributes(ctx, attr1, attr2)
attrs := consumeAttributes(ctx)
require.Len(t, attrs, 2)
require.Equal(t, attr1, attrs[0])
require.Equal(t, attr2, attrs[1])
})
t.Run("attributes are now cleared", func(t *testing.T) {
attrs := consumeAttributes(ctx)
require.Len(t, attrs, 0)
})
t.Run("SetAttributes overwrites previous attributes", func(t *testing.T) {
ctx = SetAttributes(ctx, attr1, attr2)
ctx = SetAttributes(ctx, attr3, attr4)
attrs := consumeAttributes(ctx)
require.Len(t, attrs, 2)
require.Equal(t, attr3, attrs[0])
require.Equal(t, attr4, attrs[1])
})
t.Run("attributes are now cleared again", func(t *testing.T) {
attrs := consumeAttributes(ctx)
require.Len(t, attrs, 0)
})
}
// TestOTelTransactions runs a transaction (begin, exec, commit) interleaved
// with a query run outside of it, and then asserts the creation order,
// parent-child hierarchy and links of the recorded spans.
func TestOTelTransactions(t *testing.T) {
t.Parallel()
const (
rootSpanName = "root of the operation"
internalSpanName = "sub-operation"
)
ctx := context.Context(testutil.NewDefaultTestContext(t))
d := newTestInstrumentedDB(ctx, t)
mTx := dbmocks.NewTx(t)
txOpts := &sql.TxOptions{
Isolation: sql.LevelReadCommitted,
}
// create the root span
ctx, rootSpan := d.tracer.Start(ctx, rootSpanName)
// begin
d.mock.EXPECT().BeginTx(mock.Anything, txOpts).Return(mTx, nil)
tx, err := d.BeginTx(ctx, txOpts)
require.NoError(t, err)
// create a new span for the new operations
ctx, internalSpan := d.tracer.Start(ctx, internalSpanName)
// execute an operation within the transaction
mTx.EXPECT().ExecContext(mock.Anything, mock.Anything).
Return(dbmocks.NewResult(t), nil)
res, err := tx.ExecContext(ctx, `DELETE FROM users; -- :)`)
require.NoError(t, err)
require.NotNil(t, res)
// run a query concurrently outside of the transaction, but while the
// transaction is still open
d.mock.EXPECT().QueryContext(mock.Anything, mock.Anything).
Return(dbmocks.NewRows(t), nil)
rows, err := d.QueryContext(ctx, `SELECT * FROM users;`)
require.NoError(t, err)
require.NotNil(t, rows)
internalSpan.End()
// commit
mTx.EXPECT().Commit().Return(nil)
err = tx.Commit()
require.NoError(t, err)
rootSpan.End()
// assert spans
spanm := newSpanMap(d.tracer.Spans(ctx))
// one span per name listed below in the creation order assertion
require.Len(t, spanm, 7)
// span creation order
strictPartialOrder(t, spanm,
rootSpanName,
dbTraceTx,
dbTraceBeginTx,
internalSpanName,
txTraceExecContext,
dbTraceQueryContext,
txTraceCommit,
)
// parent-child hierarchy relationships
root(t, spanm, rootSpanName)
directChildren(t, spanm, rootSpanName,
dbTraceTx,
internalSpanName,
)
directChildren(t, spanm, dbTraceTx,
dbTraceBeginTx,
txTraceExecContext,
txTraceCommit,
)
directChildren(t, spanm, internalSpanName,
dbTraceQueryContext,
)
// link relationships
links(t, spanm, internalSpanName,
txTraceExecContext,
)
}
func TestOTelDB_PingContext(t *testing.T) {
t.Parallel()
t.Run("happy path - default DB version", func(t *testing.T) {
t.Parallel()
ctx := testutil.NewDefaultTestContext(t)
db := newTestInstrumentedDBWithVersionSQL(ctx, t, dbVersionDefaultSQL)
db.mock.EXPECT().PingContext(mock.Anything).Return(nil)
err := db.PingContext(ctx)
require.NoError(t, err)
spans := db.tracer.Spans(ctx)
require.Len(t, spans, 1)
require.Equal(t, dbTracePingContext, spans[0].Name)
v := getAttr(spans[0], attrServerVersion)
require.Equal(t, attribute.StringValue(testDefaultDBVersion), v)
})
t.Run("happy path - SQLite DB version", func(t *testing.T) {
t.Parallel()
ctx := testutil.NewDefaultTestContext(t)
db := newTestInstrumentedDBWithVersionSQL(ctx, t, dbVersionSQLiteSQL)
db.mock.EXPECT().PingContext(mock.Anything).Return(nil)
err := db.PingContext(ctx)
require.NoError(t, err)
spans := db.tracer.Spans(ctx)
require.Len(t, spans, 1)
require.Equal(t, dbTracePingContext, spans[0].Name)
v := getAttr(spans[0], attrServerVersion)
require.Equal(t, attribute.StringValue(testSQLiteDBVersion), v)
})
t.Run("happy path - unknown DB version", func(t *testing.T) {
t.Parallel()
ctx := testutil.NewDefaultTestContext(t)
db := newTestInstrumentedDBWithVersionSQL(ctx, t, "")
db.mock.EXPECT().PingContext(mock.Anything).Return(nil)
err := db.PingContext(ctx)
require.NoError(t, err)
spans := db.tracer.Spans(ctx)
require.Len(t, spans, 1)
require.Equal(t, dbTracePingContext, spans[0].Name)
v := getAttr(spans[0], attrServerVersion)
require.Equal(t, attribute.StringValue("unknown"), v)
})
t.Run("fail making ping", func(t *testing.T) {
t.Parallel()
ctx := testutil.NewDefaultTestContext(t)
db := newTestInstrumentedDBWithVersionSQL(ctx, t, "")
db.mock.EXPECT().PingContext(mock.Anything).Return(errTest)
err := db.PingContext(ctx)
require.Error(t, err)
require.ErrorIs(t, err, errTest)
spans := db.tracer.Spans(ctx)
require.Len(t, spans, 1)
require.Equal(t, dbTracePingContext, spans[0].Name)
v := getAttr(spans[0], attrServerVersion)
require.Equal(t, attribute.StringValue("unknown"), v)
})
}
// Values returned by the database mocks set up for the tests.
const (
testDriverName = "mysql"
testDefaultDBVersion = "8.0.39" // e.g. MySQL
testSQLiteDBVersion = "3.45.1"
)
// testOTelDB bundles an instrumented db.DB with the mock it wraps and the test
// tracer that records its spans.
type testOTelDB struct {
// mock of the underlying database, used to set expectations
mock *dbmocks.DB
// tracer given to the instrumented database
tracer otelTestTracer
// instrumented database under test
db.DB
}
// newTestInstrumentedDB returns a testOTelDB whose mock answers the default
// version query successfully.
func newTestInstrumentedDB(ctx context.Context, t *testing.T) testOTelDB {
	const versionSQL = dbVersionDefaultSQL
	instrumented := newTestInstrumentedDBWithVersionSQL(ctx, t, versionSQL)
	return instrumented
}
// newTestInstrumentedDBWithVersionSQL returns a testOTelDB whose mock is set
// up for the server version discovery: the query given in dbVersionSQL
// succeeds, and any version query tried before it fails. If dbVersionSQL is
// neither of the known version queries, then both of them fail.
func newTestInstrumentedDBWithVersionSQL(ctx context.Context, t *testing.T, dbVersionSQL string) testOTelDB {
	tracer := newTestOTelTracer(ctx, t)
	mockDB := dbmocks.NewDB(t)
	defaultRow := dbmocks.NewRow(t)

	mockDB.EXPECT().DriverName().Return(testDriverName).Once()

	// the default version query is always attempted first
	mockDB.EXPECT().QueryRowContext(mock.Anything, dbVersionDefaultSQL).Return(defaultRow)

	switch dbVersionSQL {
	case dbVersionDefaultSQL:
		defaultRow.EXPECT().Err().Return(nil)
		dbmocks.ExpectRowValues(t, defaultRow, testDefaultDBVersion)

	default:
		// the default query fails, hence the SQLite one is attempted next
		defaultRow.EXPECT().Err().Return(errTest)
		sqliteRow := dbmocks.NewRow(t)
		mockDB.EXPECT().QueryRowContext(mock.Anything, dbVersionSQLiteSQL).Return(sqliteRow)

		if dbVersionSQL != dbVersionSQLiteSQL {
			sqliteRow.EXPECT().Err().Return(errTest)
		} else {
			sqliteRow.EXPECT().Err().Return(nil)
			dbmocks.ExpectRowValues(t, sqliteRow, testSQLiteDBVersion)
		}
	}

	return testOTelDB{
		mock:   mockDB,
		tracer: tracer,
		DB:     NewInstrumentedDB(mockDB, tracer),
	}
}
// otelTestTracer is a valid test trace.Tracer that records all spans as stubs.
// It has an additional method `Spans` that returns these stubs so that you can
// assert the correct behaviour of your instrumentation code.
type otelTestTracer struct {
// test used to report failures when flushing spans
t *testing.T
// in-memory destination of the recorded spans
exporter *sdktracetest.InMemoryExporter
// provider that created the embedded Tracer
provider *sdktrace.TracerProvider
trace.Tracer
}
// newTestOTelTracer builds an otelTestTracer that samples every span and
// exports it synchronously to an in-memory exporter. The trace.TracerProvider
// implementation is shut down with the provided context during test cleanup.
func newTestOTelTracer(ctx context.Context, t *testing.T) otelTestTracer {
	var tracer otelTestTracer
	tracer.t = t
	tracer.exporter = sdktracetest.NewInMemoryExporter()
	tracer.provider = sdktrace.NewTracerProvider(
		sdktrace.WithSyncer(tracer.exporter),
		sdktrace.WithSampler(sdktrace.AlwaysSample()),
	)

	shutdownProvider := tracer.provider.Shutdown
	t.Cleanup(func() {
		require.NoError(t, shutdownProvider(ctx))
	})

	tracer.Tracer = tracer.provider.Tracer("testtracer")

	return tracer
}
// Spans flushes the provider using the given context and then returns every
// span stub recorded so far. The test fails if flushing returns an error.
func (t otelTestTracer) Spans(ctx context.Context) sdktracetest.SpanStubs {
	flushErr := t.provider.ForceFlush(ctx)
	require.NoError(t.t, flushErr)

	recorded := t.exporter.GetSpans()

	return recorded
}
// getAttr looks up the first attribute of the span whose key matches the given
// one and returns its value. The zero attribute.Value (i.e. an invalid one) is
// returned when no attribute matches.
func getAttr(s sdktracetest.SpanStub, key string) attribute.Value {
	wanted := attribute.Key(key)

	for i := range s.Attributes {
		if s.Attributes[i].Key == wanted {
			return s.Attributes[i].Value
		}
	}

	var notFound attribute.Value // of type attribute.INVALID
	return notFound
}
// spanMap indexes recorded span stubs by span name.
type spanMap = map[string]sdktracetest.SpanStub
// newSpanMap indexes the given span stubs by name. If several spans share a
// name, the last one in the list is kept.
func newSpanMap(spans sdktracetest.SpanStubs) spanMap {
	byName := make(map[string]sdktracetest.SpanStub, len(spans))
	for i := range spans {
		byName[spans[i].Name] = spans[i]
	}

	return byName
}
// strictPartialOrder asserts that every named span exists in m, that no span
// in the list started before the span preceding it, and that every span in m
// was named in the list. Spans starting at the same instant are accepted in
// any order.
func strictPartialOrder(t *testing.T, m spanMap, spanNames ...string) {
	t.Helper()

	// Register every name upfront. Doing this while comparing consecutive
	// pairs would leave the names unregistered (and their existence unchecked)
	// when fewer than two names are given.
	visited := make(map[string]struct{}, len(spanNames))
	for _, spanName := range spanNames {
		visited[spanName] = struct{}{}
		_, ok := m[spanName]
		require.True(t, ok, "span %q not found", spanName)
	}

	for i := 1; i < len(spanNames); i++ {
		curName, nextName := spanNames[i-1], spanNames[i]
		cur, next := m[curName], m[nextName]
		require.True(t, !next.StartTime.Before(cur.StartTime), "span with "+
			"name %q did not happen before %q", curName, nextName)
	}

	// every recorded span must have been accounted for
	for spanName := range m {
		if _, ok := visited[spanName]; !ok {
			t.Errorf("untested span %q", spanName)
		}
	}
}
// root asserts that each of the named spans exists in m and has no valid
// parent, i.e. that it is a root span.
func root(t *testing.T, m spanMap, rootSpanNames ...string) {
	// report failures at the caller's line
	t.Helper()

	for _, rootSpanName := range rootSpanNames {
		rootSpan, ok := m[rootSpanName]
		require.True(t, ok, "root span %q not found", rootSpanName)
		require.False(t, rootSpan.Parent.IsValid(), "%q is not a root span",
			rootSpanName)
	}
}
// directChildren asserts that the parent span and each of the named children
// exist in m, and that the parent of every child is the named parent span.
func directChildren(t *testing.T, m spanMap, parentName string, childrenNames ...string) {
	// report failures at the caller's line
	t.Helper()

	parent, ok := m[parentName]
	require.True(t, ok, "parent span %q not found", parentName)

	for _, childName := range childrenNames {
		child, ok := m[childName]
		// report the name that was looked up, not the (zero-valued) span stub
		require.True(t, ok, "child span %q not found", childName)
		require.True(t, parent.SpanContext.Equal(child.Parent),
			"%q is not a child of %q", childName, parentName)
	}
}
// links asserts that the span named linkToName exists in m, and that each of
// the spans named in linkFromNames exists and has a link pointing to it.
func links(t *testing.T, m spanMap, linkToName string, linkFromNames ...string) {
	// report failures at the caller's line
	t.Helper()

	linkTo, ok := m[linkToName]
	require.True(t, ok, "LinkTo span %q not found", linkToName)

	for _, linkFromName := range linkFromNames {
		linkFrom, ok := m[linkFromName]
		require.True(t, ok, "LinkFrom span %q not found", linkFromName)

		var found bool
		for i := 0; i < len(linkFrom.Links) && !found; i++ {
			found = linkFrom.Links[i].SpanContext.Equal(linkTo.SpanContext)
		}
		require.True(t, found, "%q is not linked to %q", linkFromName,
			linkToName)
	}
}

View File

@ -10,11 +10,13 @@ import (
"strings"
"text/template"
"go.opentelemetry.io/otel/attribute"
"github.com/grafana/grafana/pkg/storage/unified/sql/db"
"github.com/grafana/grafana/pkg/storage/unified/sql/db/otel"
"github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate"
)
//nolint:unused
const (
otelAttrBaseKey = "dbutil_"
otelAttrTemplateNameKey = otelAttrBaseKey + "template"
@ -22,7 +24,10 @@ const (
)
func withOtelAttrs(ctx context.Context, tmplName, dialectName string) context.Context {
return ctx // TODO: in next PR
return otel.SetAttributes(ctx,
attribute.String(otelAttrTemplateNameKey, tmplName),
attribute.String(otelAttrDialectKey, dialectName),
)
}
// SQLError is an error returned by the database, which includes additionally