From ced7a84f8b0e327c192e63b76c032056692058e8 Mon Sep 17 00:00:00 2001
From: Dave Henderson
Date: Thu, 5 Dec 2024 17:22:19 -0500
Subject: [PATCH] chore(metrics): Add metrics & traces to DB migration (#97181)

Signed-off-by: Dave Henderson
---
 pkg/infra/metrics/metricutil/utils.go      |  35 ++++
 pkg/infra/tracing/tracing_config.go        |   3 +-
 pkg/services/sqlstore/migrator/migrator.go | 219 +++++++++++++++------
 pkg/services/sqlstore/sqlstore.go          |  34 ++--
 4 files changed, 219 insertions(+), 72 deletions(-)

diff --git a/pkg/infra/metrics/metricutil/utils.go b/pkg/infra/metrics/metricutil/utils.go
index c26f5363f8a..03d126f022a 100644
--- a/pkg/infra/metrics/metricutil/utils.go
+++ b/pkg/infra/metrics/metricutil/utils.go
@@ -1,11 +1,14 @@
 package metricutil
 
 import (
+	"context"
 	"errors"
 	"fmt"
 	"strings"
+	"time"
 
 	"github.com/prometheus/client_golang/prometheus"
+	"go.opentelemetry.io/otel/trace"
 )
 
 // SanitizeLabelName removes all invalid chars from the label name.
@@ -84,3 +87,35 @@ func copyLabelSet(ls prometheus.Labels) prometheus.Labels {
 	}
 	return newLs
 }
+
+// wraps prometheus.NewHistogram, adding a central native histogram config
+func NewHistogramVec(opts prometheus.HistogramOpts, labels []string) *prometheus.HistogramVec {
+	if opts.NativeHistogramBucketFactor == 0 {
+		// Enable native histograms, with the factor suggested in the docs
+		opts.NativeHistogramBucketFactor = 1.1
+	}
+	if opts.NativeHistogramMaxBucketNumber == 0 {
+		// OTel default
+		opts.NativeHistogramMaxBucketNumber = 160
+	}
+	if opts.NativeHistogramMinResetDuration == 0 {
+		// Reset buckets every hour by default - override this if you want to
+		// keep buckets around for longer
+		opts.NativeHistogramMinResetDuration = 1 * time.Hour
+	}
+
+	return prometheus.NewHistogramVec(opts, labels)
+}
+
+func ObserveWithExemplar(ctx context.Context, histogram prometheus.Observer, value float64) {
+	traceID := trace.SpanContextFromContext(ctx).TraceID()
+	if traceID.IsValid() {
+		histogram.(prometheus.ExemplarObserver).ObserveWithExemplar(
+			value,
+			prometheus.Labels{"traceID": traceID.String()},
+		)
+		return
+	}
+
+	histogram.Observe(value)
+}
diff --git a/pkg/infra/tracing/tracing_config.go b/pkg/infra/tracing/tracing_config.go
index 36d5ca6f908..b6dc4863c89 100644
--- a/pkg/infra/tracing/tracing_config.go
+++ b/pkg/infra/tracing/tracing_config.go
@@ -5,8 +5,9 @@ import (
 	"os"
 	"strings"
 
-	"github.com/grafana/grafana/pkg/setting"
 	"go.opentelemetry.io/otel/attribute"
+
+	"github.com/grafana/grafana/pkg/setting"
 )
 
 type TracingConfig struct {
diff --git a/pkg/services/sqlstore/migrator/migrator.go b/pkg/services/sqlstore/migrator/migrator.go
index 4cc23d2b855..cdf5603cf31 100644
--- a/pkg/services/sqlstore/migrator/migrator.go
+++ b/pkg/services/sqlstore/migrator/migrator.go
@@ -1,6 +1,7 @@
 package migrator
 
 import (
+	"context"
 	"errors"
 	"fmt"
 	"time"
@@ -9,10 +10,17 @@ import (
 	"github.com/golang-migrate/migrate/v4/database"
 	_ "github.com/lib/pq"
 	"github.com/mattn/go-sqlite3"
+	"github.com/prometheus/client_golang/prometheus"
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/codes"
+	"go.opentelemetry.io/otel/trace"
 	"go.uber.org/atomic"
 	"xorm.io/xorm"
 
 	"github.com/grafana/grafana/pkg/infra/log"
+	"github.com/grafana/grafana/pkg/infra/metrics/metricutil"
+	"github.com/grafana/grafana/pkg/infra/tracing"
 	"github.com/grafana/grafana/pkg/setting"
 )
 
@@ -21,6 +29,8 @@ var (
 	ErrMigratorIsUnlocked = fmt.Errorf("migrator is unlocked")
 )
 
+var tracer = otel.Tracer("github.com/grafana/grafana/pkg/services/sqlstore/migrator")
+
 type Migrator struct {
 	DBEngine     *xorm.Engine
 	Dialect      Dialect
@@ -31,6 +41,8 @@ type Migrator struct {
 	isLocked     atomic.Bool
 	logMap       map[string]MigrationLog
 	tableName    string
+
+	metrics migratorMetrics
 }
 
 type MigrationLog struct {
@@ -42,6 +54,12 @@ type MigrationLog struct {
 	Timestamp   time.Time
 }
 
+type migratorMetrics struct {
+	migCount         *prometheus.CounterVec
+	migDuration      *prometheus.HistogramVec
+	totalMigDuration *prometheus.HistogramVec
+}
+
 func NewMigrator(engine *xorm.Engine, cfg *setting.Cfg) *Migrator {
 	return NewScopedMigrator(engine, cfg, "")
 }
@@ -54,6 +72,26 @@ func NewScopedMigrator(engine *xorm.Engine, cfg *setting.Cfg, scope string) *Mig
 		migrations:   make([]Migration, 0),
 		migrationIds: make(map[string]struct{}),
 		Dialect:      NewDialect(engine.DriverName()),
+		metrics: migratorMetrics{
+			migCount: prometheus.NewCounterVec(prometheus.CounterOpts{
+				Namespace: "grafana_database",
+				Subsystem: scope,
+				Name:      "migrations_total",
+				Help:      "Total number of SQL migrations",
+			}, []string{"success"}),
+			migDuration: metricutil.NewHistogramVec(prometheus.HistogramOpts{
+				Namespace: "grafana_database",
+				Subsystem: scope,
+				Name:      "migration_duration_seconds",
+				Help:      "Individual SQL migration duration in seconds",
+			}, []string{"success"}),
+			totalMigDuration: metricutil.NewHistogramVec(prometheus.HistogramOpts{
+				Namespace: "grafana_database",
+				Subsystem: scope,
+				Name:      "all_migrations_duration_seconds",
+				Help:      "Duration of the entire SQL migration process in seconds",
+			}, []string{"success"}),
+		},
 	}
 	if scope == "" {
 		mg.tableName = "migration_log"
@@ -65,6 +103,20 @@ func NewScopedMigrator(engine *xorm.Engine, cfg *setting.Cfg, scope string) *Mig
 	return mg
 }
 
+// Collect implements Prometheus.Collector.
+func (mg *Migrator) Collect(ch chan<- prometheus.Metric) {
+	mg.metrics.migCount.Collect(ch)
+	mg.metrics.migDuration.Collect(ch)
+	mg.metrics.totalMigDuration.Collect(ch)
+}
+
+// Describe implements Prometheus.Collector.
+func (mg *Migrator) Describe(ch chan<- *prometheus.Desc) {
+	mg.metrics.migCount.Describe(ch)
+	mg.metrics.migDuration.Describe(ch)
+	mg.metrics.totalMigDuration.Describe(ch)
+}
+
 // AddCreateMigration adds the initial migration log table -- this should likely be
 // automatic and first, but enough tests exists that do not expect that we can keep it explicit
 func (mg *Migrator) AddCreateMigration() {
@@ -139,9 +191,14 @@ func (mg *Migrator) RemoveMigrationLogs(migrationsIDs ...string) {
 	}
 }
 
+// soft-deprecated: use RunMigrations instead (will be fully deprecated later)
 func (mg *Migrator) Start(isDatabaseLockingEnabled bool, lockAttemptTimeout int) (err error) {
+	return mg.RunMigrations(context.Background(), isDatabaseLockingEnabled, lockAttemptTimeout)
+}
+
+func (mg *Migrator) RunMigrations(ctx context.Context, isDatabaseLockingEnabled bool, lockAttemptTimeout int) (err error) {
 	if !isDatabaseLockingEnabled {
-		return mg.run()
+		return mg.run(ctx)
 	}
 
 	dbName, err := mg.Dialect.GetDBName(mg.DBEngine.DataSourceName())
@@ -153,8 +210,10 @@ func (mg *Migrator) Start(isDatabaseLockingEnabled bool, lockAttemptTimeout int)
 		return err
 	}
 
+	logger := mg.Logger.FromContext(ctx)
+
 	return mg.InTransaction(func(sess *xorm.Session) error {
-		mg.Logger.Info("Locking database")
+		logger.Info("Locking database")
 		lockCfg := LockCfg{
 			Session: sess,
 			Key:     key,
@@ -162,110 +221,154 @@ func (mg *Migrator) Start(isDatabaseLockingEnabled bool, lockAttemptTimeout int)
 		}
 
 		if err := casRestoreOnErr(&mg.isLocked, false, true, ErrMigratorIsLocked, mg.Dialect.Lock, lockCfg); err != nil {
-			mg.Logger.Error("Failed to lock database", "error", err)
+			logger.Error("Failed to lock database", "error", err)
 			return err
 		}
 
 		defer func() {
-			mg.Logger.Info("Unlocking database")
+			logger.Info("Unlocking database")
 			unlockErr := casRestoreOnErr(&mg.isLocked, true, false, ErrMigratorIsUnlocked, mg.Dialect.Unlock, lockCfg)
 			if unlockErr != nil {
-				mg.Logger.Error("Failed to unlock database", "error", unlockErr)
+				logger.Error("Failed to unlock database", "error", unlockErr)
 			}
 		}()
 
 		// migration will run inside a nested transaction
-		return mg.run()
+		return mg.run(ctx)
 	})
 }
 
-func (mg *Migrator) run() (err error) {
-	mg.Logger.Info("Starting DB migrations")
+func (mg *Migrator) run(ctx context.Context) (err error) {
+	ctx, span := tracer.Start(ctx, "Migrator.run")
+	defer span.End()
+
+	logger := mg.Logger.FromContext(ctx)
+
+	logger.Info("Starting DB migrations")
 	_, err = mg.GetMigrationLog()
 	if err != nil {
 		return err
 	}
 
+	successLabel := prometheus.Labels{"success": "true"}
+
 	migrationsPerformed := 0
 	migrationsSkipped := 0
 	start := time.Now()
 	for _, m := range mg.migrations {
-		m := m
 		_, exists := mg.logMap[m.Id()]
 		if exists {
-			mg.Logger.Debug("Skipping migration: Already executed", "id", m.Id())
+			logger.Debug("Skipping migration: Already executed", "id", m.Id())
+			span.AddEvent("Skipping migration: Already executed",
+				trace.WithAttributes(attribute.String("migration_id", m.Id())),
+			)
 			migrationsSkipped++
 			continue
 		}
-		sql := m.SQL(mg.Dialect)
+		migStart := time.Now()
 
-		record := MigrationLog{
-			MigrationID: m.Id(),
-			SQL:         sql,
-			Timestamp:   time.Now(),
-		}
-
-		err := mg.InTransaction(func(sess *xorm.Session) error {
-			err := mg.exec(m, sess)
-			// if we get an sqlite busy/locked error, sleep 100ms and try again
-			cnt := 0
-			for cnt < 3 && (errors.Is(err, sqlite3.ErrLocked) || errors.Is(err, sqlite3.ErrBusy)) {
-				cnt++
-				mg.Logger.Debug("Database locked, sleeping then retrying", "error", err, "sql", sql)
-				time.Sleep(100 * time.Millisecond)
-				err = mg.exec(m, sess)
-			}
-
-			if err != nil {
-				mg.Logger.Error("Exec failed", "error", err, "sql", sql)
-				record.Error = err.Error()
-				if !m.SkipMigrationLog() {
-					if _, err := sess.Table(mg.tableName).Insert(&record); err != nil {
-						return err
-					}
-				}
-				return err
-			}
-			record.Success = true
-			if !m.SkipMigrationLog() {
-				_, err = sess.Table(mg.tableName).Insert(&record)
-			}
-			if err == nil {
-				migrationsPerformed++
-			}
+		if err := mg.doMigration(ctx, m); err != nil {
+			failLabel := prometheus.Labels{"success": "false"}
+			metricutil.ObserveWithExemplar(ctx, mg.metrics.migDuration.With(failLabel), time.Since(migStart).Seconds())
+			mg.metrics.migCount.With(failLabel).Inc()
 			return err
-		})
-		if err != nil {
-			return fmt.Errorf("%v: %w", fmt.Sprintf("migration failed (id = %s)", m.Id()), err)
 		}
+
+		metricutil.ObserveWithExemplar(ctx, mg.metrics.migDuration.With(successLabel), time.Since(migStart).Seconds())
+		mg.metrics.migCount.With(successLabel).Inc()
+
+		migrationsPerformed++
 	}
 
-	mg.Logger.Info("migrations completed", "performed", migrationsPerformed, "skipped", migrationsSkipped, "duration", time.Since(start))
+	metricutil.ObserveWithExemplar(ctx, mg.metrics.totalMigDuration.With(successLabel), time.Since(start).Seconds())
+
+	logger.Info("migrations completed", "performed", migrationsPerformed, "skipped", migrationsSkipped, "duration", time.Since(start))
 
 	// Make sure migrations are synced
 	return mg.DBEngine.Sync2()
 }
 
-func (mg *Migrator) exec(m Migration, sess *xorm.Session) error {
+func (mg *Migrator) doMigration(ctx context.Context, m Migration) error {
+	ctx, span := tracer.Start(ctx, "Migrator.doMigration", trace.WithAttributes(
+		attribute.String("migration_id", m.Id()),
+	))
+	defer span.End()
+
+	logger := mg.Logger.FromContext(ctx)
+
+	sql := m.SQL(mg.Dialect)
+
+	record := MigrationLog{
+		MigrationID: m.Id(),
+		SQL:         sql,
+		Timestamp:   time.Now(),
+	}
+
+	err := mg.InTransaction(func(sess *xorm.Session) error {
+		// propagate context
+		sess = sess.Context(ctx)
+
+		err := mg.exec(ctx, m, sess)
+		// if we get an sqlite busy/locked error, sleep 100ms and try again
+		cnt := 0
+		for cnt < 3 && (errors.Is(err, sqlite3.ErrLocked) || errors.Is(err, sqlite3.ErrBusy)) {
+			cnt++
+			logger.Debug("Database locked, sleeping then retrying", "error", err, "sql", sql)
+			span.AddEvent("Database locked, sleeping then retrying",
+				trace.WithAttributes(attribute.String("error", err.Error())),
+				trace.WithAttributes(attribute.String("sql", sql)),
+			)
+			time.Sleep(100 * time.Millisecond)
+			err = mg.exec(ctx, m, sess)
+		}
+
+		if err != nil {
+			logger.Error("Exec failed", "error", err, "sql", sql)
+			record.Error = err.Error()
+			if !m.SkipMigrationLog() {
+				if _, err := sess.Table(mg.tableName).Insert(&record); err != nil {
+					return err
+				}
+			}
+			return err
+		}
+		record.Success = true
+		if !m.SkipMigrationLog() {
+			_, err = sess.Table(mg.tableName).Insert(&record)
+		}
+		return err
+	})
+	if err != nil {
+		return tracing.Errorf(span, "migration failed (id = %s): %w", m.Id(), err)
+	}
+
+	span.SetStatus(codes.Ok, "")
+
+	return nil
+}
+
+func (mg *Migrator) exec(ctx context.Context, m Migration, sess *xorm.Session) error {
+	logger := mg.Logger.FromContext(ctx)
+
 	start := time.Now()
-	mg.Logger.Info("Executing migration", "id", m.Id())
+	logger.Info("Executing migration", "id", m.Id())
 
 	condition := m.GetCondition()
 	if condition != nil {
 		sql, args := condition.SQL(mg.Dialect)
 		if sql != "" {
-			mg.Logger.Debug("Executing migration condition SQL", "id", m.Id(), "sql", sql, "args", args)
+			logger.Debug("Executing migration condition SQL", "id", m.Id(), "sql", sql, "args", args)
 			results, err := sess.SQL(sql, args...).Query()
 			if err != nil {
-				mg.Logger.Error("Executing migration condition failed", "id", m.Id(), "error", err)
+				logger.Error("Executing migration condition failed", "id", m.Id(), "error", err)
 				return err
 			}
 			if !condition.IsFulfilled(results) {
-				mg.Logger.Warn("Skipping migration: Already executed, but not recorded in migration log", "id", m.Id())
+				logger.Warn("Skipping migration: Already executed, but not recorded in migration log", "id", m.Id())
 				return nil
 			}
 		}
@@ -273,20 +376,20 @@ func (mg *Migrator) exec(m Migration, sess *xorm.Session) error {
 
 	var err error
 	if codeMigration, ok := m.(CodeMigration); ok {
-		mg.Logger.Debug("Executing code migration", "id", m.Id())
+		logger.Debug("Executing code migration", "id", m.Id())
 		err = codeMigration.Exec(sess, mg)
 	} else {
 		sql := m.SQL(mg.Dialect)
-		mg.Logger.Debug("Executing sql migration", "id", m.Id(), "sql", sql)
+		logger.Debug("Executing sql migration", "id", m.Id(), "sql", sql)
 		_, err = sess.Exec(sql)
 	}
 
 	if err != nil {
-		mg.Logger.Error("Executing migration failed", "id", m.Id(), "error", err, "duration", time.Since(start))
+		logger.Error("Executing migration failed", "id", m.Id(), "error", err, "duration", time.Since(start))
 		return err
 	}
 
-	mg.Logger.Info("Migration successfully executed", "id", m.Id(), "duration", time.Since(start))
+	logger.Info("Migration successfully executed", "id", m.Id(), "duration", time.Since(start))
 
 	return nil
 }
diff --git a/pkg/services/sqlstore/sqlstore.go b/pkg/services/sqlstore/sqlstore.go
index 4e314f21ec7..3b150ad5e95 100644
--- a/pkg/services/sqlstore/sqlstore.go
+++ b/pkg/services/sqlstore/sqlstore.go
@@ -75,18 +75,6 @@ func ProvideService(cfg *setting.Cfg,
 	}
 	s.tracer = tracer
 
-	// initialize and register metrics wrapper around the *sql.DB
-	db := s.engine.DB().DB
-
-	// register the go_sql_stats_connections_* metrics
-	if err := prometheus.Register(sqlstats.NewStatsCollector("grafana", db)); err != nil {
-		s.log.Warn("Failed to register sqlstore stats collector", "error", err)
-	}
-	// TODO: deprecate/remove these metrics
-	if err := prometheus.Register(newSQLStoreMetrics(db)); err != nil {
-		s.log.Warn("Failed to register sqlstore metrics", "error", err)
-	}
-
 	return s, nil
 }
 
@@ -150,7 +138,14 @@ func (ss *SQLStore) Migrate(isDatabaseLockingEnabled bool) error {
 	migrator := migrator.NewMigrator(ss.engine, ss.cfg)
 	ss.migrations.AddMigration(migrator)
 
-	return migrator.Start(isDatabaseLockingEnabled, ss.dbCfg.MigrationLockAttemptTimeout)
+	if err := prometheus.Register(migrator); err != nil {
+		ss.log.Warn("Failed to register migrator metrics", "error", err)
+	}
+
+	ctx, span := ss.tracer.Start(context.Background(), "SQLStore.Migrate")
+	defer span.End()
+
+	return migrator.RunMigrations(ctx, isDatabaseLockingEnabled, ss.dbCfg.MigrationLockAttemptTimeout)
 }
 
 // Reset resets database state.
@@ -327,6 +322,19 @@ func (ss *SQLStore) initEngine(engine *xorm.Engine) error {
 		engine.ShowExecTime(true)
 	}
 
+	// initialize and register metrics wrapper around the *sql.DB
+	db := engine.DB().DB
+
+	// register the go_sql_stats_connections_* metrics
+	if err := prometheus.Register(sqlstats.NewStatsCollector("grafana", db)); err != nil {
+		ss.log.Warn("Failed to register sqlstore stats collector", "error", err)
+	}
+
+	// TODO: deprecate/remove these metrics
+	if err := prometheus.Register(newSQLStoreMetrics(db)); err != nil {
+		ss.log.Warn("Failed to register sqlstore metrics", "error", err)
+	}
+
 	ss.engine = engine
 	return nil
 }
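
A minimal usage sketch (not part of the patch): how the metricutil helpers introduced above might be called from other instrumented code. The helper names and signatures come from the diff; the exampleDuration collector, runTask function, metric names, and label values are hypothetical.

```go
package example

import (
	"context"
	"time"

	"github.com/prometheus/client_golang/prometheus"

	"github.com/grafana/grafana/pkg/infra/metrics/metricutil"
)

// exampleDuration is a hypothetical histogram; metricutil.NewHistogramVec
// fills in the native-histogram defaults (bucket factor 1.1, 160 max buckets,
// 1h reset interval) for any option left unset.
var exampleDuration = metricutil.NewHistogramVec(prometheus.HistogramOpts{
	Namespace: "grafana_example",
	Name:      "task_duration_seconds",
	Help:      "Hypothetical task duration in seconds",
}, []string{"success"})

func init() {
	// The wrapper only builds the collector; it still has to be registered.
	prometheus.MustRegister(exampleDuration)
}

// runTask is a hypothetical caller. When ctx carries a valid OpenTelemetry
// span context, ObserveWithExemplar attaches the trace ID as an exemplar so
// the recorded sample can be linked back to the trace; otherwise it falls
// back to a plain Observe.
func runTask(ctx context.Context) {
	start := time.Now()
	// ... do the actual work here ...

	metricutil.ObserveWithExemplar(ctx,
		exampleDuration.With(prometheus.Labels{"success": "true"}),
		time.Since(start).Seconds())
}
```

In the patch itself the same pattern drives the grafana_database migration_duration_seconds, all_migrations_duration_seconds, and migrations_total series, which are exposed by registering the Migrator (via its Collect/Describe methods) in SQLStore.Migrate.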