Migrations: Support independent migration log for entity api (#68511)

This commit is contained in:
Ryan McKinley 2023-05-22 11:31:07 -07:00 committed by GitHub
parent 2c75a51285
commit 26658d172f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 127 additions and 64 deletions

View File

@ -1,7 +1,6 @@
package migrations
import (
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/services/sqlstore/migrations/accesscontrol"
"github.com/grafana/grafana/pkg/services/sqlstore/migrations/ualert"
. "github.com/grafana/grafana/pkg/services/sqlstore/migrator"
@ -23,7 +22,7 @@ func ProvideOSSMigrations() *OSSMigrations {
}
func (*OSSMigrations) AddMigration(mg *Migrator) {
addMigrationLogMigrations(mg)
mg.AddCreateMigration()
addUserMigrations(mg)
addTempUserMigrations(mg)
addStarMigrations(mg)
@ -70,12 +69,6 @@ func (*OSSMigrations) AddMigration(mg *Migrator) {
addCorrelationsMigrations(mg)
if mg.Cfg != nil && mg.Cfg.IsFeatureToggleEnabled != nil {
if mg.Cfg.IsFeatureToggleEnabled(featuremgmt.FlagEntityStore) {
addEntityStoreMigrations(mg)
}
}
addEntityEventsTableMigration(mg)
addPublicDashboardMigration(mg)
@ -98,22 +91,6 @@ func (*OSSMigrations) AddMigration(mg *Migrator) {
addFolderMigrations(mg)
}
func addMigrationLogMigrations(mg *Migrator) {
migrationLogV1 := Table{
Name: "migration_log",
Columns: []*Column{
{Name: "id", Type: DB_BigInt, IsPrimaryKey: true, IsAutoIncrement: true},
{Name: "migration_id", Type: DB_NVarchar, Length: 255},
{Name: "sql", Type: DB_Text},
{Name: "success", Type: DB_Bool},
{Name: "error", Type: DB_Text},
{Name: "timestamp", Type: DB_DateTime},
},
}
mg.AddMigration("create migration_log table", NewAddTableMigration(migrationLogV1))
}
func addStarMigrations(mg *Migrator) {
starV1 := Table{
Name: "star",

View File

@ -28,6 +28,7 @@ type Migrator struct {
Cfg *setting.Cfg
isLocked atomic.Bool
logMap map[string]MigrationLog
tableName string
}
type MigrationLog struct {
@ -40,16 +41,44 @@ type MigrationLog struct {
}
func NewMigrator(engine *xorm.Engine, cfg *setting.Cfg) *Migrator {
mg := &Migrator{}
mg.DBEngine = engine
mg.Logger = log.New("migrator")
mg.migrations = make([]Migration, 0)
mg.migrationIds = make(map[string]struct{})
mg.Dialect = NewDialect(mg.DBEngine)
mg.Cfg = cfg
return NewScopedMigrator(engine, cfg, "")
}
// NewScopedMigrator should only be used for the transition to a new storage engine
func NewScopedMigrator(engine *xorm.Engine, cfg *setting.Cfg, scope string) *Migrator {
mg := &Migrator{
Cfg: cfg,
DBEngine: engine,
migrations: make([]Migration, 0),
migrationIds: make(map[string]struct{}),
Dialect: NewDialect(engine),
}
if scope == "" {
mg.tableName = "migration_log"
mg.Logger = log.New("migrator")
} else {
mg.tableName = scope + "_migration_log"
mg.Logger = log.New(scope + " migrator")
}
return mg
}
// AddCreateMigration adds the initial migration log table -- this should likely be
// automatic and first, but enough tests exists that do not expect that we can keep it explicit
func (mg *Migrator) AddCreateMigration() {
mg.AddMigration("create "+mg.tableName+" table", NewAddTableMigration(Table{
Name: mg.tableName,
Columns: []*Column{
{Name: "id", Type: DB_BigInt, IsPrimaryKey: true, IsAutoIncrement: true},
{Name: "migration_id", Type: DB_NVarchar, Length: 255},
{Name: "sql", Type: DB_Text},
{Name: "success", Type: DB_Bool},
{Name: "error", Type: DB_Text},
{Name: "timestamp", Type: DB_DateTime},
},
}))
}
func (mg *Migrator) MigrationsCount() int {
return len(mg.migrations)
}
@ -79,7 +108,7 @@ func (mg *Migrator) GetMigrationLog() (map[string]MigrationLog, error) {
logMap := make(map[string]MigrationLog)
logItems := make([]MigrationLog, 0)
exists, err := mg.DBEngine.IsTableExist(new(MigrationLog))
exists, err := mg.DBEngine.IsTableExist(mg.tableName)
if err != nil {
return nil, fmt.Errorf("%v: %w", "failed to check table existence", err)
}
@ -87,7 +116,7 @@ func (mg *Migrator) GetMigrationLog() (map[string]MigrationLog, error) {
return logMap, nil
}
if err = mg.DBEngine.Find(&logItems); err != nil {
if err = mg.DBEngine.Table(mg.tableName).Find(&logItems); err != nil {
return nil, err
}
@ -167,7 +196,7 @@ func (mg *Migrator) run() (err error) {
mg.Logger.Error("Exec failed", "error", err, "sql", sql)
record.Error = err.Error()
if !m.SkipMigrationLog() {
if _, err := sess.Insert(&record); err != nil {
if _, err := sess.Table(mg.tableName).Insert(&record); err != nil {
return err
}
}
@ -175,7 +204,7 @@ func (mg *Migrator) run() (err error) {
}
record.Success = true
if !m.SkipMigrationLog() {
_, err = sess.Insert(&record)
_, err = sess.Table(mg.tableName).Insert(&record)
}
if err == nil {
migrationsPerformed++

View File

@ -479,6 +479,10 @@ func (ss *SQLStore) readConfig() error {
return nil
}
func (ss *SQLStore) GetMigrationLockAttemptTimeout() int {
return ss.dbCfg.MigrationLockAttemptTimeout
}
func (ss *SQLStore) RecursiveQueriesAreSupported() (bool, error) {
if ss.recursiveQueriesAreSupported != nil {
return *ss.recursiveQueriesAreSupported, nil

View File

@ -2,10 +2,8 @@ package migrations
import (
"fmt"
"strings"
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
"github.com/grafana/grafana/pkg/setting"
)
func getLatinPathColumn(name string) *migrator.Column {
@ -18,7 +16,7 @@ func getLatinPathColumn(name string) *migrator.Column {
}
}
func addEntityStoreMigrations(mg *migrator.Migrator) {
func initEntityTables(mg *migrator.Migrator) {
grnLength := 256 // len(tenant)~8 + len(kind)!16 + len(kind)~128 = 256
tables := []migrator.Table{}
tables = append(tables, migrator.Table{
@ -182,38 +180,16 @@ func addEntityStoreMigrations(mg *migrator.Migrator) {
},
})
// !!! This should not run in production!
// The object store SQL schema is still in active development and this
// will only be called when the feature toggle is enabled
// this check should not be necessary, but is added as an extra check
if setting.Env == setting.Prod {
return
}
// Migration cleanups: given that this is a complex setup
// that requires a lot of testing before we are ready to push out of dev
// this script lets us easy wipe previous changes and initialize clean tables
suffix := " (v010)" // change this when we want to wipe and reset the object tables
mg.AddMigration("EntityStore init: cleanup"+suffix, migrator.NewRawSQLMigration(strings.TrimSpace(`
DELETE FROM migration_log WHERE migration_id LIKE 'EntityStore init%';
`)))
// for a while this was called "ObjectStore"... this can be removed before we remove the dev only flags
mg.AddMigration("EntityStore init: object cleanup"+suffix, migrator.NewRawSQLMigration(strings.TrimSpace(`
DELETE FROM migration_log WHERE migration_id LIKE 'ObjectStore init%';
`)))
// Initialize all tables
for t := range tables {
mg.AddMigration("EntityStore init: drop "+tables[t].Name+suffix, migrator.NewRawSQLMigration(
fmt.Sprintf("DROP TABLE IF EXISTS %s", tables[t].Name),
))
mg.AddMigration("EntityStore init: table "+tables[t].Name+suffix, migrator.NewAddTableMigration(tables[t]))
mg.AddMigration("drop table "+tables[t].Name, migrator.NewDropTableMigration(tables[t].Name))
mg.AddMigration("create table "+tables[t].Name, migrator.NewAddTableMigration(tables[t]))
for i := range tables[t].Indices {
mg.AddMigration(fmt.Sprintf("EntityStore init: index %s[%d]"+suffix, tables[t].Name, i), migrator.NewAddIndexMigration(tables[t], tables[t].Indices[i]))
mg.AddMigration(fmt.Sprintf("create table %s, index: %d", tables[t].Name, i), migrator.NewAddIndexMigration(tables[t], tables[t].Indices[i]))
}
}
mg.AddMigration("EntityStore init: set path collation in entity tables"+suffix, migrator.NewRawSQLMigration("").
mg.AddMigration("set path collation on entity table", migrator.NewRawSQLMigration("").
// MySQL `utf8mb4_unicode_ci` collation is set in `mysql_dialect.go`
// SQLite uses a `BINARY` collation by default
Postgres("ALTER TABLE entity_folder ALTER COLUMN slug_path TYPE VARCHAR(1024) COLLATE \"C\";")) // Collate C - sorting done based on character code byte values

View File

@ -0,0 +1,72 @@
package migrations
import (
"context"
"fmt"
"github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/services/sqlstore"
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
"github.com/grafana/grafana/pkg/services/sqlstore/session"
"github.com/grafana/grafana/pkg/setting"
)
func MigrateEntityStore(xdb db.DB, features featuremgmt.FeatureToggles) error {
// Skip if feature flag is not enabled
if !features.IsEnabled(featuremgmt.FlagEntityStore) {
return nil
}
// Migrations depend on upstream xorm implementations
sql, ok := xdb.(*sqlstore.SQLStore)
if !ok {
return nil
}
// !!! This should not run in production!
// The object store SQL schema is still in active development and this
// will only be called when the feature toggle is enabled
// this check should not be necessary, but is added as an extra check
if setting.Env == setting.Prod {
return nil
}
marker := "Initialize entity tables (v0)" // changing this key wipe+rewrite everything
mg := migrator.NewScopedMigrator(sql.GetEngine(), sql.Cfg, "entity")
mg.AddCreateMigration()
mg.AddMigration(marker, &migrator.RawSQLMigration{})
initEntityTables(mg)
// While this feature is under development, we can completly wipe and recreate
// The initial plan is to keep the source of truth in existing SQL tables, and mirrot it
// to a kubernetes model. Once the kubernetes model needs to be preserved,
// this code should be removed
log, err := mg.GetMigrationLog()
if err != nil {
return err
}
_, found := log[marker]
if !found && len(log) > 0 {
// Remove the migration log (and potential other orphan tables)
tables := []string{"entity_migration_log"}
ctx := context.Background()
err = sql.GetSqlxSession().WithTransaction(ctx, func(tx *session.SessionTx) error {
for _, t := range tables {
_, err := tx.Exec(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s", t))
if err != nil {
return err
}
}
return nil
})
if err != nil {
return err
}
}
return mg.Start(
features.IsEnabled(featuremgmt.FlagMigrationLocking),
sql.GetMigrationLockAttemptTimeout())
}

View File

@ -19,6 +19,7 @@ import (
"github.com/grafana/grafana/pkg/services/org"
"github.com/grafana/grafana/pkg/services/quota"
"github.com/grafana/grafana/pkg/services/sqlstore"
"github.com/grafana/grafana/pkg/services/store/entity/migrations"
"github.com/grafana/grafana/pkg/services/user"
"github.com/grafana/grafana/pkg/setting"
)
@ -103,6 +104,10 @@ func ProvideService(
grafanaStorageLogger.Warn("error loading storage config", "error", err)
}
if err := migrations.MigrateEntityStore(sql, features); err != nil {
return nil, err
}
// always exists
globalRoots := []storageRuntime{
newDiskStorage(RootStorageMeta{