SQLStore: Prevent concurrent migrations (#44101)

* SQLStore: Prevent concurrent migrations

* Hide behind a feature toggle

* Configurable locking attempt timeout

* Update docs/sources/administration/configuration.md

Co-authored-by: Igor Suleymanov <radiohead@users.noreply.github.com>
Co-authored-by: achatterjee-grafana <70489351+achatterjee-grafana@users.noreply.github.com>
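
Taken together, the bullets above describe an opt-in behavior: the lock is only taken when the `migrationLocking` feature toggle is enabled, and the MySQL lock wait is tuned through a new `[database]` setting. A minimal `grafana.ini` sketch (illustrative values, not part of this commit):

```ini
[feature_toggles]
# opt in to the beta toggle registered by this change
enable = migrationLocking

[database]
# for MySQL only: wait up to 30 seconds for the migration lock before failing (0 = do not wait)
locking_attempt_timeout_sec = 30
```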
Author: Sofia Papagiannaki
Date: 2022-02-15 18:54:27 +02:00
Committed by: GitHub
Parent: 163b570f5d
Commit: d718ee1918

18 changed files with 521 additions and 42 deletions

View File

@@ -122,6 +122,9 @@ path = grafana.db
 # For "sqlite3" only. cache mode setting used for connecting to the database
 cache_mode = private

+# For "mysql" only, if the migrationLocking feature toggle is set. How many seconds to wait before failing to lock the database for the migrations; default is 0.
+locking_attempt_timeout_sec = 0
+
 #################################### Cache server #############################
 [remote_cache]
 # Either "redis", "memcached" or "database" default is "database"

View File

@@ -123,6 +123,9 @@
 # For "sqlite3" only. cache mode setting used for connecting to the database. (private, shared)
 ;cache_mode = private

+# For "mysql" only, if the migrationLocking feature toggle is set. How many seconds to wait before failing to lock the database for the migrations; default is 0.
+;locking_attempt_timeout_sec = 0
+
 ################################### Data sources #########################
 [datasources]
 # Upper limit of data sources that Grafana will return. This limit is a temporary configuration and it will be deprecated when pagination will be introduced on the list data sources API.

View File

@@ -314,6 +314,10 @@ The maximum number of open connections to the database.
 Sets the maximum amount of time a connection may be reused. The default is 14400 (which means 14400 seconds or 4 hours). For MySQL, this setting should be shorter than the [`wait_timeout`](https://dev.mysql.com/doc/refman/5.7/en/server-system-variables.html#sysvar_wait_timeout) variable.

+### locking_attempt_timeout_sec
+
+For "mysql" only, if the `migrationLocking` feature toggle is set, specify the time (in seconds) to wait before failing to lock the database for the migrations. The default is 0.
+
 ### log_queries

 Set to `true` to log the sql calls and execution times.

go.mod (13 lines changed)
View File

@@ -77,7 +77,7 @@ require (
 github.com/ohler55/ojg v1.12.9
 github.com/opentracing/opentracing-go v1.2.0
 github.com/patrickmn/go-cache v2.1.0+incompatible
-github.com/pkg/browser v0.0.0-20210904010418-6d279e18f982 // indirect
+github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 // indirect
 github.com/pkg/errors v0.9.1
 github.com/prometheus/alertmanager v0.23.1-0.20211116083607-e2a10119aaf7
 github.com/prometheus/client_golang v1.12.1
@@ -105,7 +105,7 @@ require (
 go.opentelemetry.io/otel/trace v1.2.0
 golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e
 golang.org/x/exp v0.0.0-20210220032938-85be41e4509f // indirect
-golang.org/x/net v0.0.0-20210903162142-ad29c8ab022f
+golang.org/x/net v0.0.0-20211013171255-e13a2654a71e
 golang.org/x/oauth2 v0.0.0-20210819190943-2bc19b11175f
 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
 golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac
@@ -239,9 +239,9 @@ require (
 github.com/valyala/bytebufferpool v1.0.0 // indirect
 github.com/weaveworks/promrus v1.2.0 // indirect
 github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82 // indirect
-go.mongodb.org/mongo-driver v1.5.2 // indirect
+go.mongodb.org/mongo-driver v1.7.0 // indirect
 go.opencensus.io v0.23.0 // indirect
-go.uber.org/atomic v1.9.0 // indirect
+go.uber.org/atomic v1.9.0
 go.uber.org/goleak v1.1.10 // indirect
 golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect
 golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 // indirect
@@ -252,7 +252,10 @@ require (
 gopkg.in/asn1-ber.v1 v1.0.0-20181015200546-f715ec2f112d // indirect
 )

-require cloud.google.com/go/kms v1.1.0
+require (
+cloud.google.com/go/kms v1.1.0
+github.com/golang-migrate/migrate/v4 v4.7.0
+)

 require (
 github.com/Azure/go-autorest/autorest/adal v0.9.15 // indirect

go.sum (11 lines changed)
View File

@@ -1044,6 +1044,7 @@ github.com/gogo/status v1.1.0 h1:+eIkrewn5q6b30y+g/BJINVVdi2xH7je5MPJ3ZPK3JA=
 github.com/gogo/status v1.1.0/go.mod h1:BFv9nrluPLmrS0EmGVvLaPNmRosr9KapBYd5/hpY1WM=
 github.com/golang-jwt/jwt/v4 v4.0.0 h1:RAqyYixv1p7uEnocuy8P1nru5wprCh/MH2BIlW5z5/o=
 github.com/golang-jwt/jwt/v4 v4.0.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzwAxVc6locg=
+github.com/golang-migrate/migrate/v4 v4.7.0 h1:gONcHxHApDTKXDyLH/H97gEHmpu1zcnnbAaq2zgrPrs=
 github.com/golang-migrate/migrate/v4 v4.7.0/go.mod h1:Qvut3N4xKWjoH3sokBccML6WyHSnggXm/DvMMnTsQIc=
 github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe h1:lXe2qZdvpiX5WZkZR4hgp4KJVfY3nMkvmwbVkpv1rVY=
 github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0=
@@ -1963,8 +1964,8 @@ github.com/pierrec/lz4/v4 v4.1.8/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuR
 github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4=
 github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
 github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4/go.mod h1:4OwLy04Bl9Ef3GJJCoec+30X3LQs/0/m4HFRt/2LUSA=
-github.com/pkg/browser v0.0.0-20210904010418-6d279e18f982 h1:TdFv+3Gr3GaghJ/o80aulO4ian7GHGWMdLBXoLZH1Is=
+github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 h1:KoWmjvw+nsYOo29YJK9vDA65RGE3NrOnUtO7a+RF9HU=
-github.com/pkg/browser v0.0.0-20210904010418-6d279e18f982/go.mod h1:HKlIX3XHQyzLZPlr7++PzdhaXEj94dEiJgZDTsxEqUI=
+github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8/go.mod h1:HKlIX3XHQyzLZPlr7++PzdhaXEj94dEiJgZDTsxEqUI=
 github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
 github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pkg/errors v0.8.1-0.20171018195549-f15c970de5b7/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
@@ -2478,8 +2479,9 @@ go.mongodb.org/mongo-driver v1.4.3/go.mod h1:WcMNYLx/IlOxLe6JRJiv2uXuCz6zBLndR4S
 go.mongodb.org/mongo-driver v1.4.4/go.mod h1:WcMNYLx/IlOxLe6JRJiv2uXuCz6zBLndR4SoGjYphSc=
 go.mongodb.org/mongo-driver v1.4.6/go.mod h1:WcMNYLx/IlOxLe6JRJiv2uXuCz6zBLndR4SoGjYphSc=
 go.mongodb.org/mongo-driver v1.5.1/go.mod h1:gRXCHX4Jo7J0IJ1oDQyUxF7jfy19UfxniMS4xxMmUqw=
-go.mongodb.org/mongo-driver v1.5.2 h1:AsxOLoJTgP6YNM0fXWw4OjdluYmWzQYp+lFJL7xu9fU=
 go.mongodb.org/mongo-driver v1.5.2/go.mod h1:gRXCHX4Jo7J0IJ1oDQyUxF7jfy19UfxniMS4xxMmUqw=
+go.mongodb.org/mongo-driver v1.7.0 h1:hHrvOBWlWB2c7+8Gh/Xi5jj82AgidK/t7KVXBZ+IyUA=
+go.mongodb.org/mongo-driver v1.7.0/go.mod h1:Q4oFMbo1+MSNqICAdYMlC/zSTrwCogR4R8NzkI+yfU8=
 go.mozilla.org/pkcs7 v0.0.0-20200128120323-432b2356ecb1/go.mod h1:SNgMg+EgDFwmvSmLRTNKC5fegJjB7v23qTQ0XLGUNHk=
 go.opencensus.io v0.20.1/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk=
 go.opencensus.io v0.20.2/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk=
@@ -2735,8 +2737,9 @@ golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qx
 golang.org/x/net v0.0.0-20210610132358-84b48f89b13b/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
 golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
 golang.org/x/net v0.0.0-20210726213435-c6fcb2dbf985/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
-golang.org/x/net v0.0.0-20210903162142-ad29c8ab022f h1:w6wWR0H+nyVpbSAQbzVEIACVyr/h8l/BEkY6Sokc7Eg=
 golang.org/x/net v0.0.0-20210903162142-ad29c8ab022f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
+golang.org/x/net v0.0.0-20211013171255-e13a2654a71e h1:Xj+JO91noE97IN6F/7WZxzC5QE6yENAQPrwIYhW3bsA=
+golang.org/x/net v0.0.0-20211013171255-e13a2654a71e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
 golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
 golang.org/x/oauth2 v0.0.0-20181106182150-f42d05182288/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
 golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=

View File

@@ -40,4 +40,5 @@ export interface FeatureToggles {
 validatedQueries?: boolean;
 swaggerUi?: boolean;
 featureHighlights?: boolean;
+migrationLocking?: boolean;
 }

View File

@@ -12,6 +12,9 @@ import (
 "github.com/grafana/grafana/pkg/cmd/grafana-cli/services"
 "github.com/grafana/grafana/pkg/cmd/grafana-cli/utils"
 "github.com/grafana/grafana/pkg/infra/tracing"
+"github.com/grafana/grafana/pkg/services/featuremgmt"
+"github.com/grafana/grafana/pkg/services/hooks"
+"github.com/grafana/grafana/pkg/services/licensing"
 "github.com/grafana/grafana/pkg/services/sqlstore"
 "github.com/grafana/grafana/pkg/services/sqlstore/migrations"
 "github.com/grafana/grafana/pkg/setting"
@@ -55,7 +58,15 @@ func runDbCommand(command func(commandLine utils.CommandLine, sqlStore *sqlstore
 if err != nil {
 return errutil.Wrap("failed to initialize tracer service", err)
 }

-sqlStore, err := sqlstore.ProvideService(cfg, nil, bus.GetBus(), &migrations.OSSMigrations{}, tracer)
+hooksService := hooks.ProvideService()
+ossLicensingService := licensing.ProvideService(cfg, hooksService)
+featureManager, err := featuremgmt.ProvideManagerService(cfg, ossLicensingService)
+if err != nil {
+return errutil.Wrap("failed to initialize feature manager service", err)
+}
+
+sqlStore, err := sqlstore.ProvideService(cfg, nil, bus.GetBus(), &migrations.OSSMigrations{}, tracer, featureManager)
 if err != nil {
 return errutil.Wrap("failed to initialize SQL store", err)
 }

View File

@@ -131,5 +131,10 @@ var (
 Description: "Highlight Enterprise features",
 State: FeatureStateStable,
 },
+{
+Name: "migrationLocking",
+Description: "Lock database during migrations",
+State: FeatureStateBeta,
+},
 }
 )

View File

@@ -98,4 +98,8 @@ const (
 // FlagFeatureHighlights
 // Highlight Enterprise features
 FlagFeatureHighlights = "featureHighlights"
+
+// FlagMigrationLocking
+// Lock database during migrations
+FlagMigrationLocking = "migrationLocking"
 )

View File

@@ -165,7 +165,7 @@ func TestMigrations(t *testing.T) {
 acmigrator := migrator.NewMigrator(x, tc.config)
 acmig.AddTeamMembershipMigrations(acmigrator)

-errRunningMig := acmigrator.Start()
+errRunningMig := acmigrator.Start(false, 0)
 require.NoError(t, errRunningMig)

 for _, user := range users {
@@ -221,7 +221,7 @@ func setupTestDB(t *testing.T) *xorm.Engine {
 migrations := &migrations.OSSMigrations{}
 migrations.AddMigration(mg)

-err = mg.Start()
+err = mg.Start(false, 0)
 require.NoError(t, err)

 return x

View File

@@ -1,10 +1,16 @@
 package migrations

 import (
+"errors"
 "fmt"
+"os"
 "strings"
+"sync"
+"sync/atomic"
 "testing"

+"github.com/go-sql-driver/mysql"
+"github.com/stretchr/testify/assert"
 "github.com/stretchr/testify/require"
 "xorm.io/xorm"
@@ -32,7 +38,7 @@ func TestMigrations(t *testing.T) {
 migrations.AddMigration(mg)
 expectedMigrations := mg.GetMigrationIDs(true)

-err = mg.Start()
+err = mg.Start(false, 0)
 require.NoError(t, err)

 has, err := x.SQL(query).Get(&result)
@@ -44,7 +50,7 @@ func TestMigrations(t *testing.T) {
 mg = NewMigrator(x, &setting.Cfg{})
 migrations.AddMigration(mg)

-err = mg.Start()
+err = mg.Start(false, 0)
 require.NoError(t, err)

 has, err = x.SQL(query).Get(&result)
@@ -53,6 +59,179 @@ func TestMigrations(t *testing.T) {
checkStepsAndDatabaseMatch(t, mg, expectedMigrations)
}
func TestMigrationLock(t *testing.T) {
dbType := getDBType()
if dbType == SQLite {
t.Skip()
}
testDB := getTestDB(dbType)
x, err := xorm.NewEngine(testDB.DriverName, testDB.ConnStr)
require.NoError(t, err)
dialect := NewDialect(x)
sess := x.NewSession()
t.Cleanup(func() {
sess.Close()
})
cfg := LockCfg{Session: sess}
t.Run("obtaining lock should succeed", func(t *testing.T) {
err := dialect.Lock(cfg)
require.NoError(t, err)
t.Run("releasing previously obtained lock should succeed", func(t *testing.T) {
err := dialect.Unlock(cfg)
require.NoError(t, err)
t.Run("releasing already released lock should fail", func(t *testing.T) {
err := dialect.Unlock(cfg)
require.Error(t, err)
assert.ErrorIs(t, err, ErrReleaseLockDB)
})
})
})
t.Run("obtaining lock twice should succeed", func(t *testing.T) {
err = dialect.Lock(cfg)
require.NoError(t, err)
err = dialect.Lock(cfg)
require.NoError(t, err)
t.Cleanup(func() {
err := dialect.Unlock(cfg)
require.NoError(t, err)
err = dialect.Unlock(cfg)
require.NoError(t, err)
})
})
t.Run("obtaining same lock from another session should fail", func(t *testing.T) {
x2, err := xorm.NewEngine(testDB.DriverName, testDB.ConnStr)
require.NoError(t, err)
sess2 := x2.NewSession()
d2 := NewDialect(x2)
err = dialect.Lock(cfg)
require.NoError(t, err)
err = d2.Lock(LockCfg{Session: sess2})
require.Error(t, err)
assert.ErrorIs(t, err, ErrLockDB)
t.Cleanup(func() {
err := dialect.Unlock(cfg)
require.NoError(t, err)
})
})
t.Run("obtaining a lock for another database should succeed", func(t *testing.T) {
err := dialect.Lock(cfg)
require.NoError(t, err)
x, err := xorm.NewEngine(testDB.DriverName, replaceDBName(t, testDB.ConnStr, dbType))
require.NoError(t, err)
d := NewDialect(x)
err = d.Lock(cfg)
require.NoError(t, err)
t.Cleanup(func() {
err := dialect.Unlock(cfg)
require.NoError(t, err)
err = d.Unlock(cfg)
require.NoError(t, err)
})
})
}
func TestMigratorLocking(t *testing.T) {
dbType := getDBType()
testDB := getTestDB(dbType)
x, err := xorm.NewEngine(testDB.DriverName, testDB.ConnStr)
require.NoError(t, err)
err = NewDialect(x).CleanDB()
require.NoError(t, err)
mg := NewMigrator(x, &setting.Cfg{})
migrations := &OSSMigrations{}
migrations.AddMigration(mg)
var errorNum int64
t.Run("when concurrent migrations for the same migrator occur, the second one should fail", func(t *testing.T) {
for i := 0; i < 2; i++ {
i := i // capture i variable
t.Run(fmt.Sprintf("run migration %d", i), func(t *testing.T) {
t.Parallel()
err = mg.Start(true, 0)
if err != nil {
if errors.Is(err, ErrMigratorIsLocked) {
atomic.AddInt64(&errorNum, 1)
}
}
})
}
})
assert.Equal(t, int64(1), errorNum)
}
func TestDatabaseLocking(t *testing.T) {
dbType := getDBType()
// skip for SQLite since there is no database locking (only migrator locking)
if dbType == SQLite {
t.Skip()
}
testDB := getTestDB(dbType)
x, err := xorm.NewEngine(testDB.DriverName, testDB.ConnStr)
require.NoError(t, err)
err = NewDialect(x).CleanDB()
require.NoError(t, err)
mg1 := NewMigrator(x, &setting.Cfg{})
migrations := &OSSMigrations{}
migrations.AddMigration(mg1)
reg := registry{
migrators: make(map[int]*Migrator, 2),
}
reg.set(0, mg1)
mg2 := NewMigrator(x, &setting.Cfg{})
migrations.AddMigration(mg2)
reg.set(1, mg2)
var errorNum int64
t.Run("when concurrent migrations for different migrators occur, the second one should fail", func(t *testing.T) {
for i := 0; i < 2; i++ {
i := i // capture i variable
t.Run(fmt.Sprintf("run migration %d", i), func(t *testing.T) {
mg, err := reg.get(i)
require.NoError(t, err)
t.Parallel()
err = mg.Start(true, 0)
if err != nil {
assert.ErrorIs(t, err, ErrLockDB)
if errors.Is(err, ErrLockDB) {
atomic.AddInt64(&errorNum, 1)
}
}
})
}
})
assert.Equal(t, int64(1), errorNum)
}
func checkStepsAndDatabaseMatch(t *testing.T, mg *Migrator, expected []string) {
t.Helper()
log, err := mg.GetMigrationLog()
@@ -91,3 +270,61 @@ func checkStepsAndDatabaseMatch(t *testing.T, mg *Migrator, expected []string) {
}
require.Failf(t, "the number of migrations does not match log in database", msg)
}
func getDBType() string {
dbType := SQLite
// environment variable present for test db?
if db, present := os.LookupEnv("GRAFANA_TEST_DB"); present {
dbType = db
}
return dbType
}
func getTestDB(dbType string) sqlutil.TestDB {
switch dbType {
case "mysql":
return sqlutil.MySQLTestDB()
case "postgres":
return sqlutil.PostgresTestDB()
default:
return sqlutil.SQLite3TestDB()
}
}
func replaceDBName(t *testing.T, connStr, dbType string) string {
switch dbType {
case "mysql":
cfg, err := mysql.ParseDSN(connStr)
require.NoError(t, err)
cfg.DBName = "grafana_ds_tests"
return cfg.FormatDSN()
case "postgres":
return strings.Replace(connStr, "dbname=grafanatest", "dbname=grafanadstest", 1)
default:
return connStr
}
}
type registry struct {
mu sync.Mutex
migrators map[int]*Migrator
}
func (r *registry) get(i int) (*Migrator, error) {
r.mu.Lock()
defer r.mu.Unlock()
m, ok := r.migrators[i]
if !ok {
return nil, fmt.Errorf("invalid index: %d", i)
}
return m, nil
}
func (r *registry) set(i int, mg *Migrator) {
r.mu.Lock()
defer r.mu.Unlock()
r.migrators[i] = mg
}

View File

@@ -7,6 +7,11 @@ import (
 "xorm.io/xorm"
 )

+var (
+ErrLockDB = fmt.Errorf("failed to obtain lock")
+ErrReleaseLockDB = fmt.Errorf("failed to release lock")
+)
+
 type Dialect interface {
 DriverName() string
 Quote(string) string
@@ -53,6 +58,13 @@ type Dialect interface {
 IsUniqueConstraintViolation(err error) bool
 ErrorMessage(err error) string
 IsDeadlock(err error) bool
+Lock(LockCfg) error
+Unlock(LockCfg) error
+}
+
+type LockCfg struct {
+Session *xorm.Session
+Timeout int
 }

 type dialectFunc func(*xorm.Engine) Dialect
@@ -288,3 +300,11 @@ func (b *BaseDialect) TruncateDBTables() error {
 func (b *BaseDialect) UpsertSQL(tableName string, keyCols, updateCols []string) string {
 return ""
 }
+
+func (b *BaseDialect) Lock(_ LockCfg) error {
+return nil
+}
+
+func (b *BaseDialect) Unlock(_ LockCfg) error {
+return nil
+}
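
The interface additions above keep the migrator dialect-agnostic: SQLite inherits the no-op `BaseDialect` implementation, while MySQL and Postgres override `Lock`/`Unlock`. A hypothetical helper, not part of this commit, sketching how the pair is meant to be used from inside the `migrator` package (it assumes the `Dialect` and `LockCfg` types shown above):

```go
// withDialectLock is an illustrative sketch, not Grafana code: run fn while
// holding the dialect's migration lock, releasing it best-effort afterwards.
func withDialectLock(d Dialect, cfg LockCfg, fn func() error) error {
	if err := d.Lock(cfg); err != nil {
		return err // e.g. ErrLockDB when another session already holds the lock
	}
	// Closing the session also releases server-side locks, so this is best-effort.
	defer func() { _ = d.Unlock(cfg) }()
	return fn()
}
```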

View File

@@ -7,6 +7,7 @@ import (
 _ "github.com/go-sql-driver/mysql"
 _ "github.com/lib/pq"
 _ "github.com/mattn/go-sqlite3"
+"go.uber.org/atomic"
 "xorm.io/xorm"

 "github.com/grafana/grafana/pkg/infra/log"
@@ -14,12 +15,18 @@ import (
 "github.com/grafana/grafana/pkg/util/errutil"
 )

+var (
+ErrMigratorIsLocked = fmt.Errorf("migrator is locked")
+ErrMigratorIsUnlocked = fmt.Errorf("migrator is unlocked")
+)
+
 type Migrator struct {
 DBEngine *xorm.Engine
 Dialect Dialect
 migrations []Migration
 Logger log.Logger
 Cfg *setting.Cfg
+isLocked atomic.Bool
 }

 type MigrationLog struct {
@@ -87,7 +94,32 @@ func (mg *Migrator) GetMigrationLog() (map[string]MigrationLog, error) {
 return logMap, nil
 }

-func (mg *Migrator) Start() error {
+func (mg *Migrator) Start(isDatabaseLockingEnabled bool, lockAttemptTimeout int) (err error) {
+if !isDatabaseLockingEnabled {
+return mg.run()
+}
+
+return mg.InTransaction(func(sess *xorm.Session) error {
+mg.Logger.Info("Locking database")
+if err := casRestoreOnErr(&mg.isLocked, false, true, ErrMigratorIsLocked, mg.Dialect.Lock, LockCfg{Session: sess, Timeout: lockAttemptTimeout}); err != nil {
+mg.Logger.Error("Failed to lock database", "error", err)
+return err
+}
+
+defer func() {
+mg.Logger.Info("Unlocking database")
+unlockErr := casRestoreOnErr(&mg.isLocked, true, false, ErrMigratorIsUnlocked, mg.Dialect.Unlock, LockCfg{Session: sess})
+if unlockErr != nil {
+mg.Logger.Error("Failed to unlock database", "error", unlockErr)
+}
+}()
+
+// migration will run inside a nested transaction
+return mg.run()
+})
+}
+
+func (mg *Migrator) run() (err error) {
 mg.Logger.Info("Starting DB migrations")

 logMap, err := mg.GetMigrationLog()
@@ -211,3 +243,15 @@ func (mg *Migrator) InTransaction(callback dbTransactionFunc) error {
 return nil
 }

+func casRestoreOnErr(lock *atomic.Bool, o, n bool, casErr error, f func(LockCfg) error, lockCfg LockCfg) error {
+if !lock.CAS(o, n) {
+return casErr
+}
+if err := f(lockCfg); err != nil {
+// Automatically unlock/lock on error
+lock.Store(o)
+return err
+}
+return nil
+}
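
`casRestoreOnErr` is the in-process half of the protection: the atomic compare-and-swap rejects a second `Start` on the same `Migrator` before any SQL is issued, and restores the flag when the dialect-level lock cannot be taken. A standalone sketch of the same pattern (illustrative names, using `go.uber.org/atomic` as the migrator does):

```go
package main

import (
	"errors"
	"fmt"

	"go.uber.org/atomic"
)

var errBusy = errors.New("already running")

// runOnce mirrors casRestoreOnErr in miniature: only one caller can flip the
// flag from false to true, and the flag is restored if the guarded step fails.
func runOnce(flag *atomic.Bool, step func() error) error {
	if !flag.CAS(false, true) {
		return errBusy
	}
	if err := step(); err != nil {
		flag.Store(false) // restore so a later attempt can retry
		return err
	}
	return nil
}

func main() {
	var flag atomic.Bool
	fmt.Println(runOnce(&flag, func() error { return nil })) // <nil>
	fmt.Println(runOnce(&flag, func() error { return nil })) // already running
}
```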

View File

@@ -1,6 +1,7 @@
 package migrator

 import (
+"database/sql"
 "errors"
 "fmt"
 "strconv"
@@ -8,6 +9,7 @@
 "github.com/VividCortex/mysqlerr"
 "github.com/go-sql-driver/mysql"
+"github.com/golang-migrate/migrate/v4/database"
 "github.com/grafana/grafana/pkg/util/errutil"
 "xorm.io/xorm"
 )
@@ -225,3 +227,66 @@ func (db *MySQLDialect) UpsertSQL(tableName string, keyCols, updateCols []string
)
return s
}
func (db *MySQLDialect) Lock(cfg LockCfg) error {
query := "SELECT GET_LOCK(?, ?)"
var success sql.NullBool
lockName, err := db.getLockName()
if err != nil {
return fmt.Errorf("failed to generate lock name: %w", err)
}
// trying to obtain the lock with the specific name
// the lock is exclusive per session and is released explicitly by executing RELEASE_LOCK() or implicitly when the session terminates
// it returns 1 if the lock was obtained successfully,
// 0 if the attempt timed out (for example, because another client has previously locked the name),
// or NULL if an error occurred
// starting from MySQL 5.7 it is even possible for a given session to acquire multiple locks for the same name
// however other sessions cannot acquire a lock with that name until the acquiring session releases all its locks for the name.
_, err = cfg.Session.SQL(query, lockName, cfg.Timeout).Get(&success)
if err != nil {
return err
}
if !success.Valid || !success.Bool {
return ErrLockDB
}
return nil
}
func (db *MySQLDialect) Unlock(cfg LockCfg) error {
query := "SELECT RELEASE_LOCK(?)"
var success sql.NullBool
lockName, err := db.getLockName()
if err != nil {
return fmt.Errorf("failed to generate lock name: %w", err)
}
// trying to release the lock with the specific name
// it returns 1 if the lock was released,
// 0 if the lock was not established by this thread (in which case the lock is not released),
// and NULL if the named lock did not exist (it was never obtained by a call to GET_LOCK() or if it has previously been released)
_, err = cfg.Session.SQL(query, lockName).Get(&success)
if err != nil {
return err
}
if !success.Valid || !success.Bool {
return ErrReleaseLockDB
}
return nil
}
func (db *MySQLDialect) getLockName() (string, error) {
cfg, err := mysql.ParseDSN(db.engine.DataSourceName())
if err != nil {
return "", err
}
s, err := database.GenerateAdvisoryLockId(cfg.DBName)
if err != nil {
return "", fmt.Errorf("failed to generate advisory lock key: %w", err)
}
return s, nil
}
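
For readers who want to see the GET_LOCK/RELEASE_LOCK semantics described in the comments above in isolation, here is an illustrative sketch using plain `database/sql` rather than an xorm session (the function and lock name are placeholders, not Grafana code):

```go
package lockdemo

import (
	"database/sql"
	"fmt"

	_ "github.com/go-sql-driver/mysql" // registers the "mysql" driver
)

// withNamedLock mirrors the Lock/Unlock sequence of the MySQL dialect above:
// GET_LOCK returns 1 on success, 0 on timeout and NULL on error.
func withNamedLock(db *sql.DB, name string, timeoutSec int, fn func() error) error {
	var got sql.NullBool
	if err := db.QueryRow("SELECT GET_LOCK(?, ?)", name, timeoutSec).Scan(&got); err != nil {
		return err
	}
	if !got.Valid || !got.Bool {
		return fmt.Errorf("could not obtain lock %q within %d seconds", name, timeoutSec)
	}
	// Best-effort release; the lock is also dropped when the session terminates.
	defer func() { _, _ = db.Exec("SELECT RELEASE_LOCK(?)", name) }()
	return fn()
}
```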

View File

@@ -3,9 +3,11 @@ package migrator
 import (
 "errors"
 "fmt"
+"regexp"
 "strconv"
 "strings"

+"github.com/golang-migrate/migrate/v4/database"
 "github.com/lib/pq"

 "github.com/grafana/grafana/pkg/util/errutil"
@@ -257,3 +259,76 @@ func (db *PostgresDialect) UpsertSQL(tableName string, keyCols, updateCols []str
)
return s
}
func (db *PostgresDialect) Lock(cfg LockCfg) error {
// trying to obtain the lock for a resource identified by a 64-bit or 32-bit key value
// the lock is exclusive: multiple lock requests stack, so that if the same resource is locked three times
// it must then be unlocked three times to be released for other sessions' use.
// it will either obtain the lock immediately and return true,
// or return false if the lock cannot be acquired immediately.
query := "SELECT pg_try_advisory_lock(?)"
var success bool
key, err := db.getLockKey()
if err != nil {
return fmt.Errorf("failed to generate advisory lock key: %w", err)
}
_, err = cfg.Session.SQL(query, key).Get(&success)
if err != nil {
return err
}
if !success {
return ErrLockDB
}
return nil
}
func (db *PostgresDialect) Unlock(cfg LockCfg) error {
// trying to release a previously-acquired exclusive session level advisory lock.
// it will either return true if the lock is successfully released or
// false if the lock was not held (in addition an SQL warning will be reported by the server)
query := "SELECT pg_advisory_unlock(?)"
var success bool
key, err := db.getLockKey()
if err != nil {
return fmt.Errorf("failed to generate advisory lock key: %w", err)
}
_, err = cfg.Session.SQL(query, key).Get(&success)
if err != nil {
return err
}
if !success {
return ErrReleaseLockDB
}
return nil
}
func getDBName(dsn string) (string, error) {
if strings.HasPrefix(dsn, "postgres://") || strings.HasPrefix(dsn, "postgresql://") {
parsedDSN, err := pq.ParseURL(dsn)
if err != nil {
return "", err
}
dsn = parsedDSN
}
re := regexp.MustCompile(`dbname=(\w+)`)
submatch := re.FindSubmatch([]byte(dsn))
if len(submatch) < 2 {
return "", fmt.Errorf("failed to get database name")
}
return string(submatch[1]), nil
}
func (db *PostgresDialect) getLockKey() (string, error) {
dbName, err := getDBName(db.engine.DataSourceName())
if err != nil {
return "", err
}
key, err := database.GenerateAdvisoryLockId(dbName)
if err != nil {
return "", err
}
return key, nil
}
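
The Postgres path differs from MySQL in one important way: `pg_try_advisory_lock` never waits, it either succeeds immediately or returns false, which is why `locking_attempt_timeout_sec` only applies to MySQL. An illustrative sketch of the same session-level advisory locking with plain `database/sql` (placeholder names, key assumed to fit in a bigint):

```go
package lockdemo

import (
	"database/sql"
	"fmt"

	_ "github.com/lib/pq" // registers the "postgres" driver
)

// withAdvisoryLock mirrors the dialect above: take a session-level advisory
// lock without blocking, run fn, then release the lock best-effort.
func withAdvisoryLock(db *sql.DB, key int64, fn func() error) error {
	var ok bool
	if err := db.QueryRow("SELECT pg_try_advisory_lock($1)", key).Scan(&ok); err != nil {
		return err
	}
	if !ok {
		return fmt.Errorf("advisory lock %d is held by another session", key)
	}
	defer func() { _, _ = db.Exec("SELECT pg_advisory_unlock($1)", key) }()
	return fn()
}
```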

View File

@@ -546,7 +546,7 @@ func (m *SQLStoreMock) UpdateDataSource(ctx context.Context, cmd *models.UpdateD
 return m.ExpectedError
 }

-func (m *SQLStoreMock) Migrate() error {
+func (m *SQLStoreMock) Migrate(_ bool) error {
 return m.ExpectedError
 }

View File

@@ -56,8 +56,7 @@ type SQLStore struct {
 tracer tracing.Tracer
 }

-func ProvideService(cfg *setting.Cfg, cacheService *localcache.CacheService, bus bus.Bus, migrations registry.DatabaseMigrator, tracer tracing.Tracer,
-) (*SQLStore, error) {
+func ProvideService(cfg *setting.Cfg, cacheService *localcache.CacheService, bus bus.Bus, migrations registry.DatabaseMigrator, tracer tracing.Tracer, features *featuremgmt.FeatureManager) (*SQLStore, error) {
 // This change will make xorm use an empty default schema for postgres and
 // by that mimic the functionality of how it was functioning before
 // xorm's changes above.
@@ -67,7 +66,7 @@ func ProvideService(cfg *setting.Cfg, cacheService *localcache.CacheService, bus
 return nil, err
 }

-if err := s.Migrate(); err != nil {
+if err := s.Migrate(features.IsEnabled(featuremgmt.FlagMigrationLocking)); err != nil {
 return nil, err
 }
@@ -153,7 +152,7 @@ func newSQLStore(cfg *setting.Cfg, cacheService *localcache.CacheService, b bus.
 // Migrate performs database migrations.
 // Has to be done in a second phase (after initialization), since other services can register migrations during
 // the initialization phase.
-func (ss *SQLStore) Migrate() error {
+func (ss *SQLStore) Migrate(isDatabaseLockingEnabled bool) error {
 if ss.dbCfg.SkipMigrations {
 return nil
 }
@@ -161,7 +160,7 @@ func (ss *SQLStore) Migrate() error {
 migrator := migrator.NewMigrator(ss.engine, ss.Cfg)
 ss.migrations.AddMigration(migrator)

-return migrator.Start()
+return migrator.Start(isDatabaseLockingEnabled, ss.dbCfg.MigrationLockAttemptTimeout)
 }

 // Sync syncs changes to the database.
@@ -443,6 +442,7 @@ func (ss *SQLStore) readConfig() error {
 ss.dbCfg.CacheMode = sec.Key("cache_mode").MustString("private")
 ss.dbCfg.SkipMigrations = sec.Key("skip_migrations").MustBool()
+ss.dbCfg.MigrationLockAttemptTimeout = sec.Key("locking_attempt_timeout_sec").MustInt()

 return nil
 }
@@ -562,7 +562,7 @@ func initTestDB(migration registry.DatabaseMigrator, opts ...InitTestDBOpt) (*SQ
 return nil, err
 }

-if err := testSQLStore.Migrate(); err != nil {
+if err := testSQLStore.Migrate(false); err != nil {
 return nil, err
 }
@@ -622,23 +622,24 @@ func IsTestDBMSSQL() bool {
 }

 type DatabaseConfig struct {
 Type string
 Host string
 Name string
 User string
 Pwd string
 Path string
 SslMode string
 CaCertPath string
 ClientKeyPath string
 ClientCertPath string
 ServerCertName string
 ConnectionString string
 IsolationLevel string
 MaxOpenConn int
 MaxIdleConn int
 ConnMaxLifetime int
 CacheMode string
 UrlQueryParams map[string][]string
 SkipMigrations bool
+MigrationLockAttemptTimeout int
 }

View File

@@ -125,7 +125,7 @@ type Store interface {
 DeleteDataSource(ctx context.Context, cmd *models.DeleteDataSourceCommand) error
 AddDataSource(ctx context.Context, cmd *models.AddDataSourceCommand) error
 UpdateDataSource(ctx context.Context, cmd *models.UpdateDataSourceCommand) error
-Migrate() error
+Migrate(bool) error
 Sync() error
 Reset() error
 Quote(value string) string