[8.4.x]: SQLStore: Prevent concurrent migrations (#44101) (#45447)

* SQLStore: Prevent concurrent migrations (#44101)

* SQLStore: Prevent concurrent migrations

* Hide behind a feature toggle

* Configurable locking attempt timeout

* Update docs/sources/administration/configuration.md

Co-authored-by: Igor Suleymanov <radiohead@users.noreply.github.com>
Co-authored-by: achatterjee-grafana <70489351+achatterjee-grafana@users.noreply.github.com>
(cherry picked from commit d718ee1918)

* Resolve dependency cycle (#45427)

(cherry picked from commit 6a38ce2307)
This commit is contained in:
Sofia Papagiannaki 2022-02-16 11:23:54 +02:00 committed by GitHub
parent e43e5c3c42
commit 3d2fbcba2d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 510 additions and 41 deletions

View File

@ -122,6 +122,9 @@ path = grafana.db
# For "sqlite3" only. cache mode setting used for connecting to the database
cache_mode = private
# For "mysql" only if lockingMigration feature toggle is set. How many seconds to wait before failing to lock the database for the migrations, default is 0.
locking_attempt_timeout_sec = 0
#################################### Cache server #############################
[remote_cache]
# Either "redis", "memcached" or "database" default is "database"

View File

@ -123,6 +123,9 @@
# For "sqlite3" only. cache mode setting used for connecting to the database. (private, shared)
;cache_mode = private
# For "mysql" only if lockingMigration feature toggle is set. How many seconds to wait before failing to lock the database for the migrations, default is 0.
;locking_attempt_timeout_sec = 0
################################### Data sources #########################
[datasources]
# Upper limit of data sources that Grafana will return. This limit is a temporary configuration and it will be deprecated when pagination will be introduced on the list data sources API.

View File

@ -314,6 +314,10 @@ The maximum number of open connections to the database.
Sets the maximum amount of time a connection may be reused. The default is 14400 (which means 14400 seconds or 4 hours). For MySQL, this setting should be shorter than the [`wait_timeout`](https://dev.mysql.com/doc/refman/5.7/en/server-system-variables.html#sysvar_wait_timeout) variable.
### locking_attempt_timeout_sec
For "mysql", if `lockingMigration` feature toggle is set, specify the time (in seconds) to wait before failing to lock the database for the migrations. Default is 0.
### log_queries
Set to `true` to log the sql calls and execution times.

13
go.mod
View File

@ -77,7 +77,7 @@ require (
github.com/ohler55/ojg v1.12.9
github.com/opentracing/opentracing-go v1.2.0
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/pkg/browser v0.0.0-20210904010418-6d279e18f982 // indirect
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 // indirect
github.com/pkg/errors v0.9.1
github.com/prometheus/alertmanager v0.23.1-0.20211116083607-e2a10119aaf7
github.com/prometheus/client_golang v1.12.1
@ -105,7 +105,7 @@ require (
go.opentelemetry.io/otel/trace v1.2.0
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e
golang.org/x/exp v0.0.0-20210220032938-85be41e4509f // indirect
golang.org/x/net v0.0.0-20210903162142-ad29c8ab022f
golang.org/x/net v0.0.0-20211013171255-e13a2654a71e
golang.org/x/oauth2 v0.0.0-20210819190943-2bc19b11175f
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac
@ -239,9 +239,9 @@ require (
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/weaveworks/promrus v1.2.0 // indirect
github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82 // indirect
go.mongodb.org/mongo-driver v1.5.2 // indirect
go.mongodb.org/mongo-driver v1.7.0 // indirect
go.opencensus.io v0.23.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/atomic v1.9.0
go.uber.org/goleak v1.1.10 // indirect
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 // indirect
@ -252,7 +252,10 @@ require (
gopkg.in/asn1-ber.v1 v1.0.0-20181015200546-f715ec2f112d // indirect
)
require cloud.google.com/go/kms v1.1.0
require (
cloud.google.com/go/kms v1.1.0
github.com/golang-migrate/migrate/v4 v4.7.0
)
require (
github.com/Azure/go-autorest/autorest/adal v0.9.15 // indirect

11
go.sum
View File

@ -1044,6 +1044,7 @@ github.com/gogo/status v1.1.0 h1:+eIkrewn5q6b30y+g/BJINVVdi2xH7je5MPJ3ZPK3JA=
github.com/gogo/status v1.1.0/go.mod h1:BFv9nrluPLmrS0EmGVvLaPNmRosr9KapBYd5/hpY1WM=
github.com/golang-jwt/jwt/v4 v4.0.0 h1:RAqyYixv1p7uEnocuy8P1nru5wprCh/MH2BIlW5z5/o=
github.com/golang-jwt/jwt/v4 v4.0.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzwAxVc6locg=
github.com/golang-migrate/migrate/v4 v4.7.0 h1:gONcHxHApDTKXDyLH/H97gEHmpu1zcnnbAaq2zgrPrs=
github.com/golang-migrate/migrate/v4 v4.7.0/go.mod h1:Qvut3N4xKWjoH3sokBccML6WyHSnggXm/DvMMnTsQIc=
github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe h1:lXe2qZdvpiX5WZkZR4hgp4KJVfY3nMkvmwbVkpv1rVY=
github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0=
@ -1959,8 +1960,8 @@ github.com/pierrec/lz4/v4 v4.1.8/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuR
github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4=
github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4/go.mod h1:4OwLy04Bl9Ef3GJJCoec+30X3LQs/0/m4HFRt/2LUSA=
github.com/pkg/browser v0.0.0-20210904010418-6d279e18f982 h1:TdFv+3Gr3GaghJ/o80aulO4ian7GHGWMdLBXoLZH1Is=
github.com/pkg/browser v0.0.0-20210904010418-6d279e18f982/go.mod h1:HKlIX3XHQyzLZPlr7++PzdhaXEj94dEiJgZDTsxEqUI=
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 h1:KoWmjvw+nsYOo29YJK9vDA65RGE3NrOnUtO7a+RF9HU=
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8/go.mod h1:HKlIX3XHQyzLZPlr7++PzdhaXEj94dEiJgZDTsxEqUI=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1-0.20171018195549-f15c970de5b7/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
@ -2474,8 +2475,9 @@ go.mongodb.org/mongo-driver v1.4.3/go.mod h1:WcMNYLx/IlOxLe6JRJiv2uXuCz6zBLndR4S
go.mongodb.org/mongo-driver v1.4.4/go.mod h1:WcMNYLx/IlOxLe6JRJiv2uXuCz6zBLndR4SoGjYphSc=
go.mongodb.org/mongo-driver v1.4.6/go.mod h1:WcMNYLx/IlOxLe6JRJiv2uXuCz6zBLndR4SoGjYphSc=
go.mongodb.org/mongo-driver v1.5.1/go.mod h1:gRXCHX4Jo7J0IJ1oDQyUxF7jfy19UfxniMS4xxMmUqw=
go.mongodb.org/mongo-driver v1.5.2 h1:AsxOLoJTgP6YNM0fXWw4OjdluYmWzQYp+lFJL7xu9fU=
go.mongodb.org/mongo-driver v1.5.2/go.mod h1:gRXCHX4Jo7J0IJ1oDQyUxF7jfy19UfxniMS4xxMmUqw=
go.mongodb.org/mongo-driver v1.7.0 h1:hHrvOBWlWB2c7+8Gh/Xi5jj82AgidK/t7KVXBZ+IyUA=
go.mongodb.org/mongo-driver v1.7.0/go.mod h1:Q4oFMbo1+MSNqICAdYMlC/zSTrwCogR4R8NzkI+yfU8=
go.mozilla.org/pkcs7 v0.0.0-20200128120323-432b2356ecb1/go.mod h1:SNgMg+EgDFwmvSmLRTNKC5fegJjB7v23qTQ0XLGUNHk=
go.opencensus.io v0.20.1/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk=
go.opencensus.io v0.20.2/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk=
@ -2731,8 +2733,9 @@ golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qx
golang.org/x/net v0.0.0-20210610132358-84b48f89b13b/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210726213435-c6fcb2dbf985/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210903162142-ad29c8ab022f h1:w6wWR0H+nyVpbSAQbzVEIACVyr/h8l/BEkY6Sokc7Eg=
golang.org/x/net v0.0.0-20210903162142-ad29c8ab022f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211013171255-e13a2654a71e h1:Xj+JO91noE97IN6F/7WZxzC5QE6yENAQPrwIYhW3bsA=
golang.org/x/net v0.0.0-20211013171255-e13a2654a71e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20181106182150-f42d05182288/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=

View File

@ -38,4 +38,5 @@ export interface FeatureToggles {
validatedQueries?: boolean;
swaggerUi?: boolean;
featureHighlights?: boolean;
migrationLocking?: boolean;
}

View File

@ -55,6 +55,7 @@ func runDbCommand(command func(commandLine utils.CommandLine, sqlStore *sqlstore
if err != nil {
return errutil.Wrap("failed to initialize tracer service", err)
}
sqlStore, err := sqlstore.ProvideService(cfg, nil, bus.GetBus(), &migrations.OSSMigrations{}, tracer)
if err != nil {
return errutil.Wrap("failed to initialize SQL store", err)

View File

@ -121,5 +121,10 @@ var (
Description: "Highlight Enterprise features",
State: FeatureStateStable,
},
{
Name: "migrationLocking",
Description: "Lock database during migrations",
State: FeatureStateBeta,
},
}
)

View File

@ -90,4 +90,8 @@ const (
// FlagFeatureHighlights
// Highlight Enterprise features
FlagFeatureHighlights = "featureHighlights"
// FlagMigrationLocking
// Lock database during migrations
FlagMigrationLocking = "migrationLocking"
)

View File

@ -165,7 +165,7 @@ func TestMigrations(t *testing.T) {
acmigrator := migrator.NewMigrator(x, tc.config)
acmig.AddTeamMembershipMigrations(acmigrator)
errRunningMig := acmigrator.Start()
errRunningMig := acmigrator.Start(false, 0)
require.NoError(t, errRunningMig)
for _, user := range users {
@ -221,7 +221,7 @@ func setupTestDB(t *testing.T) *xorm.Engine {
migrations := &migrations.OSSMigrations{}
migrations.AddMigration(mg)
err = mg.Start()
err = mg.Start(false, 0)
require.NoError(t, err)
return x

View File

@ -1,10 +1,16 @@
package migrations
import (
"errors"
"fmt"
"os"
"strings"
"sync"
"sync/atomic"
"testing"
"github.com/go-sql-driver/mysql"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"xorm.io/xorm"
@ -32,7 +38,7 @@ func TestMigrations(t *testing.T) {
migrations.AddMigration(mg)
expectedMigrations := mg.GetMigrationIDs(true)
err = mg.Start()
err = mg.Start(false, 0)
require.NoError(t, err)
has, err := x.SQL(query).Get(&result)
@ -44,7 +50,7 @@ func TestMigrations(t *testing.T) {
mg = NewMigrator(x, &setting.Cfg{})
migrations.AddMigration(mg)
err = mg.Start()
err = mg.Start(false, 0)
require.NoError(t, err)
has, err = x.SQL(query).Get(&result)
@ -53,6 +59,179 @@ func TestMigrations(t *testing.T) {
checkStepsAndDatabaseMatch(t, mg, expectedMigrations)
}
func TestMigrationLock(t *testing.T) {
dbType := getDBType()
if dbType == SQLite {
t.Skip()
}
testDB := getTestDB(dbType)
x, err := xorm.NewEngine(testDB.DriverName, testDB.ConnStr)
require.NoError(t, err)
dialect := NewDialect(x)
sess := x.NewSession()
t.Cleanup(func() {
sess.Close()
})
cfg := LockCfg{Session: sess}
t.Run("obtaining lock should succeed", func(t *testing.T) {
err := dialect.Lock(cfg)
require.NoError(t, err)
t.Run("releasing previously obtained lock should succeed", func(t *testing.T) {
err := dialect.Unlock(cfg)
require.NoError(t, err)
t.Run("releasing already released lock should fail", func(t *testing.T) {
err := dialect.Unlock(cfg)
require.Error(t, err)
assert.ErrorIs(t, err, ErrReleaseLockDB)
})
})
})
t.Run("obtaining lock twice should succeed", func(t *testing.T) {
err = dialect.Lock(cfg)
require.NoError(t, err)
err = dialect.Lock(cfg)
require.NoError(t, err)
t.Cleanup(func() {
err := dialect.Unlock(cfg)
require.NoError(t, err)
err = dialect.Unlock(cfg)
require.NoError(t, err)
})
})
t.Run("obtaining same lock from another session should fail", func(t *testing.T) {
x2, err := xorm.NewEngine(testDB.DriverName, testDB.ConnStr)
require.NoError(t, err)
sess2 := x2.NewSession()
d2 := NewDialect(x2)
err = dialect.Lock(cfg)
require.NoError(t, err)
err = d2.Lock(LockCfg{Session: sess2})
require.Error(t, err)
assert.ErrorIs(t, err, ErrLockDB)
t.Cleanup(func() {
err := dialect.Unlock(cfg)
require.NoError(t, err)
})
})
t.Run("obtaining lock for a another database should succeed", func(t *testing.T) {
err := dialect.Lock(cfg)
require.NoError(t, err)
x, err := xorm.NewEngine(testDB.DriverName, replaceDBName(t, testDB.ConnStr, dbType))
require.NoError(t, err)
d := NewDialect(x)
err = d.Lock(cfg)
require.NoError(t, err)
t.Cleanup(func() {
err := dialect.Unlock(cfg)
require.NoError(t, err)
err = d.Unlock(cfg)
require.NoError(t, err)
})
})
}
func TestMigratorLocking(t *testing.T) {
dbType := getDBType()
testDB := getTestDB(dbType)
x, err := xorm.NewEngine(testDB.DriverName, testDB.ConnStr)
require.NoError(t, err)
err = NewDialect(x).CleanDB()
require.NoError(t, err)
mg := NewMigrator(x, &setting.Cfg{})
migrations := &OSSMigrations{}
migrations.AddMigration(mg)
var errorNum int64
t.Run("when concurrent migrations for the same migrator occur, the second one should fail", func(t *testing.T) {
for i := 0; i < 2; i++ {
i := i // capture i variable
t.Run(fmt.Sprintf("run migration %d", i), func(t *testing.T) {
t.Parallel()
err = mg.Start(true, 0)
if err != nil {
if errors.Is(err, ErrMigratorIsLocked) {
atomic.AddInt64(&errorNum, 1)
}
}
})
}
})
assert.Equal(t, int64(1), errorNum)
}
func TestDatabaseLocking(t *testing.T) {
dbType := getDBType()
// skip for SQLite since there is no database locking (only migrator locking)
if dbType == SQLite {
t.Skip()
}
testDB := getTestDB(dbType)
x, err := xorm.NewEngine(testDB.DriverName, testDB.ConnStr)
require.NoError(t, err)
err = NewDialect(x).CleanDB()
require.NoError(t, err)
mg1 := NewMigrator(x, &setting.Cfg{})
migrations := &OSSMigrations{}
migrations.AddMigration(mg1)
reg := registry{
migrators: make(map[int]*Migrator, 2),
}
reg.set(0, mg1)
mg2 := NewMigrator(x, &setting.Cfg{})
migrations.AddMigration(mg2)
reg.set(1, mg2)
var errorNum int64
t.Run("when concurrent migrations occur for different migrators occur, the second one should fail", func(t *testing.T) {
for i := 0; i < 2; i++ {
i := i // capture i variable
t.Run(fmt.Sprintf("run migration %d", i), func(t *testing.T) {
mg, err := reg.get(i)
require.NoError(t, err)
t.Parallel()
err = mg.Start(true, 0)
if err != nil {
assert.ErrorIs(t, err, ErrLockDB)
if errors.Is(err, ErrLockDB) {
atomic.AddInt64(&errorNum, 1)
}
}
})
}
})
assert.Equal(t, int64(1), errorNum)
}
func checkStepsAndDatabaseMatch(t *testing.T, mg *Migrator, expected []string) {
t.Helper()
log, err := mg.GetMigrationLog()
@ -91,3 +270,61 @@ func checkStepsAndDatabaseMatch(t *testing.T, mg *Migrator, expected []string) {
}
require.Failf(t, "the number of migrations does not match log in database", msg)
}
func getDBType() string {
dbType := SQLite
// environment variable present for test db?
if db, present := os.LookupEnv("GRAFANA_TEST_DB"); present {
dbType = db
}
return dbType
}
func getTestDB(dbType string) sqlutil.TestDB {
switch dbType {
case "mysql":
return sqlutil.MySQLTestDB()
case "postgres":
return sqlutil.PostgresTestDB()
default:
return sqlutil.SQLite3TestDB()
}
}
func replaceDBName(t *testing.T, connStr, dbType string) string {
switch dbType {
case "mysql":
cfg, err := mysql.ParseDSN(connStr)
require.NoError(t, err)
cfg.DBName = "grafana_ds_tests"
return cfg.FormatDSN()
case "postgres":
return strings.Replace(connStr, "dbname=grafanatest", "dbname=grafanadstest", 1)
default:
return connStr
}
}
type registry struct {
mu sync.Mutex
migrators map[int]*Migrator
}
func (r *registry) get(i int) (*Migrator, error) {
r.mu.Lock()
defer r.mu.Unlock()
m, ok := r.migrators[i]
if !ok {
return nil, fmt.Errorf("invalid index: %d", i)
}
return m, nil
}
func (r *registry) set(i int, mg *Migrator) {
r.mu.Lock()
defer r.mu.Unlock()
r.migrators[i] = mg
}

View File

@ -7,6 +7,11 @@ import (
"xorm.io/xorm"
)
var (
ErrLockDB = fmt.Errorf("failed to obtain lock")
ErrReleaseLockDB = fmt.Errorf("failed to release lock")
)
type Dialect interface {
DriverName() string
Quote(string) string
@ -53,6 +58,13 @@ type Dialect interface {
IsUniqueConstraintViolation(err error) bool
ErrorMessage(err error) string
IsDeadlock(err error) bool
Lock(LockCfg) error
Unlock(LockCfg) error
}
type LockCfg struct {
Session *xorm.Session
Timeout int
}
type dialectFunc func(*xorm.Engine) Dialect
@ -288,3 +300,11 @@ func (b *BaseDialect) TruncateDBTables() error {
func (b *BaseDialect) UpsertSQL(tableName string, keyCols, updateCols []string) string {
return ""
}
func (b *BaseDialect) Lock(_ LockCfg) error {
return nil
}
func (b *BaseDialect) Unlock(_ LockCfg) error {
return nil
}

View File

@ -7,6 +7,7 @@ import (
_ "github.com/go-sql-driver/mysql"
_ "github.com/lib/pq"
_ "github.com/mattn/go-sqlite3"
"go.uber.org/atomic"
"xorm.io/xorm"
"github.com/grafana/grafana/pkg/infra/log"
@ -14,12 +15,18 @@ import (
"github.com/grafana/grafana/pkg/util/errutil"
)
var (
ErrMigratorIsLocked = fmt.Errorf("migrator is locked")
ErrMigratorIsUnlocked = fmt.Errorf("migrator is unlocked")
)
type Migrator struct {
DBEngine *xorm.Engine
Dialect Dialect
migrations []Migration
Logger log.Logger
Cfg *setting.Cfg
isLocked atomic.Bool
}
type MigrationLog struct {
@ -87,7 +94,32 @@ func (mg *Migrator) GetMigrationLog() (map[string]MigrationLog, error) {
return logMap, nil
}
func (mg *Migrator) Start() error {
func (mg *Migrator) Start(isDatabaseLockingEnabled bool, lockAttemptTimeout int) (err error) {
if !isDatabaseLockingEnabled {
return mg.run()
}
return mg.InTransaction(func(sess *xorm.Session) error {
mg.Logger.Info("Locking database")
if err := casRestoreOnErr(&mg.isLocked, false, true, ErrMigratorIsLocked, mg.Dialect.Lock, LockCfg{Session: sess, Timeout: lockAttemptTimeout}); err != nil {
mg.Logger.Error("Failed to lock database", "error", err)
return err
}
defer func() {
mg.Logger.Info("Unlocking database")
unlockErr := casRestoreOnErr(&mg.isLocked, true, false, ErrMigratorIsUnlocked, mg.Dialect.Unlock, LockCfg{Session: sess})
if unlockErr != nil {
mg.Logger.Error("Failed to unlock database", "error", unlockErr)
}
}()
// migration will run inside a nested transaction
return mg.run()
})
}
func (mg *Migrator) run() (err error) {
mg.Logger.Info("Starting DB migrations")
logMap, err := mg.GetMigrationLog()
@ -211,3 +243,15 @@ func (mg *Migrator) InTransaction(callback dbTransactionFunc) error {
return nil
}
func casRestoreOnErr(lock *atomic.Bool, o, n bool, casErr error, f func(LockCfg) error, lockCfg LockCfg) error {
if !lock.CAS(o, n) {
return casErr
}
if err := f(lockCfg); err != nil {
// Automatically unlock/lock on error
lock.Store(o)
return err
}
return nil
}

View File

@ -1,6 +1,7 @@
package migrator
import (
"database/sql"
"errors"
"fmt"
"strconv"
@ -8,6 +9,7 @@ import (
"github.com/VividCortex/mysqlerr"
"github.com/go-sql-driver/mysql"
"github.com/golang-migrate/migrate/v4/database"
"github.com/grafana/grafana/pkg/util/errutil"
"xorm.io/xorm"
)
@ -225,3 +227,66 @@ func (db *MySQLDialect) UpsertSQL(tableName string, keyCols, updateCols []string
)
return s
}
func (db *MySQLDialect) Lock(cfg LockCfg) error {
query := "SELECT GET_LOCK(?, ?)"
var success sql.NullBool
lockName, err := db.getLockName()
if err != nil {
return fmt.Errorf("failed to generate lock name: %w", err)
}
// trying to obtain the lock with the specific name
// the lock is exclusive per session and is released explicitly by executing RELEASE_LOCK() or implicitly when the session terminates
// it returns 1 if the lock was obtained successfully,
// 0 if the attempt timed out (for example, because another client has previously locked the name),
// or NULL if an error occurred
// starting from MySQL 5.7 it is even possible for a given session to acquire multiple locks for the same name
// however other sessions cannot acquire a lock with that name until the acquiring session releases all its locks for the name.
_, err = cfg.Session.SQL(query, lockName, cfg.Timeout).Get(&success)
if err != nil {
return err
}
if !success.Valid || !success.Bool {
return ErrLockDB
}
return nil
}
func (db *MySQLDialect) Unlock(cfg LockCfg) error {
query := "SELECT RELEASE_LOCK(?)"
var success sql.NullBool
lockName, err := db.getLockName()
if err != nil {
return fmt.Errorf("failed to generate lock name: %w", err)
}
// trying to release the lock with the specific name
// it returns 1 if the lock was released,
// 0 if the lock was not established by this thread (in which case the lock is not released),
// and NULL if the named lock did not exist (it was never obtained by a call to GET_LOCK() or if it has previously been released)
_, err = cfg.Session.SQL(query, lockName).Get(&success)
if err != nil {
return err
}
if !success.Valid || !success.Bool {
return ErrReleaseLockDB
}
return nil
}
func (db *MySQLDialect) getLockName() (string, error) {
cfg, err := mysql.ParseDSN(db.engine.DataSourceName())
if err != nil {
return "", err
}
s, err := database.GenerateAdvisoryLockId(cfg.DBName)
if err != nil {
return "", fmt.Errorf("failed to generate advisory lock key: %w", err)
}
return s, nil
}

View File

@ -3,9 +3,11 @@ package migrator
import (
"errors"
"fmt"
"regexp"
"strconv"
"strings"
"github.com/golang-migrate/migrate/v4/database"
"github.com/lib/pq"
"github.com/grafana/grafana/pkg/util/errutil"
@ -257,3 +259,76 @@ func (db *PostgresDialect) UpsertSQL(tableName string, keyCols, updateCols []str
)
return s
}
func (db *PostgresDialect) Lock(cfg LockCfg) error {
// trying to obtain the lock for a resource identified by a 64-bit or 32-bit key value
// the lock is exclusive: multiple lock requests stack, so that if the same resource is locked three times
// it must then be unlocked three times to be released for other sessions' use.
// it will either obtain the lock immediately and return true,
// or return false if the lock cannot be acquired immediately.
query := "SELECT pg_try_advisory_lock(?)"
var success bool
key, err := db.getLockKey()
if err != nil {
return fmt.Errorf("failed to generate advisory lock key: %w", err)
}
_, err = cfg.Session.SQL(query, key).Get(&success)
if err != nil {
return err
}
if !success {
return ErrLockDB
}
return nil
}
func (db *PostgresDialect) Unlock(cfg LockCfg) error {
// trying to release a previously-acquired exclusive session level advisory lock.
// it will either return true if the lock is successfully released or
// false if the lock was not held (in addition an SQL warning will be reported by the server)
query := "SELECT pg_advisory_unlock(?)"
var success bool
key, err := db.getLockKey()
if err != nil {
return fmt.Errorf("failed to generate advisory lock key: %w", err)
}
_, err = cfg.Session.SQL(query, key).Get(&success)
if err != nil {
return err
}
if !success {
return ErrReleaseLockDB
}
return nil
}
func getDBName(dsn string) (string, error) {
if strings.HasPrefix(dsn, "postgres://") || strings.HasPrefix(dsn, "postgresql://") {
parsedDSN, err := pq.ParseURL(dsn)
if err != nil {
return "", err
}
dsn = parsedDSN
}
re := regexp.MustCompile(`dbname=(\w+)`)
submatch := re.FindSubmatch([]byte(dsn))
if len(submatch) < 2 {
return "", fmt.Errorf("failed to get database name")
}
return string(submatch[1]), nil
}
func (db *PostgresDialect) getLockKey() (string, error) {
dbName, err := getDBName(db.engine.DataSourceName())
if err != nil {
return "", err
}
key, err := database.GenerateAdvisoryLockId(dbName)
if err != nil {
return "", err
}
return key, nil
}

View File

@ -453,7 +453,7 @@ func (m SQLStoreMock) UpdateDataSource(ctx context.Context, cmd *models.UpdateDa
return nil // TODO: Implement
}
func (m SQLStoreMock) Migrate() error {
func (m SQLStoreMock) Migrate(_ bool) error {
return nil // TODO: Implement
}

View File

@ -56,8 +56,7 @@ type SQLStore struct {
tracer tracing.Tracer
}
func ProvideService(cfg *setting.Cfg, cacheService *localcache.CacheService, bus bus.Bus, migrations registry.DatabaseMigrator, tracer tracing.Tracer,
) (*SQLStore, error) {
func ProvideService(cfg *setting.Cfg, cacheService *localcache.CacheService, bus bus.Bus, migrations registry.DatabaseMigrator, tracer tracing.Tracer) (*SQLStore, error) {
// This change will make xorm use an empty default schema for postgres and
// by that mimic the functionality of how it was functioning before
// xorm's changes above.
@ -67,7 +66,7 @@ func ProvideService(cfg *setting.Cfg, cacheService *localcache.CacheService, bus
return nil, err
}
if err := s.Migrate(); err != nil {
if err := s.Migrate(cfg.IsFeatureToggleEnabled(featuremgmt.FlagMigrationLocking)); err != nil {
return nil, err
}
@ -150,7 +149,7 @@ func newSQLStore(cfg *setting.Cfg, cacheService *localcache.CacheService, bus bu
// Migrate performs database migrations.
// Has to be done in a second phase (after initialization), since other services can register migrations during
// the initialization phase.
func (ss *SQLStore) Migrate() error {
func (ss *SQLStore) Migrate(isDatabaseLockingEnabled bool) error {
if ss.dbCfg.SkipMigrations {
return nil
}
@ -158,7 +157,7 @@ func (ss *SQLStore) Migrate() error {
migrator := migrator.NewMigrator(ss.engine, ss.Cfg)
ss.migrations.AddMigration(migrator)
return migrator.Start()
return migrator.Start(isDatabaseLockingEnabled, ss.dbCfg.MigrationLockAttemptTimeout)
}
// Sync syncs changes to the database.
@ -440,6 +439,7 @@ func (ss *SQLStore) readConfig() error {
ss.dbCfg.CacheMode = sec.Key("cache_mode").MustString("private")
ss.dbCfg.SkipMigrations = sec.Key("skip_migrations").MustBool()
ss.dbCfg.MigrationLockAttemptTimeout = sec.Key("locking_attempt_timeout_sec").MustInt()
return nil
}
@ -547,7 +547,7 @@ func initTestDB(migration registry.DatabaseMigrator, opts ...InitTestDBOpt) (*SQ
return nil, err
}
if err := testSQLStore.Migrate(); err != nil {
if err := testSQLStore.Migrate(false); err != nil {
return nil, err
}
@ -626,4 +626,5 @@ type DatabaseConfig struct {
CacheMode string
UrlQueryParams map[string][]string
SkipMigrations bool
MigrationLockAttemptTimeout int
}

View File

@ -117,7 +117,7 @@ type Store interface {
DeleteDataSource(ctx context.Context, cmd *models.DeleteDataSourceCommand) error
AddDataSource(ctx context.Context, cmd *models.AddDataSourceCommand) error
UpdateDataSource(ctx context.Context, cmd *models.UpdateDataSourceCommand) error
Migrate() error
Migrate(bool) error
Sync() error
Reset() error
Quote(value string) string