From cdad7125471a39eb2c6bea99ba2230b4fdb4c070 Mon Sep 17 00:00:00 2001 From: Matthew Jacobson Date: Thu, 30 Nov 2023 10:25:59 -0500 Subject: [PATCH] Alerting: Keep track of individual org migration status (#78369) * Alerting: Keep track of individual org migration status Save migration status per migrated org. Change the meaning (and key/value) of the org_id=0 entry to store the current (previous) config value used by alerting. This is so we can know when to upgrade/downgrade by comparing with the new config value in UnifiedAlerting.IsEnabled. --- .../ngalert/migration/migration_test.go | 41 +++-- pkg/services/ngalert/migration/service.go | 145 ++++++++++----- .../ngalert/migration/service_test.go | 166 +++++++++++------ .../ngalert/migration/store/database.go | 168 ++++++++++++------ .../ngalert/migration/store/testing.go | 7 +- .../sqlstore/migrations/migrations.go | 2 + .../ualert/org_upgrade_state_mig.go | 102 +++++++++++ 7 files changed, 452 insertions(+), 179 deletions(-) create mode 100644 pkg/services/sqlstore/migrations/ualert/org_upgrade_state_mig.go diff --git a/pkg/services/ngalert/migration/migration_test.go b/pkg/services/ngalert/migration/migration_test.go index 01c5fb2233d..de45efba4f0 100644 --- a/pkg/services/ngalert/migration/migration_test.go +++ b/pkg/services/ngalert/migration/migration_test.go @@ -33,11 +33,11 @@ import ( // TestServiceStart tests the wrapper method that decides when to run the migration based on migration status and settings. 
func TestServiceStart(t *testing.T) { tc := []struct { - name string - config *setting.Cfg - isMigrationRun bool - expectedErr bool - expected bool + name string + config *setting.Cfg + starting migrationStore.AlertingType + expectedErr bool + expected migrationStore.AlertingType }{ { name: "when unified alerting enabled and migration not already run, then run migration", @@ -46,8 +46,8 @@ func TestServiceStart(t *testing.T) { Enabled: pointer(true), }, }, - isMigrationRun: false, - expected: true, + starting: migrationStore.Legacy, + expected: migrationStore.UnifiedAlerting, }, { name: "when unified alerting disabled, migration is already run and force migration is enabled, then revert migration", @@ -57,8 +57,8 @@ func TestServiceStart(t *testing.T) { }, ForceMigration: true, }, - isMigrationRun: true, - expected: false, + starting: migrationStore.UnifiedAlerting, + expected: migrationStore.Legacy, }, { name: "when unified alerting disabled, migration is already run and force migration is disabled, then the migration should panic", @@ -68,9 +68,9 @@ func TestServiceStart(t *testing.T) { }, ForceMigration: false, }, - isMigrationRun: true, - expected: true, - expectedErr: true, + starting: migrationStore.UnifiedAlerting, + expected: migrationStore.UnifiedAlerting, + expectedErr: true, }, { name: "when unified alerting enabled and migration is already run, then do nothing", @@ -79,8 +79,8 @@ func TestServiceStart(t *testing.T) { Enabled: pointer(true), }, }, - isMigrationRun: true, - expected: true, + starting: migrationStore.UnifiedAlerting, + expected: migrationStore.UnifiedAlerting, }, { name: "when unified alerting disabled and migration is not already run, then do nothing", @@ -89,8 +89,8 @@ func TestServiceStart(t *testing.T) { Enabled: pointer(false), }, }, - isMigrationRun: false, - expected: false, + starting: migrationStore.Legacy, + expected: migrationStore.Legacy, }, } @@ -100,19 +100,18 @@ func TestServiceStart(t *testing.T) { ctx := 
context.Background() service := NewTestMigrationService(t, sqlStore, tt.config) - err := service.migrationStore.SetMigrated(ctx, tt.isMigrationRun) - require.NoError(t, err) + require.NoError(t, service.migrationStore.SetCurrentAlertingType(ctx, tt.starting)) - err = service.Run(ctx) + err := service.Run(ctx) if tt.expectedErr { require.Error(t, err) } else { require.NoError(t, err) } - migrated, err := service.migrationStore.IsMigrated(ctx) + aType, err := service.migrationStore.GetCurrentAlertingType(ctx) require.NoError(t, err) - require.Equal(t, tt.expected, migrated) + require.Equal(t, tt.expected, aType) }) } } diff --git a/pkg/services/ngalert/migration/service.go b/pkg/services/ngalert/migration/service.go index 6f3abcc0782..5d8d6d82b05 100644 --- a/pkg/services/ngalert/migration/service.go +++ b/pkg/services/ngalert/migration/service.go @@ -50,58 +50,18 @@ func ProvideService( }, nil } -// Run starts the migration. This will either migrate from legacy alerting to unified alerting or revert the migration. -// If the migration status in the kvstore is not set and unified alerting is enabled, the migration will be executed. -// If the migration status in the kvstore is set and both unified alerting is disabled and ForceMigration is set to true, the migration will be reverted. +// Run starts the migration to transition between legacy alerting and unified alerting based on the current and desired +// alerting type as determined by the kvstore and configuration, respectively. 
func (ms *migrationService) Run(ctx context.Context) error { var errMigration error errLock := ms.lock.LockExecuteAndRelease(ctx, actionName, time.Minute*10, func(ctx context.Context) { ms.log.Info("Starting") errMigration = ms.store.InTransaction(ctx, func(ctx context.Context) error { - migrated, err := ms.migrationStore.IsMigrated(ctx) + currentType, err := ms.migrationStore.GetCurrentAlertingType(ctx) if err != nil { return fmt.Errorf("getting migration status: %w", err) } - if migrated == ms.cfg.UnifiedAlerting.IsEnabled() { - // Nothing to do. - ms.log.Info("No migrations to run") - return nil - } - - if migrated { - // If legacy alerting is also disabled, there is nothing to do - if setting.AlertingEnabled != nil && !*setting.AlertingEnabled { - return nil - } - - // Safeguard to prevent data loss when reverting from UA to LA. - if !ms.cfg.ForceMigration { - return ForceMigrationError - } - - // Revert migration - ms.log.Info("Reverting legacy migration") - err := ms.migrationStore.RevertAllOrgs(ctx) - if err != nil { - return fmt.Errorf("reverting migration: %w", err) - } - ms.log.Info("Legacy migration reverted") - return nil - } - - ms.log.Info("Starting legacy migration") - err = ms.migrateAllOrgs(ctx) - if err != nil { - return fmt.Errorf("executing migration: %w", err) - } - - err = ms.migrationStore.SetMigrated(ctx, true) - if err != nil { - return fmt.Errorf("setting migration status: %w", err) - } - - ms.log.Info("Completed legacy migration") - return nil + return ms.applyTransition(ctx, newTransition(currentType, ms.cfg)) }) }) if errLock != nil { @@ -114,6 +74,89 @@ func (ms *migrationService) Run(ctx context.Context) error { return nil } +// newTransition creates a transition based on the current alerting type and the current configuration. 
+func newTransition(currentType migrationStore.AlertingType, cfg *setting.Cfg) transition { + desiredType := migrationStore.Legacy + if cfg.UnifiedAlerting.IsEnabled() { + desiredType = migrationStore.UnifiedAlerting + } + return transition{ + CurrentType: currentType, + DesiredType: desiredType, + CleanOnDowngrade: cfg.ForceMigration, + } +} + +// transition represents a migration from one alerting type to another. +type transition struct { + CurrentType migrationStore.AlertingType + DesiredType migrationStore.AlertingType + CleanOnDowngrade bool +} + +// isNoChange returns true if the migration is a no-op. +func (t transition) isNoChange() bool { + return t.CurrentType == t.DesiredType +} + +// isUpgrading returns true if the migration is an upgrade from legacy alerting to unified alerting. +func (t transition) isUpgrading() bool { + return t.CurrentType == migrationStore.Legacy && t.DesiredType == migrationStore.UnifiedAlerting +} + +// isDowngrading returns true if the migration is a downgrade from unified alerting to legacy alerting. +func (t transition) isDowngrading() bool { + return t.CurrentType == migrationStore.UnifiedAlerting && t.DesiredType == migrationStore.Legacy +} + +// shouldClean returns true if the migration should delete all unified alerting data. +func (t transition) shouldClean() bool { + return t.isDowngrading() && t.CleanOnDowngrade +} + +// applyTransition applies the transition to the database. +// If the transition is a no-op, nothing will be done. +// If the transition is a downgrade and CleanOnDowngrade is true, all unified alerting data will be deleted. +// If the transition is a downgrade and CleanOnDowngrade is false, an error will be returned. +// If the transition is an upgrade, all orgs will be migrated. 
+func (ms *migrationService) applyTransition(ctx context.Context, t transition) error { + l := ms.log.New( + "CurrentType", t.CurrentType, + "DesiredType", t.DesiredType, + "CleanOnDowngrade", t.CleanOnDowngrade, + ) + if t.isNoChange() { + l.Info("Migration already complete") + return nil + } + + // Safeguard to prevent accidental data loss when reverting from UA to LA. + if t.isDowngrading() && !ms.cfg.ForceMigration { + return ForceMigrationError + } + + if t.shouldClean() { + l.Info("Cleaning up unified alerting data") + if err := ms.migrationStore.RevertAllOrgs(ctx); err != nil { + return fmt.Errorf("cleaning up unified alerting data: %w", err) + } + l.Info("Unified alerting data deleted") + } + + if t.isUpgrading() { + if err := ms.migrateAllOrgs(ctx); err != nil { + return fmt.Errorf("executing migration: %w", err) + } + } + + if err := ms.migrationStore.SetCurrentAlertingType(ctx, t.DesiredType); err != nil { + return fmt.Errorf("setting migration status: %w", err) + } + + l.Info("Completed legacy migration") + return nil +} + // migrateAllOrgs executes the migration for all orgs. 
func (ms *migrationService) migrateAllOrgs(ctx context.Context) error { orgs, err := ms.migrationStore.GetAllOrgs(ctx) @@ -123,6 +166,15 @@ func (ms *migrationService) migrateAllOrgs(ctx context.Context) error { for _, o := range orgs { om := ms.newOrgMigration(o.ID) + migrated, err := ms.migrationStore.IsMigrated(ctx, o.ID) + if err != nil { + return fmt.Errorf("getting migration status for org %d: %w", o.ID, err) + } + if migrated { + om.log.Info("Org already migrated, skipping") + continue + } + if err := om.migrateOrg(ctx); err != nil { return fmt.Errorf("migrate org %d: %w", o.ID, err) } @@ -131,6 +183,11 @@ func (ms *migrationService) migrateAllOrgs(ctx context.Context) error { if err != nil { return fmt.Errorf("set org migration state: %w", err) } + + err = ms.migrationStore.SetMigrated(ctx, o.ID, true) + if err != nil { + return fmt.Errorf("setting migration status: %w", err) + } } return nil } diff --git a/pkg/services/ngalert/migration/service_test.go b/pkg/services/ngalert/migration/service_test.go index 0b4c464e974..978d802b9f4 100644 --- a/pkg/services/ngalert/migration/service_test.go +++ b/pkg/services/ngalert/migration/service_test.go @@ -2,11 +2,15 @@ package migration import ( "context" + "fmt" "testing" "time" "github.com/stretchr/testify/require" + migrationStore "github.com/grafana/grafana/pkg/services/ngalert/migration/store" + "xorm.io/xorm" + "github.com/grafana/grafana/pkg/infra/db" legacymodels "github.com/grafana/grafana/pkg/services/alerting/models" "github.com/grafana/grafana/pkg/services/dashboards" @@ -44,26 +48,23 @@ func TestServiceRevert(t *testing.T) { // Run migration. 
ctx := context.Background() cfg := &setting.Cfg{ - ForceMigration: true, UnifiedAlerting: setting.UnifiedAlertingSettings{ Enabled: pointer(true), }, } service := NewTestMigrationService(t, sqlStore, cfg) - err = service.migrationStore.SetMigrated(ctx, false) + err = service.migrationStore.SetCurrentAlertingType(ctx, migrationStore.Legacy) require.NoError(t, err) - err = service.Run(ctx) - require.NoError(t, err) + require.NoError(t, service.Run(ctx)) // Verify migration was run. - migrated, err := service.migrationStore.IsMigrated(ctx) - require.NoError(t, err) - require.Equal(t, true, migrated) + checkAlertingType(t, ctx, service, migrationStore.UnifiedAlerting) + checkMigrationStatus(t, ctx, service, 1, true) // Currently, we fill in some random data for tables that aren't populated during migration. - _, err = x.Table("ngalert_configuration").Insert(models.AdminConfiguration{}) + _, err = x.Table("ngalert_configuration").Insert(models.AdminConfiguration{OrgID: 1}) require.NoError(t, err) _, err = x.Table("alert_instance").Insert(models.AlertInstance{ AlertInstanceKey: models.AlertInstanceKey{ @@ -79,34 +80,32 @@ func TestServiceRevert(t *testing.T) { require.NoError(t, err) // Verify various UA resources exist - tables := []string{ - "alert_rule", - "alert_rule_version", - "alert_configuration", - "ngalert_configuration", - "alert_instance", + tables := [][2]string{ + {"alert_rule", "org_id"}, + {"alert_rule_version", "rule_org_id"}, + {"alert_configuration", "org_id"}, + {"ngalert_configuration", "org_id"}, + {"alert_instance", "rule_org_id"}, } for _, table := range tables { - count, err := x.Table(table).Count() - require.NoError(t, err) - require.True(t, count > 0, "table %s should have at least one row", table) + count, err := x.Table(table[0]).Where(fmt.Sprintf("%s=?", table[1]), 1).Count() + require.NoErrorf(t, err, "table %s error", table[0]) + require.True(t, count > 0, "table %s should have at least one row", table[0]) } // Revert migration. 
- service.cfg.UnifiedAlerting.Enabled = pointer(false) - err = service.Run(context.Background()) + err = service.migrationStore.RevertAllOrgs(context.Background()) require.NoError(t, err) // Verify revert was run. - migrated, err = service.migrationStore.IsMigrated(ctx) - require.NoError(t, err) - require.Equal(t, false, migrated) + checkAlertingType(t, ctx, service, migrationStore.Legacy) + checkMigrationStatus(t, ctx, service, 1, false) // Verify various UA resources are gone for _, table := range tables { - count, err := x.Table(table).Count() - require.NoError(t, err) - require.Equal(t, int64(0), count, "table %s should have no rows", table) + count, err := x.Table(table[0]).Where(fmt.Sprintf("%s=?", table[1]), 1).Count() + require.NoErrorf(t, err, "table %s error", table[0]) + require.Equal(t, int64(0), count, "table %s should have no rows", table[0]) } }) @@ -125,23 +124,20 @@ func TestServiceRevert(t *testing.T) { // Run migration. ctx := context.Background() cfg := &setting.Cfg{ - ForceMigration: true, UnifiedAlerting: setting.UnifiedAlertingSettings{ Enabled: pointer(true), }, } service := NewTestMigrationService(t, sqlStore, cfg) - err = service.migrationStore.SetMigrated(ctx, false) + err = service.migrationStore.SetCurrentAlertingType(ctx, migrationStore.Legacy) require.NoError(t, err) - err = service.Run(ctx) - require.NoError(t, err) + require.NoError(t, service.Run(ctx)) // Verify migration was run. - migrated, err := service.migrationStore.IsMigrated(ctx) - require.NoError(t, err) - require.Equal(t, true, migrated) + checkAlertingType(t, ctx, service, migrationStore.UnifiedAlerting) + checkMigrationStatus(t, ctx, service, 1, true) // Verify we created some folders. newDashCount, err := x.Table("dashboard").Count(&dashboards.Dashboard{}) @@ -163,14 +159,12 @@ func TestServiceRevert(t *testing.T) { } // Revert migration. 
- service.cfg.UnifiedAlerting.Enabled = pointer(false) - err = service.Run(context.Background()) + err = service.migrationStore.RevertAllOrgs(context.Background()) require.NoError(t, err) - // Verify revert was run. - migrated, err = service.migrationStore.IsMigrated(ctx) - require.NoError(t, err) - require.Equal(t, false, migrated) + // Verify revert was run. Should only set migration status for org. + checkAlertingType(t, ctx, service, migrationStore.Legacy) + checkMigrationStatus(t, ctx, service, 1, false) // Verify we are back to the original count. newDashCount, err = x.Table("dashboard").Count(&dashboards.Dashboard{}) @@ -210,19 +204,17 @@ func TestServiceRevert(t *testing.T) { } service := NewTestMigrationService(t, sqlStore, cfg) - err = service.migrationStore.SetMigrated(ctx, false) + err = service.migrationStore.SetCurrentAlertingType(ctx, migrationStore.Legacy) require.NoError(t, err) - err = service.Run(ctx) - require.NoError(t, err) + require.NoError(t, service.Run(ctx)) // Verify migration was run. - migrated, err := service.migrationStore.IsMigrated(ctx) - require.NoError(t, err) - require.Equal(t, true, migrated) + checkAlertingType(t, ctx, service, migrationStore.UnifiedAlerting) + checkMigrationStatus(t, ctx, service, 1, true) // Verify we created some folders. - newDashCount, err := x.Table("dashboard").Count(&dashboards.Dashboard{}) + newDashCount, err := x.Table("dashboard").Count(&dashboards.Dashboard{OrgID: 1}) require.NoError(t, err) require.Truef(t, newDashCount > dashCount, "newDashCount: %d should be greater than dashCount: %d", newDashCount, dashCount) @@ -257,17 +249,15 @@ func TestServiceRevert(t *testing.T) { require.NotNil(t, newF) // Revert migration. - service.cfg.UnifiedAlerting.Enabled = pointer(false) - err = service.Run(ctx) + err = service.migrationStore.RevertAllOrgs(context.Background()) require.NoError(t, err) - // Verify revert was run. 
- migrated, err = service.migrationStore.IsMigrated(ctx) - require.NoError(t, err) - require.Equal(t, false, migrated) + // Verify revert was run. Should only set migration status for org. + checkAlertingType(t, ctx, service, migrationStore.Legacy) + checkMigrationStatus(t, ctx, service, 1, false) // Verify we are back to the original count + 2. - newDashCount, err = x.Table("dashboard").Count(&dashboards.Dashboard{}) + newDashCount, err = x.Table("dashboard").Count(&dashboards.Dashboard{OrgID: 1}) require.NoError(t, err) require.Equalf(t, dashCount+2, newDashCount, "newDashCount: %d should be equal to dashCount + 2: %d after revert", newDashCount, dashCount) @@ -289,4 +279,76 @@ func TestServiceRevert(t *testing.T) { require.Nil(t, getDashboard(t, x, 1, uid)) } }) + + t.Run("ForceMigration story", func(t *testing.T) { + sqlStore := db.InitTestDB(t) + x := sqlStore.GetEngine() + + setupLegacyAlertsTables(t, x, channels, alerts, folders, dashes) + + ctx := context.Background() + cfg := &setting.Cfg{ + UnifiedAlerting: setting.UnifiedAlertingSettings{ + Enabled: pointer(true), + }, + } + service := NewTestMigrationService(t, sqlStore, cfg) + checkAlertingType(t, ctx, service, migrationStore.Legacy) + checkMigrationStatus(t, ctx, service, 1, false) + checkAlertRulesCount(t, x, 1, 0) + + // Enable UA. + // First run should migrate org. + require.NoError(t, service.Run(ctx)) + checkAlertingType(t, ctx, service, migrationStore.UnifiedAlerting) + checkMigrationStatus(t, ctx, service, 1, true) + checkAlertRulesCount(t, x, 1, 1) + + // Disable UA without ForceMigration. + // This run should throw an error. + service.cfg.UnifiedAlerting.Enabled = pointer(false) + require.ErrorContains(t, service.Run(ctx), ForceMigrationError.Error()) + checkAlertingType(t, ctx, service, migrationStore.UnifiedAlerting) + checkMigrationStatus(t, ctx, service, 1, true) + checkAlertRulesCount(t, x, 1, 1) + + // Disable UA with force flag. + // This run should revert UA data, deleting the migrated alert rules.
+ service.cfg.UnifiedAlerting.Enabled = pointer(false) + service.cfg.ForceMigration = true + require.NoError(t, service.Run(ctx)) + checkAlertingType(t, ctx, service, migrationStore.Legacy) + checkMigrationStatus(t, ctx, service, 1, false) + checkAlertRulesCount(t, x, 1, 0) // Alerts are gone. + + // Add another alert. + _, alertErr := x.Insert(createAlert(t, 1, 1, 2, "alert2", []string{"notifier1"})) + require.NoError(t, alertErr) + + // Enable UA. + // This run should remigrate org, new alert is migrated. + service.cfg.UnifiedAlerting.Enabled = pointer(true) + require.NoError(t, service.Run(ctx)) + checkAlertingType(t, ctx, service, migrationStore.UnifiedAlerting) + checkMigrationStatus(t, ctx, service, 1, true) + checkAlertRulesCount(t, x, 1, 2) // Now we have 2 + }) +} + +func checkMigrationStatus(t *testing.T, ctx context.Context, service *migrationService, orgID int64, expected bool) { + migrated, err := service.migrationStore.IsMigrated(ctx, orgID) + require.NoError(t, err) + require.Equal(t, expected, migrated) +} + +func checkAlertingType(t *testing.T, ctx context.Context, service *migrationService, expected migrationStore.AlertingType) { + aType, err := service.migrationStore.GetCurrentAlertingType(ctx) + require.NoError(t, err) + require.Equal(t, expected, aType) +} + +func checkAlertRulesCount(t *testing.T, x *xorm.Engine, orgID int64, count int) { + cnt, err := x.Table("alert_rule").Where("org_id=?", orgID).Count() + require.NoError(t, err, "table alert_rule error") + require.Equal(t, int(cnt), count, "table alert_rule should have no rows") } diff --git a/pkg/services/ngalert/migration/store/database.go b/pkg/services/ngalert/migration/store/database.go index 2eecc8924ad..97e792adf2e 100644 --- a/pkg/services/ngalert/migration/store/database.go +++ b/pkg/services/ngalert/migration/store/database.go @@ -55,8 +55,10 @@ type Store interface { GetFolder(ctx context.Context, cmd *folder.GetFolderQuery) (*folder.Folder, error) CreateFolder(ctx 
context.Context, cmd *folder.CreateFolderCommand) (*folder.Folder, error) - IsMigrated(ctx context.Context) (bool, error) - SetMigrated(ctx context.Context, migrated bool) error + IsMigrated(ctx context.Context, orgID int64) (bool, error) + SetMigrated(ctx context.Context, orgID int64, migrated bool) error + GetCurrentAlertingType(ctx context.Context) (AlertingType, error) + SetCurrentAlertingType(ctx context.Context, t AlertingType) error GetOrgMigrationState(ctx context.Context, orgID int64) (*migmodels.OrgMigrationState, error) SetOrgMigrationState(ctx context.Context, orgID int64, summary *migmodels.OrgMigrationState) error @@ -122,11 +124,12 @@ const migratedKey = "migrated" // stateKey is the kvstore key used for the OrgMigrationState. const stateKey = "stateKey" -const anyOrg = 0 +// typeKey is the kvstore key used for the current AlertingType. +const typeKey = "currentAlertingType" // IsMigrated returns the migration status from the kvstore. -func (ms *migrationStore) IsMigrated(ctx context.Context) (bool, error) { - kv := kvstore.WithNamespace(ms.kv, anyOrg, KVNamespace) +func (ms *migrationStore) IsMigrated(ctx context.Context, orgID int64) (bool, error) { + kv := kvstore.WithNamespace(ms.kv, orgID, KVNamespace) content, exists, err := kv.Get(ctx, migratedKey) if err != nil { return false, err @@ -140,11 +143,60 @@ func (ms *migrationStore) IsMigrated(ctx context.Context) (bool, error) { } // SetMigrated sets the migration status in the kvstore. -func (ms *migrationStore) SetMigrated(ctx context.Context, migrated bool) error { - kv := kvstore.WithNamespace(ms.kv, anyOrg, KVNamespace) +func (ms *migrationStore) SetMigrated(ctx context.Context, orgID int64, migrated bool) error { + kv := kvstore.WithNamespace(ms.kv, orgID, KVNamespace) return kv.Set(ctx, migratedKey, strconv.FormatBool(migrated)) } +// AlertingType represents the current alerting type of Grafana. 
This is used to detect transitions between +// Legacy and UnifiedAlerting by comparing to the desired type in the configuration. +type AlertingType string + +const ( + Legacy AlertingType = "Legacy" + UnifiedAlerting AlertingType = "UnifiedAlerting" +) + +// typeFromString converts a string to an AlertingType. +func typeFromString(s string) (AlertingType, error) { + switch s { + case "Legacy": + return Legacy, nil + case "UnifiedAlerting": + return UnifiedAlerting, nil + default: + return "", fmt.Errorf("unknown alerting type: %s", s) + } +} + +const anyOrg = 0 + +// GetCurrentAlertingType returns the current AlertingType of Grafana. +func (ms *migrationStore) GetCurrentAlertingType(ctx context.Context) (AlertingType, error) { + kv := kvstore.WithNamespace(ms.kv, anyOrg, KVNamespace) + content, exists, err := kv.Get(ctx, typeKey) + if err != nil { + return "", err + } + + if !exists { + return Legacy, nil + } + + t, err := typeFromString(content) + if err != nil { + return "", err + } + + return t, nil +} + +// SetCurrentAlertingType stores the current AlertingType of Grafana. +func (ms *migrationStore) SetCurrentAlertingType(ctx context.Context, t AlertingType) error { + kv := kvstore.WithNamespace(ms.kv, anyOrg, KVNamespace) + return kv.Set(ctx, typeKey, string(t)) +} + // GetOrgMigrationState returns a summary of a previous migration. func (ms *migrationStore) GetOrgMigrationState(ctx context.Context, orgID int64) (*migmodels.OrgMigrationState, error) { kv := kvstore.WithNamespace(ms.kv, orgID, KVNamespace) @@ -224,64 +276,62 @@ var revertPermissions = []accesscontrol.Permission{ } // RevertAllOrgs reverts the migration, deleting all unified alerting resources such as alert rules, alertmanager configurations, and silence files. -// In addition, it will delete all folders and permissions originally created by this migration, these are stored in the kvstore. 
+// In addition, it will delete all folders and permissions originally created by this migration, as well as the various migration statuses stored +// in kvstore, both org-specific and anyOrg. func (ms *migrationStore) RevertAllOrgs(ctx context.Context) error { - return ms.store.WithTransactionalDbSession(ctx, func(sess *db.Session) error { - if _, err := sess.Exec("DELETE FROM alert_rule"); err != nil { - return err - } - - if _, err := sess.Exec("DELETE FROM alert_rule_version"); err != nil { - return err - } - - orgs, err := ms.GetAllOrgs(ctx) - if err != nil { - return fmt.Errorf("get orgs: %w", err) - } - for _, o := range orgs { - if err := ms.DeleteMigratedFolders(ctx, o.ID); err != nil { - ms.log.Warn("Failed to delete migrated folders", "orgID", o.ID, "err", err) - continue + return ms.store.InTransaction(ctx, func(ctx context.Context) error { + return ms.store.WithDbSession(ctx, func(sess *db.Session) error { + if _, err := sess.Exec("DELETE FROM alert_rule"); err != nil { + return err } - } - if _, err := sess.Exec("DELETE FROM alert_configuration"); err != nil { - return err - } - - if _, err := sess.Exec("DELETE FROM ngalert_configuration"); err != nil { - return err - } - - if _, err := sess.Exec("DELETE FROM alert_instance"); err != nil { - return err - } - - if _, err := sess.Exec("DELETE FROM kv_store WHERE namespace = ?", notifier.KVNamespace); err != nil { - return err - } - - if _, err := sess.Exec("DELETE FROM kv_store WHERE namespace = ?", KVNamespace); err != nil { - return err - } - - files, err := filepath.Glob(filepath.Join(ms.cfg.DataPath, "alerting", "*", "silences")) - if err != nil { - return err - } - for _, f := range files { - if err := os.Remove(f); err != nil { - ms.log.Error("Failed to remove silence file", "file", f, "err", err) + if _, err := sess.Exec("DELETE FROM alert_rule_version"); err != nil { + return err } - } - err = ms.SetMigrated(ctx, false) - if err != nil { - return fmt.Errorf("setting migration status: %w", err) - 
} + orgs, err := ms.GetAllOrgs(ctx) + if err != nil { + return fmt.Errorf("get orgs: %w", err) + } + for _, o := range orgs { + if err := ms.DeleteMigratedFolders(ctx, o.ID); err != nil { + ms.log.Warn("Failed to delete migrated folders", "orgID", o.ID, "err", err) + continue + } + } - return nil + if _, err := sess.Exec("DELETE FROM alert_configuration"); err != nil { + return err + } + + if _, err := sess.Exec("DELETE FROM ngalert_configuration"); err != nil { + return err + } + + if _, err := sess.Exec("DELETE FROM alert_instance"); err != nil { + return err + } + + if _, err := sess.Exec("DELETE FROM kv_store WHERE namespace = ?", notifier.KVNamespace); err != nil { + return err + } + + if _, err := sess.Exec("DELETE FROM kv_store WHERE namespace = ?", KVNamespace); err != nil { + return err + } + + files, err := filepath.Glob(filepath.Join(ms.cfg.DataPath, "alerting", "*", "silences")) + if err != nil { + return err + } + for _, f := range files { + if err := os.Remove(f); err != nil { + ms.log.Error("Failed to remove silence file", "file", f, "err", err) + } + } + + return nil + }) }) } diff --git a/pkg/services/ngalert/migration/store/testing.go b/pkg/services/ngalert/migration/store/testing.go index ae8612fb685..cea8fefa4d7 100644 --- a/pkg/services/ngalert/migration/store/testing.go +++ b/pkg/services/ngalert/migration/store/testing.go @@ -9,6 +9,7 @@ import ( "github.com/grafana/grafana/pkg/api/routing" "github.com/grafana/grafana/pkg/bus" + "github.com/grafana/grafana/pkg/infra/kvstore" "github.com/grafana/grafana/pkg/infra/localcache" "github.com/grafana/grafana/pkg/infra/log/logtest" "github.com/grafana/grafana/pkg/infra/tracing" @@ -25,7 +26,6 @@ import ( "github.com/grafana/grafana/pkg/services/guardian" "github.com/grafana/grafana/pkg/services/licensing/licensingtest" "github.com/grafana/grafana/pkg/services/ngalert/store" - "github.com/grafana/grafana/pkg/services/ngalert/tests/fakes" "github.com/grafana/grafana/pkg/services/org/orgimpl" 
"github.com/grafana/grafana/pkg/services/quota/quotatest" "github.com/grafana/grafana/pkg/services/sqlstore" @@ -36,7 +36,7 @@ import ( "github.com/grafana/grafana/pkg/setting" ) -func NewTestMigrationStore(t *testing.T, sqlStore *sqlstore.SQLStore, cfg *setting.Cfg) *migrationStore { +func NewTestMigrationStore(t testing.TB, sqlStore *sqlstore.SQLStore, cfg *setting.Cfg) *migrationStore { if cfg.UnifiedAlerting.BaseInterval == 0 { cfg.UnifiedAlerting.BaseInterval = time.Second * 10 } @@ -45,6 +45,7 @@ func NewTestMigrationStore(t *testing.T, sqlStore *sqlstore.SQLStore, cfg *setti alertingStore := store.DBstore{ SQLStore: sqlStore, Cfg: cfg.UnifiedAlerting, + Logger: &logtest.Fake{}, } bus := bus.ProvideBus(tracing.InitializeTracerForTest()) folderStore := folderimpl.ProvideDashboardFolderStore(sqlStore) @@ -90,7 +91,7 @@ func NewTestMigrationStore(t *testing.T, sqlStore *sqlstore.SQLStore, cfg *setti log: &logtest.Fake{}, cfg: cfg, store: sqlStore, - kv: fakes.NewFakeKVStore(t), + kv: kvstore.ProvideService(sqlStore), alertingStore: &alertingStore, dashboardService: dashboardService, folderService: folderService, diff --git a/pkg/services/sqlstore/migrations/migrations.go b/pkg/services/sqlstore/migrations/migrations.go index eb8a220e42e..50968f6c378 100644 --- a/pkg/services/sqlstore/migrations/migrations.go +++ b/pkg/services/sqlstore/migrations/migrations.go @@ -111,6 +111,8 @@ func (*OSSMigrations) AddMigration(mg *Migrator) { dashboardFolderMigrations.AddDashboardFolderMigrations(mg) ssosettings.AddMigration(mg) + + ualert.CreateOrgMigratedKVStoreEntries(mg) } func addStarMigrations(mg *Migrator) { diff --git a/pkg/services/sqlstore/migrations/ualert/org_upgrade_state_mig.go b/pkg/services/sqlstore/migrations/ualert/org_upgrade_state_mig.go new file mode 100644 index 00000000000..5573888be03 --- /dev/null +++ b/pkg/services/sqlstore/migrations/ualert/org_upgrade_state_mig.go @@ -0,0 +1,102 @@ +package ualert + +import ( + "fmt" + + "xorm.io/xorm" + + 
"github.com/grafana/grafana/pkg/services/sqlstore/migrator" +) + +// typeKey is a vendored migration.typeKey. +var typeKey = "currentAlertingType" + +// AlertingType is a vendored migration.store.AlertingType. +type alertingType string + +const ( + Legacy alertingType = "Legacy" + UnifiedAlerting alertingType = "UnifiedAlerting" +) + +// CreateOrgMigratedKVStoreEntries creates kv store entries for each organization if the migration has been run. +// This is needed now that we've changed the semantics of data loss when upgrading / rolling back. If a user who previously +// upgraded were to rollback and upgrade again without clean_upgrade, then since they don't have org-level migrated states +// it will attempt to upgrade their orgs as if they had never upgraded before. This will almost definitely fail with +// duplicate key errors. +// +// In addition, this changes the entry for orgId=0 to be better named as it no longer tracks whether the +// migration has been run, but rather the current alerting type of Grafana; Legacy or UnifiedAlerting. This is used to +// detect transitions between Legacy and UnifiedAlerting by comparing to the desired type in the configuration. +func CreateOrgMigratedKVStoreEntries(mg *migrator.Migrator) { + mg.AddMigration("copy kvstore migration status to each org", &createOrgMigratedKVStoreEntries{}) +} + +type createOrgMigratedKVStoreEntries struct { + migrator.MigrationBase +} + +func (c createOrgMigratedKVStoreEntries) SQL(migrator.Dialect) string { + return codeMigration +} + +func (c createOrgMigratedKVStoreEntries) Exec(sess *xorm.Session, mg *migrator.Migrator) error { + var anyOrg int64 = 0 + migrated := kvStoreV1Entry{ + OrgID: &anyOrg, + Namespace: &KVNamespace, + Key: &migratedKey, + } + has, err := sess.Table("kv_store").Get(&migrated) + if err != nil { + return err + } + + if !has { + mg.Logger.Debug("No migrated status in kvstore, nothing to set") + return nil + } + + // Rename old entry key and value. 
+ val := Legacy + if migrated.Value == "true" { + val = UnifiedAlerting + } + if _, err := sess.Table("kv_store").Where("id = ?", migrated.ID).Update(&kvStoreV1Entry{ + Key: &typeKey, + Value: string(val), + }); err != nil { + mg.Logger.Error("failed to rename org migrated status in kvstore", "err", err) + return err + } + + var orgs []struct { + ID int64 `xorm:"id"` + } + if err := sess.SQL("select id from org").Find(&orgs); err != nil { + return err + } + + if len(orgs) == 0 { + mg.Logger.Debug("no orgs, nothing to set in kvstore") + return nil + } + + for _, org := range orgs { + id := org.ID + entry := kvStoreV1Entry{ + OrgID: &id, + Namespace: &KVNamespace, + Key: &migratedKey, + Value: migrated.Value, + Created: migrated.Created, + Updated: migrated.Updated, + } + if _, errCreate := sess.Table("kv_store").Insert(&entry); errCreate != nil { + mg.Logger.Error("failed to insert org migration status to kvstore", "err", errCreate) + return fmt.Errorf("failed to insert org migration status to kvstore: %w", errCreate) + } + } + + return nil +}