mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
chore(metrics): Add metrics & traces to DB migration (#97181)
Signed-off-by: Dave Henderson <dave.henderson@grafana.com>
This commit is contained in:
parent
0025876659
commit
ced7a84f8b
@ -1,11 +1,14 @@
|
||||
package metricutil
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
// SanitizeLabelName removes all invalid chars from the label name.
|
||||
@ -84,3 +87,35 @@ func copyLabelSet(ls prometheus.Labels) prometheus.Labels {
|
||||
}
|
||||
return newLs
|
||||
}
|
||||
|
||||
// wraps prometheus.NewHistogram, adding a central native histogram config
|
||||
func NewHistogramVec(opts prometheus.HistogramOpts, labels []string) *prometheus.HistogramVec {
|
||||
if opts.NativeHistogramBucketFactor == 0 {
|
||||
// Enable native histograms, with the factor suggested in the docs
|
||||
opts.NativeHistogramBucketFactor = 1.1
|
||||
}
|
||||
if opts.NativeHistogramMaxBucketNumber == 0 {
|
||||
// OTel default
|
||||
opts.NativeHistogramMaxBucketNumber = 160
|
||||
}
|
||||
if opts.NativeHistogramMinResetDuration == 0 {
|
||||
// Reset buckets every hour by default - override this if you want to
|
||||
// keep buckets around for longer
|
||||
opts.NativeHistogramMinResetDuration = 1 * time.Hour
|
||||
}
|
||||
|
||||
return prometheus.NewHistogramVec(opts, labels)
|
||||
}
|
||||
|
||||
func ObserveWithExemplar(ctx context.Context, histogram prometheus.Observer, value float64) {
|
||||
traceID := trace.SpanContextFromContext(ctx).TraceID()
|
||||
if traceID.IsValid() {
|
||||
histogram.(prometheus.ExemplarObserver).ObserveWithExemplar(
|
||||
value,
|
||||
prometheus.Labels{"traceID": traceID.String()},
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
histogram.Observe(value)
|
||||
}
|
||||
|
@ -5,8 +5,9 @@ import (
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
)
|
||||
|
||||
type TracingConfig struct {
|
||||
|
@ -1,6 +1,7 @@
|
||||
package migrator
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
@ -9,10 +10,17 @@ import (
|
||||
"github.com/golang-migrate/migrate/v4/database"
|
||||
_ "github.com/lib/pq"
|
||||
"github.com/mattn/go-sqlite3"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/codes"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"go.uber.org/atomic"
|
||||
"xorm.io/xorm"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/infra/metrics/metricutil"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
)
|
||||
|
||||
@ -21,6 +29,8 @@ var (
|
||||
ErrMigratorIsUnlocked = fmt.Errorf("migrator is unlocked")
|
||||
)
|
||||
|
||||
var tracer = otel.Tracer("github.com/grafana/grafana/pkg/services/sqlstore/migrator")
|
||||
|
||||
type Migrator struct {
|
||||
DBEngine *xorm.Engine
|
||||
Dialect Dialect
|
||||
@ -31,6 +41,8 @@ type Migrator struct {
|
||||
isLocked atomic.Bool
|
||||
logMap map[string]MigrationLog
|
||||
tableName string
|
||||
|
||||
metrics migratorMetrics
|
||||
}
|
||||
|
||||
type MigrationLog struct {
|
||||
@ -42,6 +54,12 @@ type MigrationLog struct {
|
||||
Timestamp time.Time
|
||||
}
|
||||
|
||||
type migratorMetrics struct {
|
||||
migCount *prometheus.CounterVec
|
||||
migDuration *prometheus.HistogramVec
|
||||
totalMigDuration *prometheus.HistogramVec
|
||||
}
|
||||
|
||||
func NewMigrator(engine *xorm.Engine, cfg *setting.Cfg) *Migrator {
|
||||
return NewScopedMigrator(engine, cfg, "")
|
||||
}
|
||||
@ -54,6 +72,26 @@ func NewScopedMigrator(engine *xorm.Engine, cfg *setting.Cfg, scope string) *Mig
|
||||
migrations: make([]Migration, 0),
|
||||
migrationIds: make(map[string]struct{}),
|
||||
Dialect: NewDialect(engine.DriverName()),
|
||||
metrics: migratorMetrics{
|
||||
migCount: prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: "grafana_database",
|
||||
Subsystem: scope,
|
||||
Name: "migrations_total",
|
||||
Help: "Total number of SQL migrations",
|
||||
}, []string{"success"}),
|
||||
migDuration: metricutil.NewHistogramVec(prometheus.HistogramOpts{
|
||||
Namespace: "grafana_database",
|
||||
Subsystem: scope,
|
||||
Name: "migration_duration_seconds",
|
||||
Help: "Individual SQL migration duration in seconds",
|
||||
}, []string{"success"}),
|
||||
totalMigDuration: metricutil.NewHistogramVec(prometheus.HistogramOpts{
|
||||
Namespace: "grafana_database",
|
||||
Subsystem: scope,
|
||||
Name: "all_migrations_duration_seconds",
|
||||
Help: "Duration of the entire SQL migration process in seconds",
|
||||
}, []string{"success"}),
|
||||
},
|
||||
}
|
||||
if scope == "" {
|
||||
mg.tableName = "migration_log"
|
||||
@ -65,6 +103,20 @@ func NewScopedMigrator(engine *xorm.Engine, cfg *setting.Cfg, scope string) *Mig
|
||||
return mg
|
||||
}
|
||||
|
||||
// Collect implements Prometheus.Collector.
|
||||
func (mg *Migrator) Collect(ch chan<- prometheus.Metric) {
|
||||
mg.metrics.migCount.Collect(ch)
|
||||
mg.metrics.migDuration.Collect(ch)
|
||||
mg.metrics.totalMigDuration.Collect(ch)
|
||||
}
|
||||
|
||||
// Describe implements Prometheus.Collector.
|
||||
func (mg *Migrator) Describe(ch chan<- *prometheus.Desc) {
|
||||
mg.metrics.migCount.Describe(ch)
|
||||
mg.metrics.migDuration.Describe(ch)
|
||||
mg.metrics.totalMigDuration.Describe(ch)
|
||||
}
|
||||
|
||||
// AddCreateMigration adds the initial migration log table -- this should likely be
|
||||
// automatic and first, but enough tests exist that do not expect it, so we keep it explicit
|
||||
func (mg *Migrator) AddCreateMigration() {
|
||||
@ -139,9 +191,14 @@ func (mg *Migrator) RemoveMigrationLogs(migrationsIDs ...string) {
|
||||
}
|
||||
}
|
||||
|
||||
// soft-deprecated: use RunMigrations instead (will be fully deprecated later)
|
||||
func (mg *Migrator) Start(isDatabaseLockingEnabled bool, lockAttemptTimeout int) (err error) {
|
||||
return mg.RunMigrations(context.Background(), isDatabaseLockingEnabled, lockAttemptTimeout)
|
||||
}
|
||||
|
||||
func (mg *Migrator) RunMigrations(ctx context.Context, isDatabaseLockingEnabled bool, lockAttemptTimeout int) (err error) {
|
||||
if !isDatabaseLockingEnabled {
|
||||
return mg.run()
|
||||
return mg.run(ctx)
|
||||
}
|
||||
|
||||
dbName, err := mg.Dialect.GetDBName(mg.DBEngine.DataSourceName())
|
||||
@ -153,8 +210,10 @@ func (mg *Migrator) Start(isDatabaseLockingEnabled bool, lockAttemptTimeout int)
|
||||
return err
|
||||
}
|
||||
|
||||
logger := mg.Logger.FromContext(ctx)
|
||||
|
||||
return mg.InTransaction(func(sess *xorm.Session) error {
|
||||
mg.Logger.Info("Locking database")
|
||||
logger.Info("Locking database")
|
||||
lockCfg := LockCfg{
|
||||
Session: sess,
|
||||
Key: key,
|
||||
@ -162,110 +221,154 @@ func (mg *Migrator) Start(isDatabaseLockingEnabled bool, lockAttemptTimeout int)
|
||||
}
|
||||
|
||||
if err := casRestoreOnErr(&mg.isLocked, false, true, ErrMigratorIsLocked, mg.Dialect.Lock, lockCfg); err != nil {
|
||||
mg.Logger.Error("Failed to lock database", "error", err)
|
||||
logger.Error("Failed to lock database", "error", err)
|
||||
return err
|
||||
}
|
||||
|
||||
defer func() {
|
||||
mg.Logger.Info("Unlocking database")
|
||||
logger.Info("Unlocking database")
|
||||
unlockErr := casRestoreOnErr(&mg.isLocked, true, false, ErrMigratorIsUnlocked, mg.Dialect.Unlock, lockCfg)
|
||||
if unlockErr != nil {
|
||||
mg.Logger.Error("Failed to unlock database", "error", unlockErr)
|
||||
logger.Error("Failed to unlock database", "error", unlockErr)
|
||||
}
|
||||
}()
|
||||
|
||||
// migration will run inside a nested transaction
|
||||
return mg.run()
|
||||
return mg.run(ctx)
|
||||
})
|
||||
}
|
||||
|
||||
func (mg *Migrator) run() (err error) {
|
||||
mg.Logger.Info("Starting DB migrations")
|
||||
func (mg *Migrator) run(ctx context.Context) (err error) {
|
||||
ctx, span := tracer.Start(ctx, "Migrator.run")
|
||||
defer span.End()
|
||||
|
||||
logger := mg.Logger.FromContext(ctx)
|
||||
|
||||
logger.Info("Starting DB migrations")
|
||||
|
||||
_, err = mg.GetMigrationLog()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
successLabel := prometheus.Labels{"success": "true"}
|
||||
|
||||
migrationsPerformed := 0
|
||||
migrationsSkipped := 0
|
||||
start := time.Now()
|
||||
for _, m := range mg.migrations {
|
||||
m := m
|
||||
_, exists := mg.logMap[m.Id()]
|
||||
if exists {
|
||||
mg.Logger.Debug("Skipping migration: Already executed", "id", m.Id())
|
||||
logger.Debug("Skipping migration: Already executed", "id", m.Id())
|
||||
span.AddEvent("Skipping migration: Already executed",
|
||||
trace.WithAttributes(attribute.String("migration_id", m.Id())),
|
||||
)
|
||||
migrationsSkipped++
|
||||
continue
|
||||
}
|
||||
|
||||
sql := m.SQL(mg.Dialect)
|
||||
migStart := time.Now()
|
||||
|
||||
record := MigrationLog{
|
||||
MigrationID: m.Id(),
|
||||
SQL: sql,
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
err := mg.InTransaction(func(sess *xorm.Session) error {
|
||||
err := mg.exec(m, sess)
|
||||
// if we get an sqlite busy/locked error, sleep 100ms and try again
|
||||
cnt := 0
|
||||
for cnt < 3 && (errors.Is(err, sqlite3.ErrLocked) || errors.Is(err, sqlite3.ErrBusy)) {
|
||||
cnt++
|
||||
mg.Logger.Debug("Database locked, sleeping then retrying", "error", err, "sql", sql)
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
err = mg.exec(m, sess)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
mg.Logger.Error("Exec failed", "error", err, "sql", sql)
|
||||
record.Error = err.Error()
|
||||
if !m.SkipMigrationLog() {
|
||||
if _, err := sess.Table(mg.tableName).Insert(&record); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
record.Success = true
|
||||
if !m.SkipMigrationLog() {
|
||||
_, err = sess.Table(mg.tableName).Insert(&record)
|
||||
}
|
||||
if err == nil {
|
||||
migrationsPerformed++
|
||||
}
|
||||
if err := mg.doMigration(ctx, m); err != nil {
|
||||
failLabel := prometheus.Labels{"success": "false"}
|
||||
metricutil.ObserveWithExemplar(ctx, mg.metrics.migDuration.With(failLabel), time.Since(migStart).Seconds())
|
||||
mg.metrics.migCount.With(failLabel).Inc()
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("%v: %w", fmt.Sprintf("migration failed (id = %s)", m.Id()), err)
|
||||
}
|
||||
|
||||
metricutil.ObserveWithExemplar(ctx, mg.metrics.migDuration.With(successLabel), time.Since(migStart).Seconds())
|
||||
mg.metrics.migCount.With(successLabel).Inc()
|
||||
|
||||
migrationsPerformed++
|
||||
}
|
||||
|
||||
mg.Logger.Info("migrations completed", "performed", migrationsPerformed, "skipped", migrationsSkipped, "duration", time.Since(start))
|
||||
metricutil.ObserveWithExemplar(ctx, mg.metrics.totalMigDuration.With(successLabel), time.Since(start).Seconds())
|
||||
|
||||
logger.Info("migrations completed", "performed", migrationsPerformed, "skipped", migrationsSkipped, "duration", time.Since(start))
|
||||
|
||||
// Make sure migrations are synced
|
||||
return mg.DBEngine.Sync2()
|
||||
}
|
||||
|
||||
func (mg *Migrator) exec(m Migration, sess *xorm.Session) error {
|
||||
func (mg *Migrator) doMigration(ctx context.Context, m Migration) error {
|
||||
ctx, span := tracer.Start(ctx, "Migrator.doMigration", trace.WithAttributes(
|
||||
attribute.String("migration_id", m.Id()),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
logger := mg.Logger.FromContext(ctx)
|
||||
|
||||
sql := m.SQL(mg.Dialect)
|
||||
|
||||
record := MigrationLog{
|
||||
MigrationID: m.Id(),
|
||||
SQL: sql,
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
err := mg.InTransaction(func(sess *xorm.Session) error {
|
||||
// propagate context
|
||||
sess = sess.Context(ctx)
|
||||
|
||||
err := mg.exec(ctx, m, sess)
|
||||
// if we get an sqlite busy/locked error, sleep 100ms and try again
|
||||
cnt := 0
|
||||
for cnt < 3 && (errors.Is(err, sqlite3.ErrLocked) || errors.Is(err, sqlite3.ErrBusy)) {
|
||||
cnt++
|
||||
logger.Debug("Database locked, sleeping then retrying", "error", err, "sql", sql)
|
||||
span.AddEvent("Database locked, sleeping then retrying",
|
||||
trace.WithAttributes(attribute.String("error", err.Error())),
|
||||
trace.WithAttributes(attribute.String("sql", sql)),
|
||||
)
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
err = mg.exec(ctx, m, sess)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
logger.Error("Exec failed", "error", err, "sql", sql)
|
||||
record.Error = err.Error()
|
||||
if !m.SkipMigrationLog() {
|
||||
if _, err := sess.Table(mg.tableName).Insert(&record); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
record.Success = true
|
||||
if !m.SkipMigrationLog() {
|
||||
_, err = sess.Table(mg.tableName).Insert(&record)
|
||||
}
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
return tracing.Errorf(span, "migration failed (id = %s): %w", m.Id(), err)
|
||||
}
|
||||
|
||||
span.SetStatus(codes.Ok, "")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mg *Migrator) exec(ctx context.Context, m Migration, sess *xorm.Session) error {
|
||||
logger := mg.Logger.FromContext(ctx)
|
||||
|
||||
start := time.Now()
|
||||
mg.Logger.Info("Executing migration", "id", m.Id())
|
||||
logger.Info("Executing migration", "id", m.Id())
|
||||
|
||||
condition := m.GetCondition()
|
||||
if condition != nil {
|
||||
sql, args := condition.SQL(mg.Dialect)
|
||||
|
||||
if sql != "" {
|
||||
mg.Logger.Debug("Executing migration condition SQL", "id", m.Id(), "sql", sql, "args", args)
|
||||
logger.Debug("Executing migration condition SQL", "id", m.Id(), "sql", sql, "args", args)
|
||||
results, err := sess.SQL(sql, args...).Query()
|
||||
if err != nil {
|
||||
mg.Logger.Error("Executing migration condition failed", "id", m.Id(), "error", err)
|
||||
logger.Error("Executing migration condition failed", "id", m.Id(), "error", err)
|
||||
return err
|
||||
}
|
||||
|
||||
if !condition.IsFulfilled(results) {
|
||||
mg.Logger.Warn("Skipping migration: Already executed, but not recorded in migration log", "id", m.Id())
|
||||
logger.Warn("Skipping migration: Already executed, but not recorded in migration log", "id", m.Id())
|
||||
return nil
|
||||
}
|
||||
}
|
||||
@ -273,20 +376,20 @@ func (mg *Migrator) exec(m Migration, sess *xorm.Session) error {
|
||||
|
||||
var err error
|
||||
if codeMigration, ok := m.(CodeMigration); ok {
|
||||
mg.Logger.Debug("Executing code migration", "id", m.Id())
|
||||
logger.Debug("Executing code migration", "id", m.Id())
|
||||
err = codeMigration.Exec(sess, mg)
|
||||
} else {
|
||||
sql := m.SQL(mg.Dialect)
|
||||
mg.Logger.Debug("Executing sql migration", "id", m.Id(), "sql", sql)
|
||||
logger.Debug("Executing sql migration", "id", m.Id(), "sql", sql)
|
||||
_, err = sess.Exec(sql)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
mg.Logger.Error("Executing migration failed", "id", m.Id(), "error", err, "duration", time.Since(start))
|
||||
logger.Error("Executing migration failed", "id", m.Id(), "error", err, "duration", time.Since(start))
|
||||
return err
|
||||
}
|
||||
|
||||
mg.Logger.Info("Migration successfully executed", "id", m.Id(), "duration", time.Since(start))
|
||||
logger.Info("Migration successfully executed", "id", m.Id(), "duration", time.Since(start))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -75,18 +75,6 @@ func ProvideService(cfg *setting.Cfg,
|
||||
}
|
||||
s.tracer = tracer
|
||||
|
||||
// initialize and register metrics wrapper around the *sql.DB
|
||||
db := s.engine.DB().DB
|
||||
|
||||
// register the go_sql_stats_connections_* metrics
|
||||
if err := prometheus.Register(sqlstats.NewStatsCollector("grafana", db)); err != nil {
|
||||
s.log.Warn("Failed to register sqlstore stats collector", "error", err)
|
||||
}
|
||||
// TODO: deprecate/remove these metrics
|
||||
if err := prometheus.Register(newSQLStoreMetrics(db)); err != nil {
|
||||
s.log.Warn("Failed to register sqlstore metrics", "error", err)
|
||||
}
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
@ -150,7 +138,14 @@ func (ss *SQLStore) Migrate(isDatabaseLockingEnabled bool) error {
|
||||
migrator := migrator.NewMigrator(ss.engine, ss.cfg)
|
||||
ss.migrations.AddMigration(migrator)
|
||||
|
||||
return migrator.Start(isDatabaseLockingEnabled, ss.dbCfg.MigrationLockAttemptTimeout)
|
||||
if err := prometheus.Register(migrator); err != nil {
|
||||
ss.log.Warn("Failed to register migrator metrics", "error", err)
|
||||
}
|
||||
|
||||
ctx, span := ss.tracer.Start(context.Background(), "SQLStore.Migrate")
|
||||
defer span.End()
|
||||
|
||||
return migrator.RunMigrations(ctx, isDatabaseLockingEnabled, ss.dbCfg.MigrationLockAttemptTimeout)
|
||||
}
|
||||
|
||||
// Reset resets database state.
|
||||
@ -327,6 +322,19 @@ func (ss *SQLStore) initEngine(engine *xorm.Engine) error {
|
||||
engine.ShowExecTime(true)
|
||||
}
|
||||
|
||||
// initialize and register metrics wrapper around the *sql.DB
|
||||
db := engine.DB().DB
|
||||
|
||||
// register the go_sql_stats_connections_* metrics
|
||||
if err := prometheus.Register(sqlstats.NewStatsCollector("grafana", db)); err != nil {
|
||||
ss.log.Warn("Failed to register sqlstore stats collector", "error", err)
|
||||
}
|
||||
|
||||
// TODO: deprecate/remove these metrics
|
||||
if err := prometheus.Register(newSQLStoreMetrics(db)); err != nil {
|
||||
ss.log.Warn("Failed to register sqlstore metrics", "error", err)
|
||||
}
|
||||
|
||||
ss.engine = engine
|
||||
return nil
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user