diff --git a/conf/defaults.ini b/conf/defaults.ini index 3e9c99f97ad..a852dee4e75 100644 --- a/conf/defaults.ini +++ b/conf/defaults.ini @@ -122,6 +122,9 @@ path = grafana.db # For "sqlite3" only. cache mode setting used for connecting to the database cache_mode = private +# For "mysql" only if migrationLocking feature toggle is set. How many seconds to wait before failing to lock the database for the migrations, default is 0. +locking_attempt_timeout_sec = 0 + #################################### Cache server ############################# [remote_cache] # Either "redis", "memcached" or "database" default is "database" diff --git a/conf/sample.ini b/conf/sample.ini index cc908787981..710a8ea4810 100644 --- a/conf/sample.ini +++ b/conf/sample.ini @@ -123,6 +123,9 @@ # For "sqlite3" only. cache mode setting used for connecting to the database. (private, shared) ;cache_mode = private +# For "mysql" only if migrationLocking feature toggle is set. How many seconds to wait before failing to lock the database for the migrations, default is 0. +;locking_attempt_timeout_sec = 0 + ################################### Data sources ######################### [datasources] # Upper limit of data sources that Grafana will return. This limit is a temporary configuration and it will be deprecated when pagination will be introduced on the list data sources API. diff --git a/docs/sources/administration/configuration.md b/docs/sources/administration/configuration.md index b4cb40afba2..97d454d71ad 100644 --- a/docs/sources/administration/configuration.md +++ b/docs/sources/administration/configuration.md @@ -314,6 +314,10 @@ The maximum number of open connections to the database. Sets the maximum amount of time a connection may be reused. The default is 14400 (which means 14400 seconds or 4 hours). For MySQL, this setting should be shorter than the [`wait_timeout`](https://dev.mysql.com/doc/refman/5.7/en/server-system-variables.html#sysvar_wait_timeout) variable. 
+### locking_attempt_timeout_sec + +For "mysql", if `migrationLocking` feature toggle is set, specify the time (in seconds) to wait before failing to lock the database for the migrations. Default is 0. + ### log_queries Set to `true` to log the sql calls and execution times. diff --git a/go.mod b/go.mod index a234cc23020..4e9cc01470b 100644 --- a/go.mod +++ b/go.mod @@ -77,7 +77,7 @@ require ( github.com/ohler55/ojg v1.12.9 github.com/opentracing/opentracing-go v1.2.0 github.com/patrickmn/go-cache v2.1.0+incompatible - github.com/pkg/browser v0.0.0-20210904010418-6d279e18f982 // indirect + github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 // indirect github.com/pkg/errors v0.9.1 github.com/prometheus/alertmanager v0.23.1-0.20211116083607-e2a10119aaf7 github.com/prometheus/client_golang v1.12.1 @@ -105,7 +105,7 @@ require ( go.opentelemetry.io/otel/trace v1.2.0 golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e golang.org/x/exp v0.0.0-20210220032938-85be41e4509f // indirect - golang.org/x/net v0.0.0-20210903162142-ad29c8ab022f + golang.org/x/net v0.0.0-20211013171255-e13a2654a71e golang.org/x/oauth2 v0.0.0-20210819190943-2bc19b11175f golang.org/x/sync v0.0.0-20210220032951-036812b2e83c golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac @@ -239,9 +239,9 @@ require ( github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/weaveworks/promrus v1.2.0 // indirect github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82 // indirect - go.mongodb.org/mongo-driver v1.5.2 // indirect + go.mongodb.org/mongo-driver v1.7.0 // indirect go.opencensus.io v0.23.0 // indirect - go.uber.org/atomic v1.9.0 // indirect + go.uber.org/atomic v1.9.0 go.uber.org/goleak v1.1.10 // indirect golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 // indirect @@ -252,7 +252,10 @@ require ( gopkg.in/asn1-ber.v1 v1.0.0-20181015200546-f715ec2f112d // indirect ) -require cloud.google.com/go/kms v1.1.0 +require ( + 
cloud.google.com/go/kms v1.1.0 + github.com/golang-migrate/migrate/v4 v4.7.0 +) require ( github.com/Azure/go-autorest/autorest/adal v0.9.15 // indirect diff --git a/go.sum b/go.sum index 5c4d1ce5899..665845a11d3 100644 --- a/go.sum +++ b/go.sum @@ -1044,6 +1044,7 @@ github.com/gogo/status v1.1.0 h1:+eIkrewn5q6b30y+g/BJINVVdi2xH7je5MPJ3ZPK3JA= github.com/gogo/status v1.1.0/go.mod h1:BFv9nrluPLmrS0EmGVvLaPNmRosr9KapBYd5/hpY1WM= github.com/golang-jwt/jwt/v4 v4.0.0 h1:RAqyYixv1p7uEnocuy8P1nru5wprCh/MH2BIlW5z5/o= github.com/golang-jwt/jwt/v4 v4.0.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzwAxVc6locg= +github.com/golang-migrate/migrate/v4 v4.7.0 h1:gONcHxHApDTKXDyLH/H97gEHmpu1zcnnbAaq2zgrPrs= github.com/golang-migrate/migrate/v4 v4.7.0/go.mod h1:Qvut3N4xKWjoH3sokBccML6WyHSnggXm/DvMMnTsQIc= github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe h1:lXe2qZdvpiX5WZkZR4hgp4KJVfY3nMkvmwbVkpv1rVY= github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0= @@ -1959,8 +1960,8 @@ github.com/pierrec/lz4/v4 v4.1.8/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuR github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4= github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4/go.mod h1:4OwLy04Bl9Ef3GJJCoec+30X3LQs/0/m4HFRt/2LUSA= -github.com/pkg/browser v0.0.0-20210904010418-6d279e18f982 h1:TdFv+3Gr3GaghJ/o80aulO4ian7GHGWMdLBXoLZH1Is= -github.com/pkg/browser v0.0.0-20210904010418-6d279e18f982/go.mod h1:HKlIX3XHQyzLZPlr7++PzdhaXEj94dEiJgZDTsxEqUI= +github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 h1:KoWmjvw+nsYOo29YJK9vDA65RGE3NrOnUtO7a+RF9HU= +github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8/go.mod h1:HKlIX3XHQyzLZPlr7++PzdhaXEj94dEiJgZDTsxEqUI= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/errors 
v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1-0.20171018195549-f15c970de5b7/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -2474,8 +2475,9 @@ go.mongodb.org/mongo-driver v1.4.3/go.mod h1:WcMNYLx/IlOxLe6JRJiv2uXuCz6zBLndR4S go.mongodb.org/mongo-driver v1.4.4/go.mod h1:WcMNYLx/IlOxLe6JRJiv2uXuCz6zBLndR4SoGjYphSc= go.mongodb.org/mongo-driver v1.4.6/go.mod h1:WcMNYLx/IlOxLe6JRJiv2uXuCz6zBLndR4SoGjYphSc= go.mongodb.org/mongo-driver v1.5.1/go.mod h1:gRXCHX4Jo7J0IJ1oDQyUxF7jfy19UfxniMS4xxMmUqw= -go.mongodb.org/mongo-driver v1.5.2 h1:AsxOLoJTgP6YNM0fXWw4OjdluYmWzQYp+lFJL7xu9fU= go.mongodb.org/mongo-driver v1.5.2/go.mod h1:gRXCHX4Jo7J0IJ1oDQyUxF7jfy19UfxniMS4xxMmUqw= +go.mongodb.org/mongo-driver v1.7.0 h1:hHrvOBWlWB2c7+8Gh/Xi5jj82AgidK/t7KVXBZ+IyUA= +go.mongodb.org/mongo-driver v1.7.0/go.mod h1:Q4oFMbo1+MSNqICAdYMlC/zSTrwCogR4R8NzkI+yfU8= go.mozilla.org/pkcs7 v0.0.0-20200128120323-432b2356ecb1/go.mod h1:SNgMg+EgDFwmvSmLRTNKC5fegJjB7v23qTQ0XLGUNHk= go.opencensus.io v0.20.1/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk= go.opencensus.io v0.20.2/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk= @@ -2731,8 +2733,9 @@ golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qx golang.org/x/net v0.0.0-20210610132358-84b48f89b13b/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210726213435-c6fcb2dbf985/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.0.0-20210903162142-ad29c8ab022f h1:w6wWR0H+nyVpbSAQbzVEIACVyr/h8l/BEkY6Sokc7Eg= golang.org/x/net v0.0.0-20210903162142-ad29c8ab022f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20211013171255-e13a2654a71e h1:Xj+JO91noE97IN6F/7WZxzC5QE6yENAQPrwIYhW3bsA= +golang.org/x/net v0.0.0-20211013171255-e13a2654a71e/go.mod 
h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20181106182150-f42d05182288/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= diff --git a/packages/grafana-data/src/types/featureToggles.gen.ts b/packages/grafana-data/src/types/featureToggles.gen.ts index 485fa5229ee..6e71b8ff045 100644 --- a/packages/grafana-data/src/types/featureToggles.gen.ts +++ b/packages/grafana-data/src/types/featureToggles.gen.ts @@ -38,4 +38,5 @@ export interface FeatureToggles { validatedQueries?: boolean; swaggerUi?: boolean; featureHighlights?: boolean; + migrationLocking?: boolean; } diff --git a/pkg/cmd/grafana-cli/commands/commands.go b/pkg/cmd/grafana-cli/commands/commands.go index 55f381ebec3..809dd35adf3 100644 --- a/pkg/cmd/grafana-cli/commands/commands.go +++ b/pkg/cmd/grafana-cli/commands/commands.go @@ -55,6 +55,7 @@ func runDbCommand(command func(commandLine utils.CommandLine, sqlStore *sqlstore if err != nil { return errutil.Wrap("failed to initialize tracer service", err) } + sqlStore, err := sqlstore.ProvideService(cfg, nil, bus.GetBus(), &migrations.OSSMigrations{}, tracer) if err != nil { return errutil.Wrap("failed to initialize SQL store", err) diff --git a/pkg/services/featuremgmt/registry.go b/pkg/services/featuremgmt/registry.go index c9552f0cc9f..28312f0aaf4 100644 --- a/pkg/services/featuremgmt/registry.go +++ b/pkg/services/featuremgmt/registry.go @@ -121,5 +121,10 @@ var ( Description: "Highlight Enterprise features", State: FeatureStateStable, }, + { + Name: "migrationLocking", + Description: "Lock database during migrations", + State: FeatureStateBeta, + }, } ) diff --git a/pkg/services/featuremgmt/toggles_gen.go b/pkg/services/featuremgmt/toggles_gen.go index cd8f29c34a2..8accb561976 100644 --- 
a/pkg/services/featuremgmt/toggles_gen.go +++ b/pkg/services/featuremgmt/toggles_gen.go @@ -90,4 +90,8 @@ const ( // FlagFeatureHighlights // Highlight Enterprise features FlagFeatureHighlights = "featureHighlights" + + // FlagMigrationLocking + // Lock database during migrations + FlagMigrationLocking = "migrationLocking" ) diff --git a/pkg/services/sqlstore/migrations/accesscontrol/test/ac_test.go b/pkg/services/sqlstore/migrations/accesscontrol/test/ac_test.go index b79ba4c3d61..715161380d5 100644 --- a/pkg/services/sqlstore/migrations/accesscontrol/test/ac_test.go +++ b/pkg/services/sqlstore/migrations/accesscontrol/test/ac_test.go @@ -165,7 +165,7 @@ func TestMigrations(t *testing.T) { acmigrator := migrator.NewMigrator(x, tc.config) acmig.AddTeamMembershipMigrations(acmigrator) - errRunningMig := acmigrator.Start() + errRunningMig := acmigrator.Start(false, 0) require.NoError(t, errRunningMig) for _, user := range users { @@ -221,7 +221,7 @@ func setupTestDB(t *testing.T) *xorm.Engine { migrations := &migrations.OSSMigrations{} migrations.AddMigration(mg) - err = mg.Start() + err = mg.Start(false, 0) require.NoError(t, err) return x diff --git a/pkg/services/sqlstore/migrations/migrations_test.go b/pkg/services/sqlstore/migrations/migrations_test.go index 90c7918333b..3ee4d111954 100644 --- a/pkg/services/sqlstore/migrations/migrations_test.go +++ b/pkg/services/sqlstore/migrations/migrations_test.go @@ -1,10 +1,16 @@ package migrations import ( + "errors" "fmt" + "os" "strings" + "sync" + "sync/atomic" "testing" + "github.com/go-sql-driver/mysql" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "xorm.io/xorm" @@ -32,7 +38,7 @@ func TestMigrations(t *testing.T) { migrations.AddMigration(mg) expectedMigrations := mg.GetMigrationIDs(true) - err = mg.Start() + err = mg.Start(false, 0) require.NoError(t, err) has, err := x.SQL(query).Get(&result) @@ -44,7 +50,7 @@ func TestMigrations(t *testing.T) { mg = NewMigrator(x, &setting.Cfg{}) 
migrations.AddMigration(mg) - err = mg.Start() + err = mg.Start(false, 0) require.NoError(t, err) has, err = x.SQL(query).Get(&result) @@ -53,6 +59,179 @@ func TestMigrations(t *testing.T) { checkStepsAndDatabaseMatch(t, mg, expectedMigrations) } +func TestMigrationLock(t *testing.T) { + dbType := getDBType() + if dbType == SQLite { + t.Skip() + } + + testDB := getTestDB(dbType) + + x, err := xorm.NewEngine(testDB.DriverName, testDB.ConnStr) + require.NoError(t, err) + + dialect := NewDialect(x) + + sess := x.NewSession() + t.Cleanup(func() { + sess.Close() + }) + + cfg := LockCfg{Session: sess} + + t.Run("obtaining lock should succeed", func(t *testing.T) { + err := dialect.Lock(cfg) + require.NoError(t, err) + + t.Run("releasing previously obtained lock should succeed", func(t *testing.T) { + err := dialect.Unlock(cfg) + require.NoError(t, err) + + t.Run("releasing already released lock should fail", func(t *testing.T) { + err := dialect.Unlock(cfg) + require.Error(t, err) + assert.ErrorIs(t, err, ErrReleaseLockDB) + }) + }) + }) + + t.Run("obtaining lock twice should succeed", func(t *testing.T) { + err = dialect.Lock(cfg) + require.NoError(t, err) + + err = dialect.Lock(cfg) + require.NoError(t, err) + + t.Cleanup(func() { + err := dialect.Unlock(cfg) + require.NoError(t, err) + + err = dialect.Unlock(cfg) + require.NoError(t, err) + }) + }) + + t.Run("obtaining same lock from another session should fail", func(t *testing.T) { + x2, err := xorm.NewEngine(testDB.DriverName, testDB.ConnStr) + require.NoError(t, err) + sess2 := x2.NewSession() + + d2 := NewDialect(x2) + + err = dialect.Lock(cfg) + require.NoError(t, err) + + err = d2.Lock(LockCfg{Session: sess2}) + require.Error(t, err) + assert.ErrorIs(t, err, ErrLockDB) + + t.Cleanup(func() { + err := dialect.Unlock(cfg) + require.NoError(t, err) + }) + }) + + t.Run("obtaining lock for a another database should succeed", func(t *testing.T) { + err := dialect.Lock(cfg) + require.NoError(t, err) + + x, err := 
xorm.NewEngine(testDB.DriverName, replaceDBName(t, testDB.ConnStr, dbType)) + require.NoError(t, err) + + d := NewDialect(x) + err = d.Lock(cfg) + require.NoError(t, err) + + t.Cleanup(func() { + err := dialect.Unlock(cfg) + require.NoError(t, err) + + err = d.Unlock(cfg) + require.NoError(t, err) + }) + }) +} + +func TestMigratorLocking(t *testing.T) { + dbType := getDBType() + testDB := getTestDB(dbType) + + x, err := xorm.NewEngine(testDB.DriverName, testDB.ConnStr) + require.NoError(t, err) + + err = NewDialect(x).CleanDB() + require.NoError(t, err) + + mg := NewMigrator(x, &setting.Cfg{}) + migrations := &OSSMigrations{} + migrations.AddMigration(mg) + + var errorNum int64 + t.Run("when concurrent migrations for the same migrator occur, the second one should fail", func(t *testing.T) { + for i := 0; i < 2; i++ { + i := i // capture i variable + t.Run(fmt.Sprintf("run migration %d", i), func(t *testing.T) { + t.Parallel() + err = mg.Start(true, 0) + if err != nil { + if errors.Is(err, ErrMigratorIsLocked) { + atomic.AddInt64(&errorNum, 1) + } + } + }) + } + }) + assert.Equal(t, int64(1), errorNum) +} + +func TestDatabaseLocking(t *testing.T) { + dbType := getDBType() + // skip for SQLite since there is no database locking (only migrator locking) + if dbType == SQLite { + t.Skip() + } + + testDB := getTestDB(dbType) + + x, err := xorm.NewEngine(testDB.DriverName, testDB.ConnStr) + require.NoError(t, err) + + err = NewDialect(x).CleanDB() + require.NoError(t, err) + + mg1 := NewMigrator(x, &setting.Cfg{}) + migrations := &OSSMigrations{} + migrations.AddMigration(mg1) + reg := registry{ + migrators: make(map[int]*Migrator, 2), + } + reg.set(0, mg1) + + mg2 := NewMigrator(x, &setting.Cfg{}) + migrations.AddMigration(mg2) + reg.set(1, mg2) + + var errorNum int64 + t.Run("when concurrent migrations occur for different migrators occur, the second one should fail", func(t *testing.T) { + for i := 0; i < 2; i++ { + i := i // capture i variable + t.Run(fmt.Sprintf("run 
migration %d", i), func(t *testing.T) { + mg, err := reg.get(i) + require.NoError(t, err) + t.Parallel() + err = mg.Start(true, 0) + if err != nil { + assert.ErrorIs(t, err, ErrLockDB) + if errors.Is(err, ErrLockDB) { + atomic.AddInt64(&errorNum, 1) + } + } + }) + } + }) + assert.Equal(t, int64(1), errorNum) +} + func checkStepsAndDatabaseMatch(t *testing.T, mg *Migrator, expected []string) { t.Helper() log, err := mg.GetMigrationLog() @@ -91,3 +270,61 @@ func checkStepsAndDatabaseMatch(t *testing.T, mg *Migrator, expected []string) { } require.Failf(t, "the number of migrations does not match log in database", msg) } + +func getDBType() string { + dbType := SQLite + + // environment variable present for test db? + if db, present := os.LookupEnv("GRAFANA_TEST_DB"); present { + dbType = db + } + return dbType +} + +func getTestDB(dbType string) sqlutil.TestDB { + switch dbType { + case "mysql": + return sqlutil.MySQLTestDB() + case "postgres": + return sqlutil.PostgresTestDB() + default: + return sqlutil.SQLite3TestDB() + } +} + +func replaceDBName(t *testing.T, connStr, dbType string) string { + switch dbType { + case "mysql": + cfg, err := mysql.ParseDSN(connStr) + require.NoError(t, err) + cfg.DBName = "grafana_ds_tests" + return cfg.FormatDSN() + case "postgres": + return strings.Replace(connStr, "dbname=grafanatest", "dbname=grafanadstest", 1) + default: + return connStr + } +} + +type registry struct { + mu sync.Mutex + migrators map[int]*Migrator +} + +func (r *registry) get(i int) (*Migrator, error) { + r.mu.Lock() + defer r.mu.Unlock() + + m, ok := r.migrators[i] + if !ok { + return nil, fmt.Errorf("invalid index: %d", i) + } + return m, nil +} + +func (r *registry) set(i int, mg *Migrator) { + r.mu.Lock() + defer r.mu.Unlock() + + r.migrators[i] = mg +} diff --git a/pkg/services/sqlstore/migrator/dialect.go b/pkg/services/sqlstore/migrator/dialect.go index e151d25f083..20cb7b4156f 100644 --- a/pkg/services/sqlstore/migrator/dialect.go +++ 
b/pkg/services/sqlstore/migrator/dialect.go @@ -7,6 +7,11 @@ import ( "xorm.io/xorm" ) +var ( + ErrLockDB = fmt.Errorf("failed to obtain lock") + ErrReleaseLockDB = fmt.Errorf("failed to release lock") +) + type Dialect interface { DriverName() string Quote(string) string @@ -53,6 +58,13 @@ type Dialect interface { IsUniqueConstraintViolation(err error) bool ErrorMessage(err error) string IsDeadlock(err error) bool + Lock(LockCfg) error + Unlock(LockCfg) error +} + +type LockCfg struct { + Session *xorm.Session + Timeout int } type dialectFunc func(*xorm.Engine) Dialect @@ -288,3 +300,11 @@ func (b *BaseDialect) TruncateDBTables() error { func (b *BaseDialect) UpsertSQL(tableName string, keyCols, updateCols []string) string { return "" } + +func (b *BaseDialect) Lock(_ LockCfg) error { + return nil +} + +func (b *BaseDialect) Unlock(_ LockCfg) error { + return nil +} diff --git a/pkg/services/sqlstore/migrator/migrator.go b/pkg/services/sqlstore/migrator/migrator.go index 50c4ce347fa..08539571fe8 100644 --- a/pkg/services/sqlstore/migrator/migrator.go +++ b/pkg/services/sqlstore/migrator/migrator.go @@ -7,6 +7,7 @@ import ( _ "github.com/go-sql-driver/mysql" _ "github.com/lib/pq" _ "github.com/mattn/go-sqlite3" + "go.uber.org/atomic" "xorm.io/xorm" "github.com/grafana/grafana/pkg/infra/log" @@ -14,12 +15,18 @@ import ( "github.com/grafana/grafana/pkg/util/errutil" ) +var ( + ErrMigratorIsLocked = fmt.Errorf("migrator is locked") + ErrMigratorIsUnlocked = fmt.Errorf("migrator is unlocked") +) + type Migrator struct { DBEngine *xorm.Engine Dialect Dialect migrations []Migration Logger log.Logger Cfg *setting.Cfg + isLocked atomic.Bool } type MigrationLog struct { @@ -87,7 +94,32 @@ func (mg *Migrator) GetMigrationLog() (map[string]MigrationLog, error) { return logMap, nil } -func (mg *Migrator) Start() error { +func (mg *Migrator) Start(isDatabaseLockingEnabled bool, lockAttemptTimeout int) (err error) { + if !isDatabaseLockingEnabled { + return mg.run() + } + + 
return mg.InTransaction(func(sess *xorm.Session) error { + mg.Logger.Info("Locking database") + if err := casRestoreOnErr(&mg.isLocked, false, true, ErrMigratorIsLocked, mg.Dialect.Lock, LockCfg{Session: sess, Timeout: lockAttemptTimeout}); err != nil { + mg.Logger.Error("Failed to lock database", "error", err) + return err + } + + defer func() { + mg.Logger.Info("Unlocking database") + unlockErr := casRestoreOnErr(&mg.isLocked, true, false, ErrMigratorIsUnlocked, mg.Dialect.Unlock, LockCfg{Session: sess}) + if unlockErr != nil { + mg.Logger.Error("Failed to unlock database", "error", unlockErr) + } + }() + + // migration will run inside a nested transaction + return mg.run() + }) +} + +func (mg *Migrator) run() (err error) { mg.Logger.Info("Starting DB migrations") logMap, err := mg.GetMigrationLog() @@ -211,3 +243,15 @@ func (mg *Migrator) InTransaction(callback dbTransactionFunc) error { return nil } + +func casRestoreOnErr(lock *atomic.Bool, o, n bool, casErr error, f func(LockCfg) error, lockCfg LockCfg) error { + if !lock.CAS(o, n) { + return casErr + } + if err := f(lockCfg); err != nil { + // Automatically unlock/lock on error + lock.Store(o) + return err + } + return nil +} diff --git a/pkg/services/sqlstore/migrator/mysql_dialect.go b/pkg/services/sqlstore/migrator/mysql_dialect.go index b2da6b0b862..0eabbe3e848 100644 --- a/pkg/services/sqlstore/migrator/mysql_dialect.go +++ b/pkg/services/sqlstore/migrator/mysql_dialect.go @@ -1,6 +1,7 @@ package migrator import ( + "database/sql" "errors" "fmt" "strconv" @@ -8,6 +9,7 @@ import ( "github.com/VividCortex/mysqlerr" "github.com/go-sql-driver/mysql" + "github.com/golang-migrate/migrate/v4/database" "github.com/grafana/grafana/pkg/util/errutil" "xorm.io/xorm" ) @@ -225,3 +227,66 @@ func (db *MySQLDialect) UpsertSQL(tableName string, keyCols, updateCols []string ) return s } + +func (db *MySQLDialect) Lock(cfg LockCfg) error { + query := "SELECT GET_LOCK(?, ?)" + var success sql.NullBool + + lockName, err := 
db.getLockName() + if err != nil { + return fmt.Errorf("failed to generate lock name: %w", err) + } + + // trying to obtain the lock with the specific name + // the lock is exclusive per session and is released explicitly by executing RELEASE_LOCK() or implicitly when the session terminates + // it returns 1 if the lock was obtained successfully, + // 0 if the attempt timed out (for example, because another client has previously locked the name), + // or NULL if an error occurred + // starting from MySQL 5.7 it is even possible for a given session to acquire multiple locks for the same name + // however other sessions cannot acquire a lock with that name until the acquiring session releases all its locks for the name. + _, err = cfg.Session.SQL(query, lockName, cfg.Timeout).Get(&success) + if err != nil { + return err + } + if !success.Valid || !success.Bool { + return ErrLockDB + } + return nil +} + +func (db *MySQLDialect) Unlock(cfg LockCfg) error { + query := "SELECT RELEASE_LOCK(?)" + var success sql.NullBool + + lockName, err := db.getLockName() + if err != nil { + return fmt.Errorf("failed to generate lock name: %w", err) + } + + // trying to release the lock with the specific name + // it returns 1 if the lock was released, + // 0 if the lock was not established by this thread (in which case the lock is not released), + // and NULL if the named lock did not exist (it was never obtained by a call to GET_LOCK() or if it has previously been released) + _, err = cfg.Session.SQL(query, lockName).Get(&success) + if err != nil { + return err + } + if !success.Valid || !success.Bool { + return ErrReleaseLockDB + } + return nil +} + +func (db *MySQLDialect) getLockName() (string, error) { + cfg, err := mysql.ParseDSN(db.engine.DataSourceName()) + if err != nil { + return "", err + } + + s, err := database.GenerateAdvisoryLockId(cfg.DBName) + if err != nil { + return "", fmt.Errorf("failed to generate advisory lock key: %w", err) + } + + return s, nil +} diff --git 
a/pkg/services/sqlstore/migrator/postgres_dialect.go b/pkg/services/sqlstore/migrator/postgres_dialect.go index ccbb9301d30..2978f98ef00 100644 --- a/pkg/services/sqlstore/migrator/postgres_dialect.go +++ b/pkg/services/sqlstore/migrator/postgres_dialect.go @@ -3,9 +3,11 @@ package migrator import ( "errors" "fmt" + "regexp" "strconv" "strings" + "github.com/golang-migrate/migrate/v4/database" "github.com/lib/pq" "github.com/grafana/grafana/pkg/util/errutil" @@ -257,3 +259,76 @@ func (db *PostgresDialect) UpsertSQL(tableName string, keyCols, updateCols []str ) return s } + +func (db *PostgresDialect) Lock(cfg LockCfg) error { + // trying to obtain the lock for a resource identified by a 64-bit or 32-bit key value + // the lock is exclusive: multiple lock requests stack, so that if the same resource is locked three times + // it must then be unlocked three times to be released for other sessions' use. + // it will either obtain the lock immediately and return true, + // or return false if the lock cannot be acquired immediately. + query := "SELECT pg_try_advisory_lock(?)" + var success bool + + key, err := db.getLockKey() + if err != nil { + return fmt.Errorf("failed to generate advisory lock key: %w", err) + } + _, err = cfg.Session.SQL(query, key).Get(&success) + if err != nil { + return err + } + if !success { + return ErrLockDB + } + + return nil +} + +func (db *PostgresDialect) Unlock(cfg LockCfg) error { + // trying to release a previously-acquired exclusive session level advisory lock. 
+ // it will either return true if the lock is successfully released or + // false if the lock was not held (in addition an SQL warning will be reported by the server) + query := "SELECT pg_advisory_unlock(?)" + var success bool + + key, err := db.getLockKey() + if err != nil { + return fmt.Errorf("failed to generate advisory lock key: %w", err) + } + _, err = cfg.Session.SQL(query, key).Get(&success) + if err != nil { + return err + } + if !success { + return ErrReleaseLockDB + } + return nil +} + +func getDBName(dsn string) (string, error) { + if strings.HasPrefix(dsn, "postgres://") || strings.HasPrefix(dsn, "postgresql://") { + parsedDSN, err := pq.ParseURL(dsn) + if err != nil { + return "", err + } + dsn = parsedDSN + } + re := regexp.MustCompile(`dbname=(\w+)`) + submatch := re.FindSubmatch([]byte(dsn)) + if len(submatch) < 2 { + return "", fmt.Errorf("failed to get database name") + } + return string(submatch[1]), nil +} + +func (db *PostgresDialect) getLockKey() (string, error) { + dbName, err := getDBName(db.engine.DataSourceName()) + if err != nil { + return "", err + } + key, err := database.GenerateAdvisoryLockId(dbName) + if err != nil { + return "", err + } + return key, nil +} diff --git a/pkg/services/sqlstore/mockstore/mockstore.go b/pkg/services/sqlstore/mockstore/mockstore.go index 0aef2583602..a10cb01a4e5 100644 --- a/pkg/services/sqlstore/mockstore/mockstore.go +++ b/pkg/services/sqlstore/mockstore/mockstore.go @@ -453,7 +453,7 @@ func (m SQLStoreMock) UpdateDataSource(ctx context.Context, cmd *models.UpdateDa return nil // TODO: Implement } -func (m SQLStoreMock) Migrate() error { +func (m SQLStoreMock) Migrate(_ bool) error { return nil // TODO: Implement } diff --git a/pkg/services/sqlstore/sqlstore.go b/pkg/services/sqlstore/sqlstore.go index 2cddc4c7519..e915861319f 100644 --- a/pkg/services/sqlstore/sqlstore.go +++ b/pkg/services/sqlstore/sqlstore.go @@ -56,8 +56,7 @@ type SQLStore struct { tracer tracing.Tracer } -func 
ProvideService(cfg *setting.Cfg, cacheService *localcache.CacheService, bus bus.Bus, migrations registry.DatabaseMigrator, tracer tracing.Tracer, -) (*SQLStore, error) { +func ProvideService(cfg *setting.Cfg, cacheService *localcache.CacheService, bus bus.Bus, migrations registry.DatabaseMigrator, tracer tracing.Tracer) (*SQLStore, error) { // This change will make xorm use an empty default schema for postgres and // by that mimic the functionality of how it was functioning before // xorm's changes above. @@ -67,7 +66,7 @@ func ProvideService(cfg *setting.Cfg, cacheService *localcache.CacheService, bus return nil, err } - if err := s.Migrate(); err != nil { + if err := s.Migrate(cfg.IsFeatureToggleEnabled(featuremgmt.FlagMigrationLocking)); err != nil { return nil, err } @@ -150,7 +149,7 @@ func newSQLStore(cfg *setting.Cfg, cacheService *localcache.CacheService, bus bu // Migrate performs database migrations. // Has to be done in a second phase (after initialization), since other services can register migrations during // the initialization phase. -func (ss *SQLStore) Migrate() error { +func (ss *SQLStore) Migrate(isDatabaseLockingEnabled bool) error { if ss.dbCfg.SkipMigrations { return nil } @@ -158,7 +157,7 @@ func (ss *SQLStore) Migrate() error { migrator := migrator.NewMigrator(ss.engine, ss.Cfg) ss.migrations.AddMigration(migrator) - return migrator.Start() + return migrator.Start(isDatabaseLockingEnabled, ss.dbCfg.MigrationLockAttemptTimeout) } // Sync syncs changes to the database. 
@@ -440,6 +439,7 @@ func (ss *SQLStore) readConfig() error { ss.dbCfg.CacheMode = sec.Key("cache_mode").MustString("private") ss.dbCfg.SkipMigrations = sec.Key("skip_migrations").MustBool() + ss.dbCfg.MigrationLockAttemptTimeout = sec.Key("locking_attempt_timeout_sec").MustInt() return nil } @@ -547,7 +547,7 @@ func initTestDB(migration registry.DatabaseMigrator, opts ...InitTestDBOpt) (*SQ return nil, err } - if err := testSQLStore.Migrate(); err != nil { + if err := testSQLStore.Migrate(false); err != nil { return nil, err } @@ -607,23 +607,24 @@ func IsTestDBMSSQL() bool { } type DatabaseConfig struct { - Type string - Host string - Name string - User string - Pwd string - Path string - SslMode string - CaCertPath string - ClientKeyPath string - ClientCertPath string - ServerCertName string - ConnectionString string - IsolationLevel string - MaxOpenConn int - MaxIdleConn int - ConnMaxLifetime int - CacheMode string - UrlQueryParams map[string][]string - SkipMigrations bool + Type string + Host string + Name string + User string + Pwd string + Path string + SslMode string + CaCertPath string + ClientKeyPath string + ClientCertPath string + ServerCertName string + ConnectionString string + IsolationLevel string + MaxOpenConn int + MaxIdleConn int + ConnMaxLifetime int + CacheMode string + UrlQueryParams map[string][]string + SkipMigrations bool + MigrationLockAttemptTimeout int } diff --git a/pkg/services/sqlstore/store.go b/pkg/services/sqlstore/store.go index bfadaf529c8..8b94be49e85 100644 --- a/pkg/services/sqlstore/store.go +++ b/pkg/services/sqlstore/store.go @@ -117,7 +117,7 @@ type Store interface { DeleteDataSource(ctx context.Context, cmd *models.DeleteDataSourceCommand) error AddDataSource(ctx context.Context, cmd *models.AddDataSourceCommand) error UpdateDataSource(ctx context.Context, cmd *models.UpdateDataSourceCommand) error - Migrate() error + Migrate(bool) error Sync() error Reset() error Quote(value string) string