mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Database: Make dialects independent of xorm Engine (#69955)
* make dialects independent of xorm Engine * goimports
This commit is contained in:
parent
e7d8d48407
commit
a6279b2d62
@ -256,7 +256,7 @@ func setupTestDB(t *testing.T) *xorm.Engine {
|
||||
x, err := xorm.NewEngine(testDB.DriverName, testDB.ConnStr)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = migrator.NewDialect(x).CleanDB()
|
||||
err = migrator.NewDialect(x.DriverName()).CleanDB(x)
|
||||
require.NoError(t, err)
|
||||
|
||||
mg := migrator.NewMigrator(x, &setting.Cfg{
|
||||
|
@ -28,7 +28,7 @@ func TestMigrations(t *testing.T) {
|
||||
x, err := xorm.NewEngine(testDB.DriverName, testDB.ConnStr)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = NewDialect(x).CleanDB()
|
||||
err = NewDialect(x.DriverName()).CleanDB(x)
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = x.SQL(query).Get(&result)
|
||||
@ -71,14 +71,17 @@ func TestMigrationLock(t *testing.T) {
|
||||
x, err := xorm.NewEngine(testDB.DriverName, testDB.ConnStr)
|
||||
require.NoError(t, err)
|
||||
|
||||
dialect := NewDialect(x)
|
||||
dialect := NewDialect(x.DriverName())
|
||||
|
||||
sess := x.NewSession()
|
||||
t.Cleanup(func() {
|
||||
sess.Close()
|
||||
})
|
||||
|
||||
cfg := LockCfg{Session: sess}
|
||||
cfg := LockCfg{
|
||||
Session: sess,
|
||||
Key: "test",
|
||||
}
|
||||
|
||||
t.Run("obtaining lock should succeed", func(t *testing.T) {
|
||||
err := dialect.Lock(cfg)
|
||||
@ -117,7 +120,7 @@ func TestMigrationLock(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
sess2 := x2.NewSession()
|
||||
|
||||
d2 := NewDialect(x2)
|
||||
d2 := NewDialect(x2.DriverName())
|
||||
|
||||
err = dialect.Lock(cfg)
|
||||
require.NoError(t, err)
|
||||
@ -139,7 +142,7 @@ func TestMigrationLock(t *testing.T) {
|
||||
x, err := xorm.NewEngine(testDB.DriverName, replaceDBName(t, testDB.ConnStr, dbType))
|
||||
require.NoError(t, err)
|
||||
|
||||
d := NewDialect(x)
|
||||
d := NewDialect(x.DriverName())
|
||||
err = d.Lock(cfg)
|
||||
require.NoError(t, err)
|
||||
|
||||
@ -165,7 +168,7 @@ func TestMigratorLocking(t *testing.T) {
|
||||
x, err := xorm.NewEngine(testDB.DriverName, testDB.ConnStr)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = NewDialect(x).CleanDB()
|
||||
err = NewDialect(x.DriverName()).CleanDB(x)
|
||||
require.NoError(t, err)
|
||||
|
||||
mg := NewMigrator(x, &setting.Cfg{})
|
||||
@ -202,7 +205,7 @@ func TestDatabaseLocking(t *testing.T) {
|
||||
x, err := xorm.NewEngine(testDB.DriverName, testDB.ConnStr)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = NewDialect(x).CleanDB()
|
||||
err = NewDialect(x.DriverName()).CleanDB(x)
|
||||
require.NoError(t, err)
|
||||
|
||||
mg1 := NewMigrator(x, &setting.Cfg{})
|
||||
|
@ -627,7 +627,7 @@ func setupTestDB(t *testing.T) *xorm.Engine {
|
||||
x, err := xorm.NewEngine(testDB.DriverName, testDB.ConnStr)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = migrator.NewDialect(x).CleanDB()
|
||||
err = migrator.NewDialect(x.DriverName()).CleanDB(x)
|
||||
require.NoError(t, err)
|
||||
|
||||
mg := migrator.NewMigrator(x, &setting.Cfg{Raw: ini.Empty()})
|
||||
|
@ -61,8 +61,8 @@ type Dialect interface {
|
||||
PreInsertId(table string, sess *xorm.Session) error
|
||||
PostInsertId(table string, sess *xorm.Session) error
|
||||
|
||||
CleanDB() error
|
||||
TruncateDBTables() error
|
||||
CleanDB(engine *xorm.Engine) error
|
||||
TruncateDBTables(engine *xorm.Engine) error
|
||||
NoOpSQL() string
|
||||
|
||||
IsUniqueConstraintViolation(err error) bool
|
||||
@ -70,14 +70,17 @@ type Dialect interface {
|
||||
IsDeadlock(err error) bool
|
||||
Lock(LockCfg) error
|
||||
Unlock(LockCfg) error
|
||||
|
||||
GetDBName(string) (string, error)
|
||||
}
|
||||
|
||||
type LockCfg struct {
|
||||
Session *xorm.Session
|
||||
Key string
|
||||
Timeout int
|
||||
}
|
||||
|
||||
type dialectFunc func(*xorm.Engine) Dialect
|
||||
type dialectFunc func() Dialect
|
||||
|
||||
var supportedDialects = map[string]dialectFunc{
|
||||
MySQL: NewMysqlDialect,
|
||||
@ -88,18 +91,16 @@ var supportedDialects = map[string]dialectFunc{
|
||||
Postgres + "WithHooks": NewPostgresDialect,
|
||||
}
|
||||
|
||||
func NewDialect(engine *xorm.Engine) Dialect {
|
||||
name := engine.DriverName()
|
||||
if fn, exist := supportedDialects[name]; exist {
|
||||
return fn(engine)
|
||||
func NewDialect(driverName string) Dialect {
|
||||
if fn, exist := supportedDialects[driverName]; exist {
|
||||
return fn()
|
||||
}
|
||||
|
||||
panic("Unsupported database type: " + name)
|
||||
panic("Unsupported database type: " + driverName)
|
||||
}
|
||||
|
||||
type BaseDialect struct {
|
||||
dialect Dialect
|
||||
engine *xorm.Engine
|
||||
driverName string
|
||||
}
|
||||
|
||||
@ -302,7 +303,7 @@ func (b *BaseDialect) PostInsertId(table string, sess *xorm.Session) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *BaseDialect) CleanDB() error {
|
||||
func (b *BaseDialect) CleanDB(engine *xorm.Engine) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -310,7 +311,7 @@ func (b *BaseDialect) NoOpSQL() string {
|
||||
return "SELECT 0;"
|
||||
}
|
||||
|
||||
func (b *BaseDialect) TruncateDBTables() error {
|
||||
func (b *BaseDialect) TruncateDBTables(engine *xorm.Engine) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -330,3 +331,7 @@ func (b *BaseDialect) Unlock(_ LockCfg) error {
|
||||
func (b *BaseDialect) OrderBy(order string) string {
|
||||
return order
|
||||
}
|
||||
|
||||
func (b *BaseDialect) GetDBName(_ string) (string, error) {
|
||||
return "", nil
|
||||
}
|
||||
|
@ -5,6 +5,7 @@ import (
|
||||
"time"
|
||||
|
||||
_ "github.com/go-sql-driver/mysql"
|
||||
"github.com/golang-migrate/migrate/v4/database"
|
||||
_ "github.com/lib/pq"
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
"go.uber.org/atomic"
|
||||
@ -51,7 +52,7 @@ func NewScopedMigrator(engine *xorm.Engine, cfg *setting.Cfg, scope string) *Mig
|
||||
DBEngine: engine,
|
||||
migrations: make([]Migration, 0),
|
||||
migrationIds: make(map[string]struct{}),
|
||||
Dialect: NewDialect(engine),
|
||||
Dialect: NewDialect(engine.DriverName()),
|
||||
}
|
||||
if scope == "" {
|
||||
mg.tableName = "migration_log"
|
||||
@ -142,16 +143,31 @@ func (mg *Migrator) Start(isDatabaseLockingEnabled bool, lockAttemptTimeout int)
|
||||
return mg.run()
|
||||
}
|
||||
|
||||
dbName, err := mg.Dialect.GetDBName(mg.DBEngine.DataSourceName())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
key, err := database.GenerateAdvisoryLockId(dbName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return mg.InTransaction(func(sess *xorm.Session) error {
|
||||
mg.Logger.Info("Locking database")
|
||||
if err := casRestoreOnErr(&mg.isLocked, false, true, ErrMigratorIsLocked, mg.Dialect.Lock, LockCfg{Session: sess, Timeout: lockAttemptTimeout}); err != nil {
|
||||
lockCfg := LockCfg{
|
||||
Session: sess,
|
||||
Key: key,
|
||||
Timeout: lockAttemptTimeout,
|
||||
}
|
||||
|
||||
if err := casRestoreOnErr(&mg.isLocked, false, true, ErrMigratorIsLocked, mg.Dialect.Lock, lockCfg); err != nil {
|
||||
mg.Logger.Error("Failed to lock database", "error", err)
|
||||
return err
|
||||
}
|
||||
|
||||
defer func() {
|
||||
mg.Logger.Info("Unlocking database")
|
||||
unlockErr := casRestoreOnErr(&mg.isLocked, true, false, ErrMigratorIsUnlocked, mg.Dialect.Unlock, LockCfg{Session: sess})
|
||||
unlockErr := casRestoreOnErr(&mg.isLocked, true, false, ErrMigratorIsUnlocked, mg.Dialect.Unlock, lockCfg)
|
||||
if unlockErr != nil {
|
||||
mg.Logger.Error("Failed to unlock database", "error", unlockErr)
|
||||
}
|
||||
|
@ -9,7 +9,6 @@ import (
|
||||
|
||||
"github.com/VividCortex/mysqlerr"
|
||||
"github.com/go-sql-driver/mysql"
|
||||
"github.com/golang-migrate/migrate/v4/database"
|
||||
"xorm.io/xorm"
|
||||
)
|
||||
|
||||
@ -17,10 +16,9 @@ type MySQLDialect struct {
|
||||
BaseDialect
|
||||
}
|
||||
|
||||
func NewMysqlDialect(engine *xorm.Engine) Dialect {
|
||||
func NewMysqlDialect() Dialect {
|
||||
d := MySQLDialect{}
|
||||
d.BaseDialect.dialect = &d
|
||||
d.BaseDialect.engine = engine
|
||||
d.BaseDialect.driverName = MySQL
|
||||
return &d
|
||||
}
|
||||
@ -133,12 +131,12 @@ func (db *MySQLDialect) RenameColumn(table Table, column *Column, newName string
|
||||
)
|
||||
}
|
||||
|
||||
func (db *MySQLDialect) CleanDB() error {
|
||||
tables, err := db.engine.DBMetas()
|
||||
func (db *MySQLDialect) CleanDB(engine *xorm.Engine) error {
|
||||
tables, err := engine.DBMetas()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
sess := db.engine.NewSession()
|
||||
sess := engine.NewSession()
|
||||
defer sess.Close()
|
||||
|
||||
for _, table := range tables {
|
||||
@ -161,12 +159,12 @@ func (db *MySQLDialect) CleanDB() error {
|
||||
|
||||
// TruncateDBTables truncates all the tables.
|
||||
// A special case is the dashboard_acl table where we keep the default permissions.
|
||||
func (db *MySQLDialect) TruncateDBTables() error {
|
||||
tables, err := db.engine.DBMetas()
|
||||
func (db *MySQLDialect) TruncateDBTables(engine *xorm.Engine) error {
|
||||
tables, err := engine.DBMetas()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
sess := db.engine.NewSession()
|
||||
sess := engine.NewSession()
|
||||
defer sess.Close()
|
||||
|
||||
for _, table := range tables {
|
||||
@ -265,11 +263,6 @@ func (db *MySQLDialect) Lock(cfg LockCfg) error {
|
||||
query := "SELECT GET_LOCK(?, ?)"
|
||||
var success sql.NullBool
|
||||
|
||||
lockName, err := db.getLockName()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to generate lock name: %w", err)
|
||||
}
|
||||
|
||||
// trying to obtain the lock with the specific name
|
||||
// the lock is exclusive per session and is released explicitly by executing RELEASE_LOCK() or implicitly when the session terminates
|
||||
// it returns 1 if the lock was obtained successfully,
|
||||
@ -277,7 +270,7 @@ func (db *MySQLDialect) Lock(cfg LockCfg) error {
|
||||
// or NULL if an error occurred
|
||||
// starting from MySQL 5.7 it is even possible for a given session to acquire multiple locks for the same name
|
||||
// however other sessions cannot acquire a lock with that name until the acquiring session releases all its locks for the name.
|
||||
_, err = cfg.Session.SQL(query, lockName, cfg.Timeout).Get(&success)
|
||||
_, err := cfg.Session.SQL(query, cfg.Key, cfg.Timeout).Get(&success)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -291,16 +284,11 @@ func (db *MySQLDialect) Unlock(cfg LockCfg) error {
|
||||
query := "SELECT RELEASE_LOCK(?)"
|
||||
var success sql.NullBool
|
||||
|
||||
lockName, err := db.getLockName()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to generate lock name: %w", err)
|
||||
}
|
||||
|
||||
// trying to release the lock with the specific name
|
||||
// it returns 1 if the lock was released,
|
||||
// 0 if the lock was not established by this thread (in which case the lock is not released),
|
||||
// and NULL if the named lock did not exist (it was never obtained by a call to GET_LOCK() or if it has previously been released)
|
||||
_, err = cfg.Session.SQL(query, lockName).Get(&success)
|
||||
_, err := cfg.Session.SQL(query, cfg.Key).Get(&success)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -310,16 +298,11 @@ func (db *MySQLDialect) Unlock(cfg LockCfg) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (db *MySQLDialect) getLockName() (string, error) {
|
||||
cfg, err := mysql.ParseDSN(db.engine.DataSourceName())
|
||||
func (db *MySQLDialect) GetDBName(dsn string) (string, error) {
|
||||
cfg, err := mysql.ParseDSN(dsn)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
s, err := database.GenerateAdvisoryLockId(cfg.DBName)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to generate advisory lock key: %w", err)
|
||||
}
|
||||
|
||||
return s, nil
|
||||
return cfg.DBName, nil
|
||||
}
|
||||
|
@ -7,7 +7,6 @@ import (
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/golang-migrate/migrate/v4/database"
|
||||
"github.com/lib/pq"
|
||||
"xorm.io/xorm"
|
||||
)
|
||||
@ -16,10 +15,9 @@ type PostgresDialect struct {
|
||||
BaseDialect
|
||||
}
|
||||
|
||||
func NewPostgresDialect(engine *xorm.Engine) Dialect {
|
||||
func NewPostgresDialect() Dialect {
|
||||
d := PostgresDialect{}
|
||||
d.BaseDialect.dialect = &d
|
||||
d.BaseDialect.engine = engine
|
||||
d.BaseDialect.driverName = Postgres
|
||||
return &d
|
||||
}
|
||||
@ -130,8 +128,8 @@ func (db *PostgresDialect) UpdateTableSQL(tableName string, columns []*Column) s
|
||||
return "ALTER TABLE " + db.Quote(tableName) + " " + strings.Join(statements, ", ") + ";"
|
||||
}
|
||||
|
||||
func (db *PostgresDialect) CleanDB() error {
|
||||
sess := db.engine.NewSession()
|
||||
func (db *PostgresDialect) CleanDB(engine *xorm.Engine) error {
|
||||
sess := engine.NewSession()
|
||||
defer sess.Close()
|
||||
|
||||
if _, err := sess.Exec("DROP SCHEMA public CASCADE;"); err != nil {
|
||||
@ -147,12 +145,12 @@ func (db *PostgresDialect) CleanDB() error {
|
||||
|
||||
// TruncateDBTables truncates all the tables.
|
||||
// A special case is the dashboard_acl table where we keep the default permissions.
|
||||
func (db *PostgresDialect) TruncateDBTables() error {
|
||||
tables, err := db.engine.DBMetas()
|
||||
func (db *PostgresDialect) TruncateDBTables(engine *xorm.Engine) error {
|
||||
tables, err := engine.DBMetas()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
sess := db.engine.NewSession()
|
||||
sess := engine.NewSession()
|
||||
defer sess.Close()
|
||||
|
||||
for _, table := range tables {
|
||||
@ -303,11 +301,7 @@ func (db *PostgresDialect) Lock(cfg LockCfg) error {
|
||||
query := "SELECT pg_try_advisory_lock(?)"
|
||||
var success bool
|
||||
|
||||
key, err := db.getLockKey()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to generate advisory lock key: %w", err)
|
||||
}
|
||||
_, err = cfg.Session.SQL(query, key).Get(&success)
|
||||
_, err := cfg.Session.SQL(query, cfg.Key).Get(&success)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -338,11 +332,7 @@ func (db *PostgresDialect) Unlock(cfg LockCfg) error {
|
||||
query := "SELECT pg_advisory_unlock(?)"
|
||||
var success bool
|
||||
|
||||
key, err := db.getLockKey()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to generate advisory lock key: %w", err)
|
||||
}
|
||||
_, err = cfg.Session.SQL(query, key).Get(&success)
|
||||
_, err := cfg.Session.SQL(query, cfg.Key).Get(&success)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -352,7 +342,7 @@ func (db *PostgresDialect) Unlock(cfg LockCfg) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func getDBName(dsn string) (string, error) {
|
||||
func (db *PostgresDialect) GetDBName(dsn string) (string, error) {
|
||||
if strings.HasPrefix(dsn, "postgres://") || strings.HasPrefix(dsn, "postgresql://") {
|
||||
parsedDSN, err := pq.ParseURL(dsn)
|
||||
if err != nil {
|
||||
@ -367,15 +357,3 @@ func getDBName(dsn string) (string, error) {
|
||||
}
|
||||
return string(submatch[1]), nil
|
||||
}
|
||||
|
||||
func (db *PostgresDialect) getLockKey() (string, error) {
|
||||
dbName, err := getDBName(db.engine.DataSourceName())
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
key, err := database.GenerateAdvisoryLockId(dbName)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return key, nil
|
||||
}
|
||||
|
@ -13,10 +13,9 @@ type SQLite3 struct {
|
||||
BaseDialect
|
||||
}
|
||||
|
||||
func NewSQLite3Dialect(engine *xorm.Engine) Dialect {
|
||||
func NewSQLite3Dialect() Dialect {
|
||||
d := SQLite3{}
|
||||
d.BaseDialect.dialect = &d
|
||||
d.BaseDialect.engine = engine
|
||||
d.BaseDialect.driverName = SQLite
|
||||
return &d
|
||||
}
|
||||
@ -89,19 +88,19 @@ func (db *SQLite3) DropIndexSQL(tableName string, index *Index) string {
|
||||
return fmt.Sprintf("DROP INDEX %v", quote(idxName))
|
||||
}
|
||||
|
||||
func (db *SQLite3) CleanDB() error {
|
||||
func (db *SQLite3) CleanDB(engine *xorm.Engine) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// TruncateDBTables deletes all data from all the tables and resets the sequences.
|
||||
// A special case is the dashboard_acl table where we keep the default permissions.
|
||||
func (db *SQLite3) TruncateDBTables() error {
|
||||
tables, err := db.engine.DBMetas()
|
||||
func (db *SQLite3) TruncateDBTables(engine *xorm.Engine) error {
|
||||
tables, err := engine.DBMetas()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
sess := db.engine.NewSession()
|
||||
sess := engine.NewSession()
|
||||
defer sess.Close()
|
||||
|
||||
for _, table := range tables {
|
||||
|
@ -112,7 +112,7 @@ func newSQLStore(cfg *setting.Cfg, cacheService *localcache.CacheService, engine
|
||||
return nil, fmt.Errorf("%v: %w", "failed to connect to database", err)
|
||||
}
|
||||
|
||||
ss.Dialect = migrator.NewDialect(ss.engine)
|
||||
ss.Dialect = migrator.NewDialect(ss.engine.DriverName())
|
||||
|
||||
// if err := ss.Reset(); err != nil {
|
||||
// return nil, err
|
||||
@ -704,7 +704,7 @@ func initTestDB(testCfg *setting.Cfg, migration registry.DatabaseMigrator, opts
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := testSQLStore.Dialect.TruncateDBTables(); err != nil {
|
||||
if err := testSQLStore.Dialect.TruncateDBTables(engine); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -732,7 +732,7 @@ func initTestDB(testCfg *setting.Cfg, migration registry.DatabaseMigrator, opts
|
||||
return false
|
||||
}
|
||||
|
||||
if err := testSQLStore.Dialect.TruncateDBTables(); err != nil {
|
||||
if err := testSQLStore.Dialect.TruncateDBTables(testSQLStore.GetEngine()); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := testSQLStore.Reset(); err != nil {
|
||||
|
Loading…
Reference in New Issue
Block a user