From 0e7640475fccfcc74bfe7909b5cd963e83775705 Mon Sep 17 00:00:00 2001 From: Alexander Weaver Date: Wed, 4 Jan 2023 10:43:26 -0600 Subject: [PATCH] Alerting: Store alertmanager configuration history in a separate table in the database (#60492) * Update config store to split between active and history tables * Migrations to fix up indexes * Implement migration from old format to new * Move add migrations call * Delete duplicated rows * Explicitly map fields * Quote the column name because it's a reserved word * Lift migrations to top * Use XORM for nearly everything, avoid any non trivial raw SQL * Touch up indexes and zero out IDs on move * Drop TODO that's already completed * Fix assignment of IDs --- pkg/services/ngalert/store/alertmanager.go | 126 ++++-------------- .../sqlstore/migrations/ualert/tables.go | 1 + .../sqlstore/migrations/ualert/ualert.go | 77 +++++++++++ 3 files changed, 104 insertions(+), 100 deletions(-) diff --git a/pkg/services/ngalert/store/alertmanager.go b/pkg/services/ngalert/store/alertmanager.go index da3d90cc706..20db7d31dfe 100644 --- a/pkg/services/ngalert/store/alertmanager.go +++ b/pkg/services/ngalert/store/alertmanager.go @@ -6,9 +6,6 @@ import ( "fmt" "time" - "xorm.io/builder" - "xorm.io/core" - "github.com/grafana/grafana/pkg/infra/db" "github.com/grafana/grafana/pkg/services/ngalert/models" ) @@ -31,7 +28,7 @@ func (st *DBstore) GetLatestAlertmanagerConfiguration(ctx context.Context, query return st.SQLStore.WithDbSession(ctx, func(sess *db.Session) error { c := &models.AlertConfiguration{} // The ID is already an auto incremental column, using the ID as an order should guarantee the latest. - ok, err := sess.Desc("id").Where("org_id = ?", query.OrgID).Limit(1).Get(c) + ok, err := sess.Table("alert_configuration").Where("org_id = ?", query.OrgID).Get(c) if err != nil { return err } @@ -49,8 +46,7 @@ func (st *DBstore) GetLatestAlertmanagerConfiguration(ctx context.Context, query func (st *DBstore) GetAllLatestAlertmanagerConfiguration(ctx context.Context) ([]*models.AlertConfiguration, error) { var result []*models.AlertConfiguration err := st.SQLStore.WithDbSession(ctx, func(sess *db.Session) error { - condition := builder.In("id", builder.Select("MAX(id)").From("alert_configuration").GroupBy("org_id")) - if err := sess.Table("alert_configuration").Where(condition).Find(&result); err != nil { + if err := sess.Table("alert_configuration").Find(&result); err != nil { return err } return nil @@ -78,10 +74,23 @@ func (st DBstore) SaveAlertmanagerConfigurationWithCallback(ctx context.Context, ConfigurationVersion: cmd.ConfigurationVersion, Default: cmd.Default, OrgID: cmd.OrgID, + CreatedAt: time.Now().Unix(), } - if _, err := sess.Insert(config); err != nil { + // TODO: If we are more structured around how we seed configurations in the future, this can be a pure update instead of upsert. This should improve perf and code clarity. + upsertSQL := st.SQLStore.GetDialect().UpsertSQL( + "alert_configuration", + []string{"org_id"}, + []string{"alertmanager_configuration", "configuration_version", "created_at", "default", "org_id", "configuration_hash"}, + ) + params := append(make([]interface{}, 0), cmd.AlertmanagerConfiguration, cmd.ConfigurationVersion, config.CreatedAt, config.Default, config.OrgID, config.ConfigurationHash) + if _, err := sess.SQL(upsertSQL, params...).Query(); err != nil { return err } + + if _, err := sess.Table("alert_configuration_history").Insert(config); err != nil { + return err + } + if _, err := st.deleteOldConfigurations(ctx, cmd.OrgID, ConfigRecordsLimit); err != nil { st.Logger.Warn("failed to delete old am configs", "org", cmd.OrgID, "error", err) } @@ -93,6 +102,7 @@ func (st DBstore) SaveAlertmanagerConfigurationWithCallback(ctx context.Context, }) } +// UpdateAlertmanagerConfiguration replaces an alertmanager configuration with optimistic locking. It assumes that an existing revision of the configuration exists in the store, and will return an error otherwise. func (st *DBstore) UpdateAlertmanagerConfiguration(ctx context.Context, cmd *models.SaveAlertmanagerConfigurationCmd) error { return st.SQLStore.WithTransactionalDbSession(ctx, func(sess *db.Session) error { config := models.AlertConfiguration{ @@ -103,109 +113,25 @@ func (st *DBstore) UpdateAlertmanagerConfiguration(ctx context.Context, cmd *mod OrgID: cmd.OrgID, CreatedAt: time.Now().Unix(), } - res, err := sess.Exec(fmt.Sprintf(getInsertQuery(st.SQLStore.GetDialect().DriverName()), st.SQLStore.GetDialect().Quote("default")), - config.AlertmanagerConfiguration, - config.ConfigurationHash, - config.ConfigurationVersion, - config.OrgID, - config.CreatedAt, - st.SQLStore.GetDialect().BooleanStr(config.Default), - cmd.OrgID, - cmd.OrgID, - cmd.FetchedConfigurationHash, - ) - if err != nil { - return err - } - rows, err := res.RowsAffected() + rows, err := sess.Table("alert_configuration"). + Where("org_id = ? AND configuration_hash = ?", config.OrgID, cmd.FetchedConfigurationHash). + Update(config) if err != nil { return err } if rows == 0 { return ErrVersionLockedObjectNotFound } + if _, err := sess.Table("alert_configuration_history").Insert(config); err != nil { + return err + } if _, err := st.deleteOldConfigurations(ctx, cmd.OrgID, ConfigRecordsLimit); err != nil { st.Logger.Warn("failed to delete old am configs", "org", cmd.OrgID, "error", err) } - return err + return nil }) } -// getInsertQuery is used to determinate the insert query for the alertmanager config -// based on the provided sql driver. This is necesarry as such an advanced query -// is not supported by our ORM and we need to generate it manually for each SQL dialect. -// We introduced this as part of a bug fix as the old approach wasn't working. -// Rel: https://github.com/grafana/grafana/issues/51356 -func getInsertQuery(driver string) string { - switch driver { - case core.MYSQL: - return ` - INSERT INTO alert_configuration - (alertmanager_configuration, configuration_hash, configuration_version, org_id, created_at, %s) - SELECT T.* FROM (SELECT ? AS alertmanager_configuration,? AS configuration_hash,? AS configuration_version,? AS org_id,? AS created_at,? AS 'default') AS T - WHERE - EXISTS ( - SELECT 1 - FROM alert_configuration - WHERE - org_id = ? - AND - id = (SELECT MAX(id) FROM alert_configuration WHERE org_id = ?) - AND - configuration_hash = ? - )` - case core.POSTGRES: - return ` - INSERT INTO alert_configuration - (alertmanager_configuration, configuration_hash, configuration_version, org_id, created_at, %s) - SELECT T.* FROM (VALUES($1,$2,$3,$4::bigint,$5::integer,$6::boolean)) AS T - WHERE - EXISTS ( - SELECT 1 - FROM alert_configuration - WHERE - org_id = $7 - AND - id = (SELECT MAX(id) FROM alert_configuration WHERE org_id = $8::bigint) - AND - configuration_hash = $9 - )` - case core.SQLITE: - return ` - INSERT INTO alert_configuration - (alertmanager_configuration, configuration_hash, configuration_version, org_id, created_at, %s) - SELECT T.* FROM (VALUES(?,?,?,?,?,?)) AS T - WHERE - EXISTS ( - SELECT 1 - FROM alert_configuration - WHERE - org_id = ? - AND - id = (SELECT MAX(id) FROM alert_configuration WHERE org_id = ?) - AND - configuration_hash = ? - )` - default: - // SQLite version - return ` - INSERT INTO alert_configuration - (alertmanager_configuration, configuration_hash, configuration_version, org_id, created_at, %s) - SELECT T.* FROM (VALUES(?,?,?,?,?,?)) AS T - WHERE - EXISTS ( - SELECT 1 - FROM alert_configuration - WHERE - org_id = ? - AND - id = (SELECT MAX(id) FROM alert_configuration WHERE org_id = ?) - AND - configuration_hash = ? - )` - } -} - func (st *DBstore) deleteOldConfigurations(ctx context.Context, orgID int64, limit int) (int64, error) { if limit < 1 { return 0, fmt.Errorf("failed to delete old configurations: limit is set to '%d' but needs to be > 0", limit) @@ -218,7 +144,7 @@ func (st *DBstore) deleteOldConfigurations(ctx context.Context, orgID int64, lim var affectedRows int64 err := st.SQLStore.WithDbSession(ctx, func(sess *db.Session) error { highest := &models.AlertConfiguration{} - ok, err := sess.Desc("id").Where("org_id = ?", orgID).OrderBy("id").Limit(1, limit-1).Get(highest) + ok, err := sess.Table("alert_configuration_history").Desc("id").Where("org_id = ?", orgID).OrderBy("id").Limit(1, limit-1).Get(highest) if err != nil { return err } @@ -237,7 +163,7 @@ func (st *DBstore) deleteOldConfigurations(ctx context.Context, orgID int64, lim res, err := sess.Exec(` DELETE FROM - alert_configuration + alert_configuration_history WHERE org_id = ? AND diff --git a/pkg/services/sqlstore/migrations/ualert/tables.go b/pkg/services/sqlstore/migrations/ualert/tables.go index 875f9f68894..24f12167a29 100644 --- a/pkg/services/sqlstore/migrations/ualert/tables.go +++ b/pkg/services/sqlstore/migrations/ualert/tables.go @@ -35,6 +35,7 @@ func AddTablesMigrations(mg *migrator.Migrator) { AddAlertImageMigrations(mg) AddAlertmanagerConfigHistoryMigrations(mg) + ExtractAlertmanagerConfigurationHistoryMigration(mg) } // AddAlertDefinitionMigrations should not be modified. diff --git a/pkg/services/sqlstore/migrations/ualert/ualert.go b/pkg/services/sqlstore/migrations/ualert/ualert.go index 29765e1ccd1..d2073b9f023 100644 --- a/pkg/services/sqlstore/migrations/ualert/ualert.go +++ b/pkg/services/sqlstore/migrations/ualert/ualert.go @@ -927,3 +927,80 @@ func (s *uidSet) generateUid() (string, error) { return "", errors.New("failed to generate UID") } + +func ExtractAlertmanagerConfigurationHistoryMigration(mg *migrator.Migrator) { + if !mg.Cfg.UnifiedAlerting.IsEnabled() { + return + } + // Since it's not always consistent as to what state the org ID indexes are in, just drop them all and rebuild from scratch. + // This is not expensive since this table is guaranteed to have a small number of rows. + mg.AddMigration("drop non-unique orgID index on alert_configuration", migrator.NewDropIndexMigration(migrator.Table{Name: "alert_configuration"}, &migrator.Index{Cols: []string{"org_id"}})) + mg.AddMigration("drop unique orgID index on alert_configuration if exists", migrator.NewDropIndexMigration(migrator.Table{Name: "alert_configuration"}, &migrator.Index{Type: migrator.UniqueIndex, Cols: []string{"org_id"}})) + mg.AddMigration("extract alertmanager configuration history to separate table", &extractAlertmanagerConfigurationHistory{}) + mg.AddMigration("add unique index on orgID to alert_configuration", migrator.NewAddIndexMigration(migrator.Table{Name: "alert_configuration"}, &migrator.Index{Type: migrator.UniqueIndex, Cols: []string{"org_id"}})) +} + +type extractAlertmanagerConfigurationHistory struct { + migrator.MigrationBase +} + +// extractAMConfigHistoryConfigModel is the model of an alertmanager configuration row, at the time that the extractAlertmanagerConfigurationHistory migration was run. +// This is not to be used outside of the extractAlertmanagerConfigurationHistory migration. +type extractAMConfigHistoryConfigModel struct { + ID int64 `xorm:"pk autoincr 'id'"` + AlertmanagerConfiguration string + ConfigurationHash string + ConfigurationVersion string + CreatedAt int64 `xorm:"created"` + Default bool + OrgID int64 `xorm:"org_id"` +} + +func (c extractAlertmanagerConfigurationHistory) SQL(migrator.Dialect) string { + return codeMigration +} + +func (c extractAlertmanagerConfigurationHistory) Exec(sess *xorm.Session, migrator *migrator.Migrator) error { + var orgs []int64 + if err := sess.Table("alert_configuration").Distinct("org_id").Find(&orgs); err != nil { + return fmt.Errorf("failed to retrieve the organizations with alerting configurations: %w", err) + } + + // Clear out the history table, just in case. It should already be empty. + if _, err := sess.Exec("DELETE FROM alert_configuration_history"); err != nil { + return fmt.Errorf("failed to clear the config history table: %w", err) + } + + for _, orgID := range orgs { + var activeConfigID int64 + has, err := sess.SQL(`SELECT MAX(id) FROM alert_configuration WHERE org_id = ?`, orgID).Get(&activeConfigID) + if err != nil { + return fmt.Errorf("failed to query active config ID for org %d: %w", orgID, err) + } + if !has { + return fmt.Errorf("we previously found a config for org, but later it was unexpectedly missing: %d", orgID) + } + + history := make([]extractAMConfigHistoryConfigModel, 0) + err = sess.Table("alert_configuration").Where("org_id = ? AND id < ?", orgID, activeConfigID).Find(&history) + if err != nil { + return fmt.Errorf("failed to query for non-active configs for org %d: %w", orgID, err) + } + + // Set the IDs back to the default, so XORM will ignore the field and auto-assign them. + for i := range history { + history[i].ID = 0 + } + + _, err = sess.Table("alert_configuration_history").InsertMulti(history) + if err != nil { + return fmt.Errorf("failed to insert historical configs for org: %d: %w", orgID, err) + } + + _, err = sess.Exec("DELETE FROM alert_configuration WHERE org_id = ? AND id < ?", orgID, activeConfigID) + if err != nil { + return fmt.Errorf("failed to evict old configurations for org after moving to history table: %d: %w", orgID, err) + } + } + return nil +}