mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Alerting: Keep track of individual org migration status (#78369)
* Alerting: Keep track of individual org migration status Save migration status per migrated org. Change the meaning (and key/value) of the org_id=0 entry to store the current (previous) config value used by alerting. This is so we can know when to upgrade/downgrade by comparing with the new config value in UnifiedAlerting.IsEnabled.
This commit is contained in:
parent
329d0e79ec
commit
cdad712547
@ -33,11 +33,11 @@ import (
|
||||
// TestServiceStart tests the wrapper method that decides when to run the migration based on migration status and settings.
|
||||
func TestServiceStart(t *testing.T) {
|
||||
tc := []struct {
|
||||
name string
|
||||
config *setting.Cfg
|
||||
isMigrationRun bool
|
||||
expectedErr bool
|
||||
expected bool
|
||||
name string
|
||||
config *setting.Cfg
|
||||
starting migrationStore.AlertingType
|
||||
expectedErr bool
|
||||
expected migrationStore.AlertingType
|
||||
}{
|
||||
{
|
||||
name: "when unified alerting enabled and migration not already run, then run migration",
|
||||
@ -46,8 +46,8 @@ func TestServiceStart(t *testing.T) {
|
||||
Enabled: pointer(true),
|
||||
},
|
||||
},
|
||||
isMigrationRun: false,
|
||||
expected: true,
|
||||
starting: migrationStore.Legacy,
|
||||
expected: migrationStore.UnifiedAlerting,
|
||||
},
|
||||
{
|
||||
name: "when unified alerting disabled, migration is already run and force migration is enabled, then revert migration",
|
||||
@ -57,8 +57,8 @@ func TestServiceStart(t *testing.T) {
|
||||
},
|
||||
ForceMigration: true,
|
||||
},
|
||||
isMigrationRun: true,
|
||||
expected: false,
|
||||
starting: migrationStore.UnifiedAlerting,
|
||||
expected: migrationStore.Legacy,
|
||||
},
|
||||
{
|
||||
name: "when unified alerting disabled, migration is already run and force migration is disabled, then the migration should panic",
|
||||
@ -68,9 +68,9 @@ func TestServiceStart(t *testing.T) {
|
||||
},
|
||||
ForceMigration: false,
|
||||
},
|
||||
isMigrationRun: true,
|
||||
expected: true,
|
||||
expectedErr: true,
|
||||
starting: migrationStore.UnifiedAlerting,
|
||||
expected: migrationStore.UnifiedAlerting,
|
||||
expectedErr: true,
|
||||
},
|
||||
{
|
||||
name: "when unified alerting enabled and migration is already run, then do nothing",
|
||||
@ -79,8 +79,8 @@ func TestServiceStart(t *testing.T) {
|
||||
Enabled: pointer(true),
|
||||
},
|
||||
},
|
||||
isMigrationRun: true,
|
||||
expected: true,
|
||||
starting: migrationStore.UnifiedAlerting,
|
||||
expected: migrationStore.UnifiedAlerting,
|
||||
},
|
||||
{
|
||||
name: "when unified alerting disabled and migration is not already run, then do nothing",
|
||||
@ -89,8 +89,8 @@ func TestServiceStart(t *testing.T) {
|
||||
Enabled: pointer(false),
|
||||
},
|
||||
},
|
||||
isMigrationRun: false,
|
||||
expected: false,
|
||||
starting: migrationStore.Legacy,
|
||||
expected: migrationStore.Legacy,
|
||||
},
|
||||
}
|
||||
|
||||
@ -100,19 +100,18 @@ func TestServiceStart(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
service := NewTestMigrationService(t, sqlStore, tt.config)
|
||||
|
||||
err := service.migrationStore.SetMigrated(ctx, tt.isMigrationRun)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, service.migrationStore.SetCurrentAlertingType(ctx, tt.starting))
|
||||
|
||||
err = service.Run(ctx)
|
||||
err := service.Run(ctx)
|
||||
if tt.expectedErr {
|
||||
require.Error(t, err)
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
migrated, err := service.migrationStore.IsMigrated(ctx)
|
||||
aType, err := service.migrationStore.GetCurrentAlertingType(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tt.expected, migrated)
|
||||
require.Equal(t, tt.expected, aType)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -50,58 +50,18 @@ func ProvideService(
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Run starts the migration. This will either migrate from legacy alerting to unified alerting or revert the migration.
|
||||
// If the migration status in the kvstore is not set and unified alerting is enabled, the migration will be executed.
|
||||
// If the migration status in the kvstore is set and both unified alerting is disabled and ForceMigration is set to true, the migration will be reverted.
|
||||
// Run starts the migration to transition between legacy alerting and unified alerting based on the current and desired
|
||||
// alerting type as determined by the kvstore and configuration, respectively.
|
||||
func (ms *migrationService) Run(ctx context.Context) error {
|
||||
var errMigration error
|
||||
errLock := ms.lock.LockExecuteAndRelease(ctx, actionName, time.Minute*10, func(ctx context.Context) {
|
||||
ms.log.Info("Starting")
|
||||
errMigration = ms.store.InTransaction(ctx, func(ctx context.Context) error {
|
||||
migrated, err := ms.migrationStore.IsMigrated(ctx)
|
||||
currentType, err := ms.migrationStore.GetCurrentAlertingType(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("getting migration status: %w", err)
|
||||
}
|
||||
if migrated == ms.cfg.UnifiedAlerting.IsEnabled() {
|
||||
// Nothing to do.
|
||||
ms.log.Info("No migrations to run")
|
||||
return nil
|
||||
}
|
||||
|
||||
if migrated {
|
||||
// If legacy alerting is also disabled, there is nothing to do
|
||||
if setting.AlertingEnabled != nil && !*setting.AlertingEnabled {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Safeguard to prevent data loss when reverting from UA to LA.
|
||||
if !ms.cfg.ForceMigration {
|
||||
return ForceMigrationError
|
||||
}
|
||||
|
||||
// Revert migration
|
||||
ms.log.Info("Reverting legacy migration")
|
||||
err := ms.migrationStore.RevertAllOrgs(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("reverting migration: %w", err)
|
||||
}
|
||||
ms.log.Info("Legacy migration reverted")
|
||||
return nil
|
||||
}
|
||||
|
||||
ms.log.Info("Starting legacy migration")
|
||||
err = ms.migrateAllOrgs(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("executing migration: %w", err)
|
||||
}
|
||||
|
||||
err = ms.migrationStore.SetMigrated(ctx, true)
|
||||
if err != nil {
|
||||
return fmt.Errorf("setting migration status: %w", err)
|
||||
}
|
||||
|
||||
ms.log.Info("Completed legacy migration")
|
||||
return nil
|
||||
return ms.applyTransition(ctx, newTransition(currentType, ms.cfg))
|
||||
})
|
||||
})
|
||||
if errLock != nil {
|
||||
@ -114,6 +74,89 @@ func (ms *migrationService) Run(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// newTransition creates a transition based on the current alerting type and the current configuration.
|
||||
func newTransition(currentType migrationStore.AlertingType, cfg *setting.Cfg) transition {
|
||||
desiredType := migrationStore.Legacy
|
||||
if cfg.UnifiedAlerting.IsEnabled() {
|
||||
desiredType = migrationStore.UnifiedAlerting
|
||||
}
|
||||
return transition{
|
||||
CurrentType: currentType,
|
||||
DesiredType: desiredType,
|
||||
CleanOnDowngrade: cfg.ForceMigration,
|
||||
}
|
||||
}
|
||||
|
||||
// transition represents a migration from one alerting type to another.
|
||||
type transition struct {
|
||||
CurrentType migrationStore.AlertingType
|
||||
DesiredType migrationStore.AlertingType
|
||||
CleanOnDowngrade bool
|
||||
}
|
||||
|
||||
// isNoChange returns true if the migration is a no-op.
|
||||
func (t transition) isNoChange() bool {
|
||||
return t.CurrentType == t.DesiredType
|
||||
}
|
||||
|
||||
// isUpgrading returns true if the migration is an upgrade from legacy alerting to unified alerting.
|
||||
func (t transition) isUpgrading() bool {
|
||||
return t.CurrentType == migrationStore.Legacy && t.DesiredType == migrationStore.UnifiedAlerting
|
||||
}
|
||||
|
||||
// isDowngrading returns true if the migration is a downgrade from unified alerting to legacy alerting.
|
||||
func (t transition) isDowngrading() bool {
|
||||
return t.CurrentType == migrationStore.UnifiedAlerting && t.DesiredType == migrationStore.Legacy
|
||||
}
|
||||
|
||||
// shouldClean returns true if the migration should delete all unified alerting data.
|
||||
func (t transition) shouldClean() bool {
|
||||
return t.isDowngrading() && t.CleanOnDowngrade
|
||||
}
|
||||
|
||||
// applyTransition applies the transition to the database.
|
||||
// If the transition is a no-op, nothing will be done.
|
||||
// If the transition is a downgrade and CleanOnDowngrade is true, all unified alerting data will be deleted.
|
||||
// If the transition is a downgrade and CleanOnDowngrade is false, an error will be returned.
|
||||
// If the transition is an upgrade, all orgs will be migrated.
|
||||
func (ms *migrationService) applyTransition(ctx context.Context, t transition) error {
|
||||
l := ms.log.New(
|
||||
"CurrentType", t.CurrentType,
|
||||
"DesiredType", t.DesiredType,
|
||||
"CleanOnDowngrade", t.CleanOnDowngrade,
|
||||
)
|
||||
if t.isNoChange() {
|
||||
l.Info("Migration already complete")
|
||||
return nil
|
||||
}
|
||||
|
||||
// Safeguard to prevent accidental data loss when reverting from UA to LA.
|
||||
if t.isDowngrading() && !ms.cfg.ForceMigration {
|
||||
return ForceMigrationError
|
||||
}
|
||||
|
||||
if t.shouldClean() {
|
||||
l.Info("Cleaning up unified alerting data")
|
||||
if err := ms.migrationStore.RevertAllOrgs(ctx); err != nil {
|
||||
return fmt.Errorf("cleaning up unified alerting data: %w", err)
|
||||
}
|
||||
l.Info("Unified alerting data deleted")
|
||||
}
|
||||
|
||||
if t.isUpgrading() {
|
||||
if err := ms.migrateAllOrgs(ctx); err != nil {
|
||||
return fmt.Errorf("executing migration: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := ms.migrationStore.SetCurrentAlertingType(ctx, t.DesiredType); err != nil {
|
||||
return fmt.Errorf("setting migration status: %w", err)
|
||||
}
|
||||
|
||||
l.Info("Completed legacy migration")
|
||||
return nil
|
||||
}
|
||||
|
||||
// migrateAllOrgs executes the migration for all orgs.
|
||||
func (ms *migrationService) migrateAllOrgs(ctx context.Context) error {
|
||||
orgs, err := ms.migrationStore.GetAllOrgs(ctx)
|
||||
@ -123,6 +166,15 @@ func (ms *migrationService) migrateAllOrgs(ctx context.Context) error {
|
||||
|
||||
for _, o := range orgs {
|
||||
om := ms.newOrgMigration(o.ID)
|
||||
migrated, err := ms.migrationStore.IsMigrated(ctx, o.ID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("getting migration status for org %d: %w", o.ID, err)
|
||||
}
|
||||
if migrated {
|
||||
om.log.Info("Org already migrated, skipping")
|
||||
continue
|
||||
}
|
||||
|
||||
if err := om.migrateOrg(ctx); err != nil {
|
||||
return fmt.Errorf("migrate org %d: %w", o.ID, err)
|
||||
}
|
||||
@ -131,6 +183,11 @@ func (ms *migrationService) migrateAllOrgs(ctx context.Context) error {
|
||||
if err != nil {
|
||||
return fmt.Errorf("set org migration state: %w", err)
|
||||
}
|
||||
|
||||
err = ms.migrationStore.SetMigrated(ctx, o.ID, true)
|
||||
if err != nil {
|
||||
return fmt.Errorf("setting migration status: %w", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -2,11 +2,15 @@ package migration
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
migrationStore "github.com/grafana/grafana/pkg/services/ngalert/migration/store"
|
||||
"xorm.io/xorm"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/db"
|
||||
legacymodels "github.com/grafana/grafana/pkg/services/alerting/models"
|
||||
"github.com/grafana/grafana/pkg/services/dashboards"
|
||||
@ -44,26 +48,23 @@ func TestServiceRevert(t *testing.T) {
|
||||
// Run migration.
|
||||
ctx := context.Background()
|
||||
cfg := &setting.Cfg{
|
||||
ForceMigration: true,
|
||||
UnifiedAlerting: setting.UnifiedAlertingSettings{
|
||||
Enabled: pointer(true),
|
||||
},
|
||||
}
|
||||
service := NewTestMigrationService(t, sqlStore, cfg)
|
||||
|
||||
err = service.migrationStore.SetMigrated(ctx, false)
|
||||
err = service.migrationStore.SetCurrentAlertingType(ctx, migrationStore.Legacy)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = service.Run(ctx)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, service.Run(ctx))
|
||||
|
||||
// Verify migration was run.
|
||||
migrated, err := service.migrationStore.IsMigrated(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, true, migrated)
|
||||
checkAlertingType(t, ctx, service, migrationStore.UnifiedAlerting)
|
||||
checkMigrationStatus(t, ctx, service, 1, true)
|
||||
|
||||
// Currently, we fill in some random data for tables that aren't populated during migration.
|
||||
_, err = x.Table("ngalert_configuration").Insert(models.AdminConfiguration{})
|
||||
_, err = x.Table("ngalert_configuration").Insert(models.AdminConfiguration{OrgID: 1})
|
||||
require.NoError(t, err)
|
||||
_, err = x.Table("alert_instance").Insert(models.AlertInstance{
|
||||
AlertInstanceKey: models.AlertInstanceKey{
|
||||
@ -79,34 +80,32 @@ func TestServiceRevert(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
|
||||
// Verify various UA resources exist
|
||||
tables := []string{
|
||||
"alert_rule",
|
||||
"alert_rule_version",
|
||||
"alert_configuration",
|
||||
"ngalert_configuration",
|
||||
"alert_instance",
|
||||
tables := [][2]string{
|
||||
{"alert_rule", "org_id"},
|
||||
{"alert_rule_version", "rule_org_id"},
|
||||
{"alert_configuration", "org_id"},
|
||||
{"ngalert_configuration", "org_id"},
|
||||
{"alert_instance", "rule_org_id"},
|
||||
}
|
||||
for _, table := range tables {
|
||||
count, err := x.Table(table).Count()
|
||||
require.NoError(t, err)
|
||||
require.True(t, count > 0, "table %s should have at least one row", table)
|
||||
count, err := x.Table(table[0]).Where(fmt.Sprintf("%s=?", table[1]), 1).Count()
|
||||
require.NoErrorf(t, err, "table %s error", table[0])
|
||||
require.True(t, count > 0, "table %s should have at least one row", table[0])
|
||||
}
|
||||
|
||||
// Revert migration.
|
||||
service.cfg.UnifiedAlerting.Enabled = pointer(false)
|
||||
err = service.Run(context.Background())
|
||||
err = service.migrationStore.RevertAllOrgs(context.Background())
|
||||
require.NoError(t, err)
|
||||
|
||||
// Verify revert was run.
|
||||
migrated, err = service.migrationStore.IsMigrated(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, false, migrated)
|
||||
checkAlertingType(t, ctx, service, migrationStore.Legacy)
|
||||
checkMigrationStatus(t, ctx, service, 1, false)
|
||||
|
||||
// Verify various UA resources are gone
|
||||
for _, table := range tables {
|
||||
count, err := x.Table(table).Count()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, int64(0), count, "table %s should have no rows", table)
|
||||
count, err := x.Table(table[0]).Where(fmt.Sprintf("%s=?", table[1]), 1).Count()
|
||||
require.NoErrorf(t, err, "table %s error", table[0])
|
||||
require.Equal(t, int64(0), count, "table %s should have no rows", table[0])
|
||||
}
|
||||
})
|
||||
|
||||
@ -125,23 +124,20 @@ func TestServiceRevert(t *testing.T) {
|
||||
// Run migration.
|
||||
ctx := context.Background()
|
||||
cfg := &setting.Cfg{
|
||||
ForceMigration: true,
|
||||
UnifiedAlerting: setting.UnifiedAlertingSettings{
|
||||
Enabled: pointer(true),
|
||||
},
|
||||
}
|
||||
service := NewTestMigrationService(t, sqlStore, cfg)
|
||||
|
||||
err = service.migrationStore.SetMigrated(ctx, false)
|
||||
err = service.migrationStore.SetCurrentAlertingType(ctx, migrationStore.Legacy)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = service.Run(ctx)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, service.Run(ctx))
|
||||
|
||||
// Verify migration was run.
|
||||
migrated, err := service.migrationStore.IsMigrated(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, true, migrated)
|
||||
checkAlertingType(t, ctx, service, migrationStore.UnifiedAlerting)
|
||||
checkMigrationStatus(t, ctx, service, 1, true)
|
||||
|
||||
// Verify we created some folders.
|
||||
newDashCount, err := x.Table("dashboard").Count(&dashboards.Dashboard{})
|
||||
@ -163,14 +159,12 @@ func TestServiceRevert(t *testing.T) {
|
||||
}
|
||||
|
||||
// Revert migration.
|
||||
service.cfg.UnifiedAlerting.Enabled = pointer(false)
|
||||
err = service.Run(context.Background())
|
||||
err = service.migrationStore.RevertAllOrgs(context.Background())
|
||||
require.NoError(t, err)
|
||||
|
||||
// Verify revert was run.
|
||||
migrated, err = service.migrationStore.IsMigrated(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, false, migrated)
|
||||
// Verify revert was run. Should only set migration status for org.
|
||||
checkAlertingType(t, ctx, service, migrationStore.Legacy)
|
||||
checkMigrationStatus(t, ctx, service, 1, false)
|
||||
|
||||
// Verify we are back to the original count.
|
||||
newDashCount, err = x.Table("dashboard").Count(&dashboards.Dashboard{})
|
||||
@ -210,19 +204,17 @@ func TestServiceRevert(t *testing.T) {
|
||||
}
|
||||
service := NewTestMigrationService(t, sqlStore, cfg)
|
||||
|
||||
err = service.migrationStore.SetMigrated(ctx, false)
|
||||
err = service.migrationStore.SetCurrentAlertingType(ctx, migrationStore.Legacy)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = service.Run(ctx)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, service.Run(ctx))
|
||||
|
||||
// Verify migration was run.
|
||||
migrated, err := service.migrationStore.IsMigrated(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, true, migrated)
|
||||
checkAlertingType(t, ctx, service, migrationStore.UnifiedAlerting)
|
||||
checkMigrationStatus(t, ctx, service, 1, true)
|
||||
|
||||
// Verify we created some folders.
|
||||
newDashCount, err := x.Table("dashboard").Count(&dashboards.Dashboard{})
|
||||
newDashCount, err := x.Table("dashboard").Count(&dashboards.Dashboard{OrgID: 1})
|
||||
require.NoError(t, err)
|
||||
require.Truef(t, newDashCount > dashCount, "newDashCount: %d should be greater than dashCount: %d", newDashCount, dashCount)
|
||||
|
||||
@ -257,17 +249,15 @@ func TestServiceRevert(t *testing.T) {
|
||||
require.NotNil(t, newF)
|
||||
|
||||
// Revert migration.
|
||||
service.cfg.UnifiedAlerting.Enabled = pointer(false)
|
||||
err = service.Run(ctx)
|
||||
err = service.migrationStore.RevertAllOrgs(context.Background())
|
||||
require.NoError(t, err)
|
||||
|
||||
// Verify revert was run.
|
||||
migrated, err = service.migrationStore.IsMigrated(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, false, migrated)
|
||||
// Verify revert was run. Should only set migration status for org.
|
||||
checkAlertingType(t, ctx, service, migrationStore.Legacy)
|
||||
checkMigrationStatus(t, ctx, service, 1, false)
|
||||
|
||||
// Verify we are back to the original count + 2.
|
||||
newDashCount, err = x.Table("dashboard").Count(&dashboards.Dashboard{})
|
||||
newDashCount, err = x.Table("dashboard").Count(&dashboards.Dashboard{OrgID: 1})
|
||||
require.NoError(t, err)
|
||||
require.Equalf(t, dashCount+2, newDashCount, "newDashCount: %d should be equal to dashCount + 2: %d after revert", newDashCount, dashCount)
|
||||
|
||||
@ -289,4 +279,76 @@ func TestServiceRevert(t *testing.T) {
|
||||
require.Nil(t, getDashboard(t, x, 1, uid))
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("ForceMigration story", func(t *testing.T) {
|
||||
sqlStore := db.InitTestDB(t)
|
||||
x := sqlStore.GetEngine()
|
||||
|
||||
setupLegacyAlertsTables(t, x, channels, alerts, folders, dashes)
|
||||
|
||||
ctx := context.Background()
|
||||
cfg := &setting.Cfg{
|
||||
UnifiedAlerting: setting.UnifiedAlertingSettings{
|
||||
Enabled: pointer(true),
|
||||
},
|
||||
}
|
||||
service := NewTestMigrationService(t, sqlStore, cfg)
|
||||
checkAlertingType(t, ctx, service, migrationStore.Legacy)
|
||||
checkMigrationStatus(t, ctx, service, 1, false)
|
||||
checkAlertRulesCount(t, x, 1, 0)
|
||||
|
||||
// Enable UA.
|
||||
// First run should migrate org.
|
||||
require.NoError(t, service.Run(ctx))
|
||||
checkAlertingType(t, ctx, service, migrationStore.UnifiedAlerting)
|
||||
checkMigrationStatus(t, ctx, service, 1, true)
|
||||
checkAlertRulesCount(t, x, 1, 1)
|
||||
|
||||
// Disable UA without ForceMigration.
|
||||
// This run should throw an error.
|
||||
service.cfg.UnifiedAlerting.Enabled = pointer(false)
|
||||
require.ErrorContains(t, service.Run(ctx), ForceMigrationError.Error())
|
||||
checkAlertingType(t, ctx, service, migrationStore.UnifiedAlerting)
|
||||
checkMigrationStatus(t, ctx, service, 1, true)
|
||||
checkAlertRulesCount(t, x, 1, 1)
|
||||
|
||||
// Disable UA with force flag.
|
||||
// This run should not revert UA data.
|
||||
service.cfg.UnifiedAlerting.Enabled = pointer(false)
|
||||
service.cfg.ForceMigration = true
|
||||
require.NoError(t, service.Run(ctx))
|
||||
checkAlertingType(t, ctx, service, migrationStore.Legacy)
|
||||
checkMigrationStatus(t, ctx, service, 1, false)
|
||||
checkAlertRulesCount(t, x, 1, 0) // Alerts are gone.
|
||||
|
||||
// Add another alert.
|
||||
_, alertErr := x.Insert(createAlert(t, 1, 1, 2, "alert2", []string{"notifier1"}))
|
||||
require.NoError(t, alertErr)
|
||||
|
||||
// Enable UA.
|
||||
// This run should remigrate org, new alert is migrated.
|
||||
service.cfg.UnifiedAlerting.Enabled = pointer(true)
|
||||
require.NoError(t, service.Run(ctx))
|
||||
checkAlertingType(t, ctx, service, migrationStore.UnifiedAlerting)
|
||||
checkMigrationStatus(t, ctx, service, 1, true)
|
||||
checkAlertRulesCount(t, x, 1, 2) // Now we have 2
|
||||
})
|
||||
}
|
||||
|
||||
func checkMigrationStatus(t *testing.T, ctx context.Context, service *migrationService, orgID int64, expected bool) {
|
||||
migrated, err := service.migrationStore.IsMigrated(ctx, orgID)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, expected, migrated)
|
||||
}
|
||||
|
||||
func checkAlertingType(t *testing.T, ctx context.Context, service *migrationService, expected migrationStore.AlertingType) {
|
||||
aType, err := service.migrationStore.GetCurrentAlertingType(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, expected, aType)
|
||||
}
|
||||
|
||||
func checkAlertRulesCount(t *testing.T, x *xorm.Engine, orgID int64, count int) {
|
||||
cnt, err := x.Table("alert_rule").Where("org_id=?", orgID).Count()
|
||||
require.NoError(t, err, "table alert_rule error")
|
||||
require.Equal(t, int(cnt), count, "table alert_rule should have no rows")
|
||||
}
|
||||
|
@ -55,8 +55,10 @@ type Store interface {
|
||||
GetFolder(ctx context.Context, cmd *folder.GetFolderQuery) (*folder.Folder, error)
|
||||
CreateFolder(ctx context.Context, cmd *folder.CreateFolderCommand) (*folder.Folder, error)
|
||||
|
||||
IsMigrated(ctx context.Context) (bool, error)
|
||||
SetMigrated(ctx context.Context, migrated bool) error
|
||||
IsMigrated(ctx context.Context, orgID int64) (bool, error)
|
||||
SetMigrated(ctx context.Context, orgID int64, migrated bool) error
|
||||
GetCurrentAlertingType(ctx context.Context) (AlertingType, error)
|
||||
SetCurrentAlertingType(ctx context.Context, t AlertingType) error
|
||||
GetOrgMigrationState(ctx context.Context, orgID int64) (*migmodels.OrgMigrationState, error)
|
||||
SetOrgMigrationState(ctx context.Context, orgID int64, summary *migmodels.OrgMigrationState) error
|
||||
|
||||
@ -122,11 +124,12 @@ const migratedKey = "migrated"
|
||||
// stateKey is the kvstore key used for the OrgMigrationState.
|
||||
const stateKey = "stateKey"
|
||||
|
||||
const anyOrg = 0
|
||||
// typeKey is the kvstore key used for the current AlertingType.
|
||||
const typeKey = "currentAlertingType"
|
||||
|
||||
// IsMigrated returns the migration status from the kvstore.
|
||||
func (ms *migrationStore) IsMigrated(ctx context.Context) (bool, error) {
|
||||
kv := kvstore.WithNamespace(ms.kv, anyOrg, KVNamespace)
|
||||
func (ms *migrationStore) IsMigrated(ctx context.Context, orgID int64) (bool, error) {
|
||||
kv := kvstore.WithNamespace(ms.kv, orgID, KVNamespace)
|
||||
content, exists, err := kv.Get(ctx, migratedKey)
|
||||
if err != nil {
|
||||
return false, err
|
||||
@ -140,11 +143,60 @@ func (ms *migrationStore) IsMigrated(ctx context.Context) (bool, error) {
|
||||
}
|
||||
|
||||
// SetMigrated sets the migration status in the kvstore.
|
||||
func (ms *migrationStore) SetMigrated(ctx context.Context, migrated bool) error {
|
||||
kv := kvstore.WithNamespace(ms.kv, anyOrg, KVNamespace)
|
||||
func (ms *migrationStore) SetMigrated(ctx context.Context, orgID int64, migrated bool) error {
|
||||
kv := kvstore.WithNamespace(ms.kv, orgID, KVNamespace)
|
||||
return kv.Set(ctx, migratedKey, strconv.FormatBool(migrated))
|
||||
}
|
||||
|
||||
// AlertingType represents the current alerting type of Grafana. This is used to detect transitions between
|
||||
// Legacy and UnifiedAlerting by comparing to the desired type in the configuration.
|
||||
type AlertingType string
|
||||
|
||||
const (
|
||||
Legacy AlertingType = "Legacy"
|
||||
UnifiedAlerting AlertingType = "UnifiedAlerting"
|
||||
)
|
||||
|
||||
// typeFromString converts a string to an AlertingType.
|
||||
func typeFromString(s string) (AlertingType, error) {
|
||||
switch s {
|
||||
case "Legacy":
|
||||
return Legacy, nil
|
||||
case "UnifiedAlerting":
|
||||
return UnifiedAlerting, nil
|
||||
default:
|
||||
return "", fmt.Errorf("unknown alerting type: %s", s)
|
||||
}
|
||||
}
|
||||
|
||||
const anyOrg = 0
|
||||
|
||||
// GetCurrentAlertingType returns the current AlertingType of Grafana.
|
||||
func (ms *migrationStore) GetCurrentAlertingType(ctx context.Context) (AlertingType, error) {
|
||||
kv := kvstore.WithNamespace(ms.kv, anyOrg, KVNamespace)
|
||||
content, exists, err := kv.Get(ctx, typeKey)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
if !exists {
|
||||
return Legacy, nil
|
||||
}
|
||||
|
||||
t, err := typeFromString(content)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return t, nil
|
||||
}
|
||||
|
||||
// SetCurrentAlertingType stores the current AlertingType of Grafana.
|
||||
func (ms *migrationStore) SetCurrentAlertingType(ctx context.Context, t AlertingType) error {
|
||||
kv := kvstore.WithNamespace(ms.kv, anyOrg, KVNamespace)
|
||||
return kv.Set(ctx, typeKey, string(t))
|
||||
}
|
||||
|
||||
// GetOrgMigrationState returns a summary of a previous migration.
|
||||
func (ms *migrationStore) GetOrgMigrationState(ctx context.Context, orgID int64) (*migmodels.OrgMigrationState, error) {
|
||||
kv := kvstore.WithNamespace(ms.kv, orgID, KVNamespace)
|
||||
@ -224,64 +276,62 @@ var revertPermissions = []accesscontrol.Permission{
|
||||
}
|
||||
|
||||
// RevertAllOrgs reverts the migration, deleting all unified alerting resources such as alert rules, alertmanager configurations, and silence files.
|
||||
// In addition, it will delete all folders and permissions originally created by this migration, these are stored in the kvstore.
|
||||
// In addition, it will delete all folders and permissions originally created by this migration, as well as the various migration statuses stored
|
||||
// in kvstore, both org-specific and anyOrg.
|
||||
func (ms *migrationStore) RevertAllOrgs(ctx context.Context) error {
|
||||
return ms.store.WithTransactionalDbSession(ctx, func(sess *db.Session) error {
|
||||
if _, err := sess.Exec("DELETE FROM alert_rule"); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err := sess.Exec("DELETE FROM alert_rule_version"); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
orgs, err := ms.GetAllOrgs(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("get orgs: %w", err)
|
||||
}
|
||||
for _, o := range orgs {
|
||||
if err := ms.DeleteMigratedFolders(ctx, o.ID); err != nil {
|
||||
ms.log.Warn("Failed to delete migrated folders", "orgID", o.ID, "err", err)
|
||||
continue
|
||||
return ms.store.InTransaction(ctx, func(ctx context.Context) error {
|
||||
return ms.store.WithDbSession(ctx, func(sess *db.Session) error {
|
||||
if _, err := sess.Exec("DELETE FROM alert_rule"); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if _, err := sess.Exec("DELETE FROM alert_configuration"); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err := sess.Exec("DELETE FROM ngalert_configuration"); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err := sess.Exec("DELETE FROM alert_instance"); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err := sess.Exec("DELETE FROM kv_store WHERE namespace = ?", notifier.KVNamespace); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err := sess.Exec("DELETE FROM kv_store WHERE namespace = ?", KVNamespace); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
files, err := filepath.Glob(filepath.Join(ms.cfg.DataPath, "alerting", "*", "silences"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, f := range files {
|
||||
if err := os.Remove(f); err != nil {
|
||||
ms.log.Error("Failed to remove silence file", "file", f, "err", err)
|
||||
if _, err := sess.Exec("DELETE FROM alert_rule_version"); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
err = ms.SetMigrated(ctx, false)
|
||||
if err != nil {
|
||||
return fmt.Errorf("setting migration status: %w", err)
|
||||
}
|
||||
orgs, err := ms.GetAllOrgs(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("get orgs: %w", err)
|
||||
}
|
||||
for _, o := range orgs {
|
||||
if err := ms.DeleteMigratedFolders(ctx, o.ID); err != nil {
|
||||
ms.log.Warn("Failed to delete migrated folders", "orgID", o.ID, "err", err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
if _, err := sess.Exec("DELETE FROM alert_configuration"); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err := sess.Exec("DELETE FROM ngalert_configuration"); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err := sess.Exec("DELETE FROM alert_instance"); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err := sess.Exec("DELETE FROM kv_store WHERE namespace = ?", notifier.KVNamespace); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err := sess.Exec("DELETE FROM kv_store WHERE namespace = ?", KVNamespace); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
files, err := filepath.Glob(filepath.Join(ms.cfg.DataPath, "alerting", "*", "silences"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, f := range files {
|
||||
if err := os.Remove(f); err != nil {
|
||||
ms.log.Error("Failed to remove silence file", "file", f, "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -9,6 +9,7 @@ import (
|
||||
|
||||
"github.com/grafana/grafana/pkg/api/routing"
|
||||
"github.com/grafana/grafana/pkg/bus"
|
||||
"github.com/grafana/grafana/pkg/infra/kvstore"
|
||||
"github.com/grafana/grafana/pkg/infra/localcache"
|
||||
"github.com/grafana/grafana/pkg/infra/log/logtest"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
@ -25,7 +26,6 @@ import (
|
||||
"github.com/grafana/grafana/pkg/services/guardian"
|
||||
"github.com/grafana/grafana/pkg/services/licensing/licensingtest"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/store"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/tests/fakes"
|
||||
"github.com/grafana/grafana/pkg/services/org/orgimpl"
|
||||
"github.com/grafana/grafana/pkg/services/quota/quotatest"
|
||||
"github.com/grafana/grafana/pkg/services/sqlstore"
|
||||
@ -36,7 +36,7 @@ import (
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
)
|
||||
|
||||
func NewTestMigrationStore(t *testing.T, sqlStore *sqlstore.SQLStore, cfg *setting.Cfg) *migrationStore {
|
||||
func NewTestMigrationStore(t testing.TB, sqlStore *sqlstore.SQLStore, cfg *setting.Cfg) *migrationStore {
|
||||
if cfg.UnifiedAlerting.BaseInterval == 0 {
|
||||
cfg.UnifiedAlerting.BaseInterval = time.Second * 10
|
||||
}
|
||||
@ -45,6 +45,7 @@ func NewTestMigrationStore(t *testing.T, sqlStore *sqlstore.SQLStore, cfg *setti
|
||||
alertingStore := store.DBstore{
|
||||
SQLStore: sqlStore,
|
||||
Cfg: cfg.UnifiedAlerting,
|
||||
Logger: &logtest.Fake{},
|
||||
}
|
||||
bus := bus.ProvideBus(tracing.InitializeTracerForTest())
|
||||
folderStore := folderimpl.ProvideDashboardFolderStore(sqlStore)
|
||||
@ -90,7 +91,7 @@ func NewTestMigrationStore(t *testing.T, sqlStore *sqlstore.SQLStore, cfg *setti
|
||||
log: &logtest.Fake{},
|
||||
cfg: cfg,
|
||||
store: sqlStore,
|
||||
kv: fakes.NewFakeKVStore(t),
|
||||
kv: kvstore.ProvideService(sqlStore),
|
||||
alertingStore: &alertingStore,
|
||||
dashboardService: dashboardService,
|
||||
folderService: folderService,
|
||||
|
@ -111,6 +111,8 @@ func (*OSSMigrations) AddMigration(mg *Migrator) {
|
||||
dashboardFolderMigrations.AddDashboardFolderMigrations(mg)
|
||||
|
||||
ssosettings.AddMigration(mg)
|
||||
|
||||
ualert.CreateOrgMigratedKVStoreEntries(mg)
|
||||
}
|
||||
|
||||
func addStarMigrations(mg *Migrator) {
|
||||
|
102
pkg/services/sqlstore/migrations/ualert/org_upgrade_state_mig.go
Normal file
102
pkg/services/sqlstore/migrations/ualert/org_upgrade_state_mig.go
Normal file
@ -0,0 +1,102 @@
|
||||
package ualert
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"xorm.io/xorm"
|
||||
|
||||
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
|
||||
)
|
||||
|
||||
// typeKey is a vendored migration.typeKey.
|
||||
var typeKey = "currentAlertingType"
|
||||
|
||||
// AlertingType is a vendored migration.store.AlertingType.
|
||||
type alertingType string
|
||||
|
||||
const (
|
||||
Legacy alertingType = "Legacy"
|
||||
UnifiedAlerting alertingType = "UnifiedAlerting"
|
||||
)
|
||||
|
||||
// CreateOrgMigratedKVStoreEntries creates kv store entries for each organization if the migration has been run.
|
||||
// This is needed now that we've changed the semantics of data loss when upgrading / rolling back. If a user who previously
|
||||
// upgraded were to rollback and upgrade again without clean_upgrade, then since they don't have org-level migrated states
|
||||
// it will attempt to upgrade their orgs as if they had never upgraded before. This will almost definitely fail with
|
||||
// duplicate key errors.
|
||||
//
|
||||
// In addition, this changes the entry for orgId=0 to be better named as it no longer tracks whether the
|
||||
// migration has been run, but rather the current alerting type of Grafana; Legacy or UnifiedAlerting. This is used to
|
||||
// detect transitions between Legacy and UnifiedAlerting by comparing to the desired type in the configuration.
|
||||
func CreateOrgMigratedKVStoreEntries(mg *migrator.Migrator) {
|
||||
mg.AddMigration("copy kvstore migration status to each org", &createOrgMigratedKVStoreEntries{})
|
||||
}
|
||||
|
||||
type createOrgMigratedKVStoreEntries struct {
|
||||
migrator.MigrationBase
|
||||
}
|
||||
|
||||
func (c createOrgMigratedKVStoreEntries) SQL(migrator.Dialect) string {
|
||||
return codeMigration
|
||||
}
|
||||
|
||||
func (c createOrgMigratedKVStoreEntries) Exec(sess *xorm.Session, mg *migrator.Migrator) error {
|
||||
var anyOrg int64 = 0
|
||||
migrated := kvStoreV1Entry{
|
||||
OrgID: &anyOrg,
|
||||
Namespace: &KVNamespace,
|
||||
Key: &migratedKey,
|
||||
}
|
||||
has, err := sess.Table("kv_store").Get(&migrated)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !has {
|
||||
mg.Logger.Debug("No migrated status in kvstore, nothing to set")
|
||||
return nil
|
||||
}
|
||||
|
||||
// Rename old entry key and value.
|
||||
val := Legacy
|
||||
if migrated.Value == "true" {
|
||||
val = UnifiedAlerting
|
||||
}
|
||||
if _, err := sess.Table("kv_store").Where("id = ?", migrated.ID).Update(&kvStoreV1Entry{
|
||||
Key: &typeKey,
|
||||
Value: string(val),
|
||||
}); err != nil {
|
||||
mg.Logger.Error("failed to rename org migrated status in kvstore", "err", err)
|
||||
return err
|
||||
}
|
||||
|
||||
var orgs []struct {
|
||||
ID int64 `xorm:"id"`
|
||||
}
|
||||
if err := sess.SQL("select id from org").Find(&orgs); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(orgs) == 0 {
|
||||
mg.Logger.Debug("no orgs, nothing to set in kvstore")
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, org := range orgs {
|
||||
id := org.ID
|
||||
entry := kvStoreV1Entry{
|
||||
OrgID: &id,
|
||||
Namespace: &KVNamespace,
|
||||
Key: &migratedKey,
|
||||
Value: migrated.Value,
|
||||
Created: migrated.Created,
|
||||
Updated: migrated.Updated,
|
||||
}
|
||||
if _, errCreate := sess.Table("kv_store").Insert(&entry); errCreate != nil {
|
||||
mg.Logger.Error("failed to insert org migration status to kvstore", "err", errCreate)
|
||||
return fmt.Errorf("failed to insert org migration status to kvstore: %w", errCreate)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
Loading…
Reference in New Issue
Block a user