Alerting: Mark AM configuration as applied (#61330)
* Mark AM configuration as applied
* add missing checks, make linter happy
* fix deadlock, mark as valid on save and on load
* mark configurations only if needed
* check error after applyConfig()
* code review comments
* code review changes
* more code review changes
* clean HistoricConfigFromAlertConfig function
Commit: ba731f7865 (parent: 6ad1cfef38)
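For orientation before the diff: the change records, per organization, which saved Alertmanager configuration was last applied successfully, by stamping a `last_applied` timestamp on the matching `alert_configuration_history` row. A condensed sketch of the flow, paraphrased from the hunks below (identifiers such as `am.applyConfig`, `am.Store`, and the `ngmodels` command type come from the ngalert notifier code shown in the diff; this is a sketch, not a drop-in snippet):

```go
// applyThenMark mirrors the new applyAndMarkConfig helper introduced below:
// apply the configuration, and only if it actually changed (and applied
// cleanly) mark the corresponding history record as applied.
func applyThenMark(ctx context.Context, am *Alertmanager, dbCfg *ngmodels.AlertConfiguration, cfg *apimodels.PostableUserConfig) error {
    changed, err := am.applyConfig(cfg, nil) // applyConfig now reports whether the config changed
    if err != nil {
        return err
    }
    if !changed {
        return nil // nothing was re-applied, so there is nothing to mark
    }
    return am.Store.MarkConfigurationAsApplied(ctx, &ngmodels.MarkConfigurationAsAppliedCmd{
        OrgID:             dbCfg.OrgID,
        ConfigurationHash: dbCfg.ConfigurationHash,
    })
}
```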
@@ -535,7 +535,7 @@ func createMultiOrgAlertmanager(t *testing.T) *notifier.MultiOrgAlertmanager {
         }, // do not poll in tests.
     }

-    mam, err := notifier.NewMultiOrgAlertmanager(cfg, &configStore, &orgStore, kvStore, provStore, decryptFn, m.GetMultiOrgAlertmanagerMetrics(), nil, log.New("testlogger"), secretsService)
+    mam, err := notifier.NewMultiOrgAlertmanager(cfg, configStore, &orgStore, kvStore, provStore, decryptFn, m.GetMultiOrgAlertmanagerMetrics(), nil, log.New("testlogger"), secretsService)
     require.NoError(t, err)
     err = mam.LoadAndSyncAlertmanagersForOrgs(context.Background())
     require.NoError(t, err)
@@ -14,6 +14,15 @@ type AlertConfiguration struct {
     OrgID int64 `xorm:"org_id"`
 }

+// HistoricAlertConfiguration represents a previously used alerting configuration.
+type HistoricAlertConfiguration struct {
+    AlertConfiguration `xorm:"extends"`
+
+    // LastApplied a timestamp indicating the most recent time at which the configuration was applied to an Alertmanager, or 0 otherwise.
+    // Only set this field if the configuration has been applied by the caller.
+    LastApplied int64 `xorm:"last_applied"`
+}
+
 // GetLatestAlertmanagerConfigurationQuery is the query to get the latest alertmanager configuration.
 type GetLatestAlertmanagerConfigurationQuery struct {
     OrgID int64
@@ -27,4 +36,25 @@ type SaveAlertmanagerConfigurationCmd struct {
     ConfigurationVersion string
     Default bool
     OrgID int64
+    LastApplied int64
 }
+
+// MarkConfigurationAsAppliedCmd is the command for marking a previously saved configuration as successfully applied.
+type MarkConfigurationAsAppliedCmd struct {
+    OrgID int64
+    ConfigurationHash string
+}
+
+// GetAppliedConfigurationsQuery is the query for getting configurations that have been previously applied with no errors.
+type GetAppliedConfigurationsQuery struct {
+    OrgID int64
+    Result []*HistoricAlertConfiguration
+}
+
+func HistoricConfigFromAlertConfig(config AlertConfiguration) HistoricAlertConfiguration {
+    // Reset the Id so it can be generated by the DB.
+    config.ID = 0
+    return HistoricAlertConfiguration{
+        AlertConfiguration: config,
+    }
+}
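The models above are wired together by the store layer further down in this diff. A minimal sketch of that relationship (paraphrasing the DBstore changes below; the helper name `historicRecord` is hypothetical):

```go
// historicRecord builds the history row that the store inserts alongside a
// saved configuration. HistoricConfigFromAlertConfig resets the ID so the
// database can assign a fresh one, and LastApplied stays 0 until the
// configuration is actually applied to an Alertmanager.
func historicRecord(config models.AlertConfiguration, cmd models.SaveAlertmanagerConfigurationCmd) models.HistoricAlertConfiguration {
    historic := models.HistoricConfigFromAlertConfig(config)
    historic.LastApplied = cmd.LastApplied // zero means "not applied yet"
    return historic
}
```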
@@ -154,7 +154,7 @@ func (am *Alertmanager) StopAndWait() {
     am.Base.StopAndWait()
 }

-// SaveAndApplyDefaultConfig saves the default configuration the database and applies the configuration to the Alertmanager.
+// SaveAndApplyDefaultConfig saves the default configuration to the database and applies it to the Alertmanager.
 // It rolls back the save if we fail to apply the configuration.
 func (am *Alertmanager) SaveAndApplyDefaultConfig(ctx context.Context) error {
     var outerErr error
@@ -164,6 +164,7 @@ func (am *Alertmanager) SaveAndApplyDefaultConfig(ctx context.Context) error {
         Default: true,
         ConfigurationVersion: fmt.Sprintf("v%d", ngmodels.AlertConfigurationVersion),
         OrgID: am.orgID,
+        LastApplied: time.Now().UTC().Unix(),
     }

     cfg, err := Load([]byte(am.Settings.UnifiedAlerting.DefaultConfiguration))
@@ -173,10 +174,8 @@ func (am *Alertmanager) SaveAndApplyDefaultConfig(ctx context.Context) error {
     }

     err = am.Store.SaveAlertmanagerConfigurationWithCallback(ctx, cmd, func() error {
-        if err := am.applyConfig(cfg, []byte(am.Settings.UnifiedAlerting.DefaultConfiguration)); err != nil {
-            return err
-        }
-        return nil
+        _, err := am.applyConfig(cfg, []byte(am.Settings.UnifiedAlerting.DefaultConfiguration))
+        return err
     })
     if err != nil {
         outerErr = nil
@@ -201,13 +200,12 @@ func (am *Alertmanager) SaveAndApplyConfig(ctx context.Context, cfg *apimodels.P
         AlertmanagerConfiguration: string(rawConfig),
         ConfigurationVersion: fmt.Sprintf("v%d", ngmodels.AlertConfigurationVersion),
         OrgID: am.orgID,
+        LastApplied: time.Now().UTC().Unix(),
     }

     err = am.Store.SaveAlertmanagerConfigurationWithCallback(ctx, cmd, func() error {
-        if err := am.applyConfig(cfg, rawConfig); err != nil {
-            return err
-        }
-        return nil
+        _, err := am.applyConfig(cfg, rawConfig)
+        return err
     })
     if err != nil {
         outerErr = err
@@ -219,7 +217,7 @@ func (am *Alertmanager) SaveAndApplyConfig(ctx context.Context, cfg *apimodels.P
 }

 // ApplyConfig applies the configuration to the Alertmanager.
-func (am *Alertmanager) ApplyConfig(dbCfg *ngmodels.AlertConfiguration) error {
+func (am *Alertmanager) ApplyConfig(ctx context.Context, dbCfg *ngmodels.AlertConfiguration) error {
     var err error
     cfg, err := Load([]byte(dbCfg.AlertmanagerConfiguration))
     if err != nil {
@@ -228,7 +226,7 @@ func (am *Alertmanager) ApplyConfig(dbCfg *ngmodels.AlertConfiguration) error {

     var outerErr error
     am.Base.WithLock(func() {
-        if err = am.applyConfig(cfg, nil); err != nil {
+        if err := am.applyAndMarkConfig(ctx, dbCfg.ConfigurationHash, cfg, nil); err != nil {
             outerErr = fmt.Errorf("unable to apply configuration: %w", err)
             return
         }
@@ -238,21 +236,22 @@ func (am *Alertmanager) ApplyConfig(dbCfg *ngmodels.AlertConfiguration) error {
 }

 // applyConfig applies a new configuration by re-initializing all components using the configuration provided.
+// It returns a boolean indicating whether the user config was changed and an error.
 // It is not safe to call concurrently.
-func (am *Alertmanager) applyConfig(cfg *apimodels.PostableUserConfig, rawConfig []byte) (err error) {
+func (am *Alertmanager) applyConfig(cfg *apimodels.PostableUserConfig, rawConfig []byte) (bool, error) {
     // First, let's make sure this config is not already loaded
-    var configChanged bool
+    var amConfigChanged bool
     if rawConfig == nil {
         enc, err := json.Marshal(cfg.AlertmanagerConfig)
         if err != nil {
             // In theory, this should never happen.
-            return err
+            return false, err
         }
         rawConfig = enc
     }

     if am.Base.ConfigHash() != md5.Sum(rawConfig) {
-        configChanged = true
+        amConfigChanged = true
     }

     if cfg.TemplateFiles == nil {
@@ -263,19 +262,19 @@ func (am *Alertmanager) applyConfig(cfg *apimodels.PostableUserConfig, rawConfig
     // next, we need to make sure we persist the templates to disk.
     paths, templatesChanged, err := PersistTemplates(cfg, am.WorkingDirPath())
     if err != nil {
-        return err
+        return false, err
     }

     // If neither the configuration nor templates have changed, we've got nothing to do.
-    if !configChanged && !templatesChanged {
+    if !amConfigChanged && !templatesChanged {
         am.logger.Debug("neither config nor template have changed, skipping configuration sync.")
-        return nil
+        return false, nil
     }

     // With the templates persisted, create the template list using the paths.
     tmpl, err := am.Base.TemplateFromPaths(am.Settings.AppURL, paths...)
     if err != nil {
-        return err
+        return false, err
     }

     err = am.Base.ApplyConfig(AlertingConfiguration{
@@ -285,10 +284,28 @@ func (am *Alertmanager) applyConfig(cfg *apimodels.PostableUserConfig, rawConfig
         IntegrationsFunc: am.buildIntegrationsMap,
         ReceiverIntegrationsFunc: am.buildReceiverIntegration,
     })
     if err != nil {
+        return false, err
     }

+    return true, nil
 }
+
+// applyAndMarkConfig applies a configuration and marks it as applied if no errors occur.
+func (am *Alertmanager) applyAndMarkConfig(ctx context.Context, hash string, cfg *apimodels.PostableUserConfig, rawConfig []byte) error {
+    configChanged, err := am.applyConfig(cfg, rawConfig)
+    if err != nil {
+        return err
+    }
+
+    if configChanged {
+        markConfigCmd := ngmodels.MarkConfigurationAsAppliedCmd{
+            OrgID: am.orgID,
+            ConfigurationHash: hash,
+        }
+        return am.Store.MarkConfigurationAsApplied(ctx, &markConfigCmd)
+    }
+
+    return nil
+}
@@ -205,7 +205,7 @@ func (moa *MultiOrgAlertmanager) SyncAlertmanagersForOrgs(ctx context.Context, o
             continue
         }

-        err := alertmanager.ApplyConfig(dbConfig)
+        err := alertmanager.ApplyConfig(ctx, dbConfig)
         if err != nil {
             moa.logger.Error("failed to apply Alertmanager config for org", "org", orgID, "id", dbConfig.ID, "error", err)
             continue
@@ -24,9 +24,7 @@ import (
 )

 func TestMultiOrgAlertmanager_SyncAlertmanagersForOrgs(t *testing.T) {
-    configStore := &FakeConfigStore{
-        configs: map[int64]*models.AlertConfiguration{},
-    }
+    configStore := NewFakeConfigStore(t, map[int64]*models.AlertConfiguration{})
     orgStore := &FakeOrgStore{
         orgs: []int64{1, 2, 3},
     }
@@ -62,6 +60,13 @@ grafana_alerting_active_configurations 3
 # TYPE grafana_alerting_discovered_configurations gauge
 grafana_alerting_discovered_configurations 3
 `), "grafana_alerting_discovered_configurations", "grafana_alerting_active_configurations"))
+
+        // Configurations should be marked as successfully applied.
+        for _, org := range orgStore.orgs {
+            configs, ok := configStore.appliedConfigs[org]
+            require.True(t, ok)
+            require.Len(t, configs, 1)
+        }
     }
     // When an org is removed, it should detect it.
     {
@@ -148,13 +153,14 @@ grafana_alerting_discovered_configurations 4

 func TestMultiOrgAlertmanager_SyncAlertmanagersForOrgsWithFailures(t *testing.T) {
     // Include a broken configuration for organization 2.
-    configStore := &FakeConfigStore{
-        configs: map[int64]*models.AlertConfiguration{
-            2: {AlertmanagerConfiguration: brokenConfig, OrgID: 2},
-        },
-    }
+    var orgWithBadConfig int64 = 2
+    configStore := NewFakeConfigStore(t, map[int64]*models.AlertConfiguration{
+        2: {AlertmanagerConfiguration: brokenConfig, OrgID: orgWithBadConfig},
+    })
+
+    orgs := []int64{1, 2, 3}
     orgStore := &FakeOrgStore{
-        orgs: []int64{1, 2, 3},
+        orgs: orgs,
     }

     tmpDir := t.TempDir()
@@ -175,6 +181,14 @@ func TestMultiOrgAlertmanager_SyncAlertmanagersForOrgsWithFailures(t *testing.T)
     require.NoError(t, err)
     ctx := context.Background()

+    // No successfully applied configurations should be found at first.
+    {
+        for _, org := range orgs {
+            _, ok := configStore.appliedConfigs[org]
+            require.False(t, ok)
+        }
+    }
+
     // When you sync the first time, the alertmanager is created but is doesn't become ready until you have a configuration applied.
     {
         require.NoError(t, mam.LoadAndSyncAlertmanagersForOrgs(ctx))
@@ -182,6 +196,17 @@ func TestMultiOrgAlertmanager_SyncAlertmanagersForOrgsWithFailures(t *testing.T)
         require.True(t, mam.alertmanagers[1].Ready())
         require.False(t, mam.alertmanagers[2].Ready())
         require.True(t, mam.alertmanagers[3].Ready())
+
+        // Configurations should be marked as successfully applied for all orgs except for org 2.
+        for _, org := range orgs {
+            configs, ok := configStore.appliedConfigs[org]
+            if org == orgWithBadConfig {
+                require.False(t, ok)
+            } else {
+                require.True(t, ok)
+                require.Len(t, configs, 1)
+            }
+        }
     }

     // On the next sync, it never panics and alertmanager is still not ready.
@@ -191,6 +216,17 @@ func TestMultiOrgAlertmanager_SyncAlertmanagersForOrgsWithFailures(t *testing.T)
         require.True(t, mam.alertmanagers[1].Ready())
         require.False(t, mam.alertmanagers[2].Ready())
         require.True(t, mam.alertmanagers[3].Ready())
+
+        // The configuration should still be marked as successfully applied for all orgs except for org 2.
+        for _, org := range orgs {
+            configs, ok := configStore.appliedConfigs[org]
+            if org == orgWithBadConfig {
+                require.False(t, ok)
+            } else {
+                require.True(t, ok)
+                require.Len(t, configs, 1)
+            }
+        }
     }

     // If we fix the configuration, it becomes ready.
@@ -201,13 +237,18 @@ func TestMultiOrgAlertmanager_SyncAlertmanagersForOrgsWithFailures(t *testing.T)
         require.True(t, mam.alertmanagers[1].Ready())
         require.True(t, mam.alertmanagers[2].Ready())
         require.True(t, mam.alertmanagers[3].Ready())
+
+        // All configurations should be marked as successfully applied.
+        for _, org := range orgs {
+            configs, ok := configStore.appliedConfigs[org]
+            require.True(t, ok)
+            require.Len(t, configs, 1)
+        }
     }
 }

 func TestMultiOrgAlertmanager_AlertmanagerFor(t *testing.T) {
-    configStore := &FakeConfigStore{
-        configs: map[int64]*models.AlertConfiguration{},
-    }
+    configStore := NewFakeConfigStore(t, map[int64]*models.AlertConfiguration{})
     orgStore := &FakeOrgStore{
         orgs: []int64{1, 2, 3},
     }
@@ -14,32 +14,36 @@ import (
     "github.com/grafana/grafana/pkg/services/ngalert/store"
 )

-type FakeConfigStore struct {
+type fakeConfigStore struct {
     configs map[int64]*models.AlertConfiguration
+
+    // appliedConfigs stores configs by orgID and config hash.
+    appliedConfigs map[int64]map[string]*models.AlertConfiguration
 }

 // Saves the image or returns an error.
-func (f *FakeConfigStore) SaveImage(ctx context.Context, img *models.Image) error {
+func (f *fakeConfigStore) SaveImage(ctx context.Context, img *models.Image) error {
     return models.ErrImageNotFound
 }

-func (f *FakeConfigStore) GetImage(ctx context.Context, token string) (*models.Image, error) {
+func (f *fakeConfigStore) GetImage(ctx context.Context, token string) (*models.Image, error) {
     return nil, models.ErrImageNotFound
 }

-func (f *FakeConfigStore) GetImages(ctx context.Context, tokens []string) ([]models.Image, []string, error) {
+func (f *fakeConfigStore) GetImages(ctx context.Context, tokens []string) ([]models.Image, []string, error) {
     return nil, nil, models.ErrImageNotFound
 }

-func NewFakeConfigStore(t *testing.T, configs map[int64]*models.AlertConfiguration) FakeConfigStore {
+func NewFakeConfigStore(t *testing.T, configs map[int64]*models.AlertConfiguration) *fakeConfigStore {
     t.Helper()

-    return FakeConfigStore{
-        configs: configs,
+    return &fakeConfigStore{
+        configs: configs,
+        appliedConfigs: make(map[int64]map[string]*models.AlertConfiguration),
     }
 }

-func (f *FakeConfigStore) GetAllLatestAlertmanagerConfiguration(context.Context) ([]*models.AlertConfiguration, error) {
+func (f *fakeConfigStore) GetAllLatestAlertmanagerConfiguration(context.Context) ([]*models.AlertConfiguration, error) {
     result := make([]*models.AlertConfiguration, 0, len(f.configs))
     for _, configuration := range f.configs {
         result = append(result, configuration)
@@ -47,7 +51,7 @@ func (f *FakeConfigStore) GetAllLatestAlertmanagerConfiguration(context.Context)
     return result, nil
 }

-func (f *FakeConfigStore) GetLatestAlertmanagerConfiguration(_ context.Context, query *models.GetLatestAlertmanagerConfigurationQuery) error {
+func (f *fakeConfigStore) GetLatestAlertmanagerConfiguration(_ context.Context, query *models.GetLatestAlertmanagerConfigurationQuery) error {
     var ok bool
     query.Result, ok = f.configs[query.OrgID]
     if !ok {
@@ -57,33 +61,35 @@ func (f *FakeConfigStore) GetLatestAlertmanagerConfiguration(_ context.Context,
     return nil
 }

-func (f *FakeConfigStore) SaveAlertmanagerConfiguration(_ context.Context, cmd *models.SaveAlertmanagerConfigurationCmd) error {
-    f.configs[cmd.OrgID] = &models.AlertConfiguration{
-        AlertmanagerConfiguration: cmd.AlertmanagerConfiguration,
-        OrgID: cmd.OrgID,
-        ConfigurationVersion: "v1",
-        Default: cmd.Default,
-    }
-
-    return nil
+func (f *fakeConfigStore) SaveAlertmanagerConfiguration(ctx context.Context, cmd *models.SaveAlertmanagerConfigurationCmd) error {
+    return f.SaveAlertmanagerConfigurationWithCallback(ctx, cmd, func() error { return nil })
 }

-func (f *FakeConfigStore) SaveAlertmanagerConfigurationWithCallback(_ context.Context, cmd *models.SaveAlertmanagerConfigurationCmd, callback store.SaveCallback) error {
-    f.configs[cmd.OrgID] = &models.AlertConfiguration{
+func (f *fakeConfigStore) SaveAlertmanagerConfigurationWithCallback(_ context.Context, cmd *models.SaveAlertmanagerConfigurationCmd, callback store.SaveCallback) error {
+    cfg := models.AlertConfiguration{
         AlertmanagerConfiguration: cmd.AlertmanagerConfiguration,
+        ConfigurationHash: fmt.Sprintf("%x", md5.Sum([]byte(cmd.AlertmanagerConfiguration))),
         OrgID: cmd.OrgID,
         ConfigurationVersion: "v1",
         Default: cmd.Default,
     }
+    f.configs[cmd.OrgID] = &cfg

     if err := callback(); err != nil {
         return err
     }

+    if cmd.LastApplied != 0 {
+        if _, ok := f.appliedConfigs[cmd.OrgID]; !ok {
+            f.appliedConfigs[cmd.OrgID] = make(map[string]*models.AlertConfiguration)
+        }
+        f.appliedConfigs[cmd.OrgID][cfg.ConfigurationHash] = &cfg
+    }
+
     return nil
 }

-func (f *FakeConfigStore) UpdateAlertmanagerConfiguration(_ context.Context, cmd *models.SaveAlertmanagerConfigurationCmd) error {
+func (f *fakeConfigStore) UpdateAlertmanagerConfiguration(_ context.Context, cmd *models.SaveAlertmanagerConfigurationCmd) error {
     if config, exists := f.configs[cmd.OrgID]; exists && config.ConfigurationHash == cmd.FetchedConfigurationHash {
         f.configs[cmd.OrgID] = &models.AlertConfiguration{
             AlertmanagerConfiguration: cmd.AlertmanagerConfiguration,
@@ -97,6 +103,20 @@ func (f *FakeConfigStore) UpdateAlertmanagerConfiguration(_ context.Context, cmd
     return errors.New("config not found or hash not valid")
 }

+func (f *fakeConfigStore) MarkConfigurationAsApplied(_ context.Context, cmd *models.MarkConfigurationAsAppliedCmd) error {
+    for _, config := range f.configs {
+        if config.ConfigurationHash == cmd.ConfigurationHash && config.OrgID == cmd.OrgID {
+            if _, ok := f.appliedConfigs[cmd.OrgID]; !ok {
+                f.appliedConfigs[cmd.OrgID] = make(map[string]*models.AlertConfiguration)
+            }
+            f.appliedConfigs[cmd.OrgID][cmd.ConfigurationHash] = config
+            return nil
+        }
+    }
+
+    return errors.New("config not found")
+}
+
 type FakeOrgStore struct {
     orgs []int64
 }
@@ -410,7 +410,7 @@ func createMultiOrgAlertmanager(t *testing.T, orgs []int64) *notifier.MultiOrgAl
     m := metrics.NewNGAlert(registry)
     secretsService := secretsManager.SetupTestService(t, fake_secrets.NewFakeSecretsStore())
     decryptFn := secretsService.GetDecryptedValue
-    moa, err := notifier.NewMultiOrgAlertmanager(cfg, &cfgStore, &orgStore, kvStore, provisioning.NewFakeProvisioningStore(), decryptFn, m.GetMultiOrgAlertmanagerMetrics(), nil, log.New("testlogger"), secretsService)
+    moa, err := notifier.NewMultiOrgAlertmanager(cfg, cfgStore, &orgStore, kvStore, provisioning.NewFakeProvisioningStore(), decryptFn, m.GetMultiOrgAlertmanagerMetrics(), nil, log.New("testlogger"), secretsService)
     require.NoError(t, err)
     require.NoError(t, moa.LoadAndSyncAlertmanagersForOrgs(context.Background()))
     require.Eventually(t, func() bool {
@@ -76,6 +76,7 @@ func (st DBstore) SaveAlertmanagerConfigurationWithCallback(ctx context.Context,
         OrgID: cmd.OrgID,
         CreatedAt: time.Now().Unix(),
     }
+
     // TODO: If we are more structured around how we seed configurations in the future, this can be a pure update instead of upsert. This should improve perf and code clarity.
     upsertSQL := st.SQLStore.GetDialect().UpsertSQL(
         "alert_configuration",
@@ -87,13 +88,16 @@ func (st DBstore) SaveAlertmanagerConfigurationWithCallback(ctx context.Context,
             return err
         }

-        if _, err := sess.Table("alert_configuration_history").Insert(config); err != nil {
+        historicConfig := models.HistoricConfigFromAlertConfig(config)
+        historicConfig.LastApplied = cmd.LastApplied
+        if _, err := sess.Table("alert_configuration_history").Insert(historicConfig); err != nil {
             return err
         }

        if _, err := st.deleteOldConfigurations(ctx, cmd.OrgID, ConfigRecordsLimit); err != nil {
             st.Logger.Warn("failed to delete old am configs", "org", cmd.OrgID, "error", err)
         }

         if err := callback(); err != nil {
             return err
         }
@@ -122,7 +126,9 @@ func (st *DBstore) UpdateAlertmanagerConfiguration(ctx context.Context, cmd *mod
         if rows == 0 {
             return ErrVersionLockedObjectNotFound
         }
-        if _, err := sess.Table("alert_configuration_history").Insert(config); err != nil {
+
+        historicConfig := models.HistoricConfigFromAlertConfig(config)
+        if _, err := sess.Table("alert_configuration_history").Insert(historicConfig); err != nil {
             return err
         }
         if _, err := st.deleteOldConfigurations(ctx, cmd.OrgID, ConfigRecordsLimit); err != nil {
@@ -132,6 +138,47 @@ func (st *DBstore) UpdateAlertmanagerConfiguration(ctx context.Context, cmd *mod
     })
 }

+// MarkConfigurationAsApplied sets the `last_applied` field of the last config with the given hash to the current UNIX timestamp.
+func (st *DBstore) MarkConfigurationAsApplied(ctx context.Context, cmd *models.MarkConfigurationAsAppliedCmd) error {
+    return st.SQLStore.WithTransactionalDbSession(ctx, func(sess *db.Session) error {
+        update := map[string]interface{}{"last_applied": time.Now().UTC().Unix()}
+        rowsAffected, err := sess.Table("alert_configuration_history").
+            Desc("id").
+            Limit(1).
+            Where("org_id = ? AND configuration_hash = ?", cmd.OrgID, cmd.ConfigurationHash).
+            Cols("last_applied").
+            Update(&update)
+
+        if err != nil {
+            return err
+        }
+
+        if rowsAffected != 1 {
+            st.Logger.Error("update statement affected %d alert configuration history records", rowsAffected)
+        }
+
+        return nil
+    })
+}
+
+// GetAppliedConfigurations returns all configurations that have been marked as applied, ordered newest -> oldest by id.
+func (st *DBstore) GetAppliedConfigurations(ctx context.Context, query *models.GetAppliedConfigurationsQuery) error {
+    return st.SQLStore.WithDbSession(ctx, func(sess *db.Session) error {
+        cfgs := []*models.HistoricAlertConfiguration{}
+        err := sess.Table("alert_configuration_history").
+            Desc("id").
+            Where("org_id = ? AND last_applied != 0", query.OrgID).
+            Find(&cfgs)
+
+        if err != nil {
+            return err
+        }
+
+        query.Result = cfgs
+        return nil
+    })
+}
+
 func (st *DBstore) deleteOldConfigurations(ctx context.Context, orgID int64, limit int) (int64, error) {
     if limit < 1 {
         return 0, fmt.Errorf("failed to delete old configurations: limit is set to '%d' but needs to be > 0", limit)
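A usage sketch of the two new store methods, modeled on the integration test further down (the wrapper function `markLatestApplied` is hypothetical; `st` is a *DBstore as defined in this file):

```go
// markLatestApplied marks the most recently saved configuration of an org as
// applied and then returns every configuration that has been applied so far,
// newest first.
func markLatestApplied(ctx context.Context, st *DBstore, orgID int64) ([]*models.HistoricAlertConfiguration, error) {
    latest := models.GetLatestAlertmanagerConfigurationQuery{OrgID: orgID}
    if err := st.GetLatestAlertmanagerConfiguration(ctx, &latest); err != nil {
        return nil, err
    }
    if err := st.MarkConfigurationAsApplied(ctx, &models.MarkConfigurationAsAppliedCmd{
        OrgID:             orgID,
        ConfigurationHash: latest.Result.ConfigurationHash,
    }); err != nil {
        return nil, err
    }
    applied := models.GetAppliedConfigurationsQuery{OrgID: orgID}
    if err := st.GetAppliedConfigurations(ctx, &applied); err != nil {
        return nil, err
    }
    return applied.Result, nil
}
```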
@@ -143,7 +190,7 @@ func (st *DBstore) deleteOldConfigurations(ctx context.Context, orgID int64, lim

     var affectedRows int64
     err := st.SQLStore.WithDbSession(ctx, func(sess *db.Session) error {
-        highest := &models.AlertConfiguration{}
+        highest := &models.HistoricAlertConfiguration{}
         ok, err := sess.Table("alert_configuration_history").Desc("id").Where("org_id = ?", orgID).OrderBy("id").Limit(1, limit-1).Get(highest)
         if err != nil {
             return err
@@ -14,7 +14,7 @@ import (
     "github.com/grafana/grafana/pkg/services/ngalert/models"
 )

-func TestIntegrationAlertManagerStore(t *testing.T) {
+func TestIntegrationAlertmanagerStore(t *testing.T) {
     if testing.Short() {
         t.Skip("skipping integration test")
     }
@@ -123,7 +123,7 @@ func TestIntegrationAlertManagerStore(t *testing.T) {
     })
 }

-func TestIntegrationAlertManagerHash(t *testing.T) {
+func TestIntegrationAlertmanagerHash(t *testing.T) {
     if testing.Short() {
         t.Skip("skipping integration test")
     }
@@ -176,7 +176,7 @@ func TestIntegrationAlertManagerHash(t *testing.T) {
     })
 }

-func TestIntegrationAlertManagerConfigCleanup(t *testing.T) {
+func TestIntegrationAlertmanagerConfigCleanup(t *testing.T) {
     if testing.Short() {
         t.Skip("skipping integration test")
     }
@@ -185,7 +185,7 @@ func TestIntegrationAlertManagerConfigCleanup(t *testing.T) {
         SQLStore: sqlStore,
         Logger: log.NewNopLogger(),
     }
-    t.Run("when calling the cleanup with less records than the limit all recrods should stay", func(t *testing.T) {
+    t.Run("when calling the cleanup with fewer records than the limit all records should stay", func(t *testing.T) {
         var orgID int64 = 3
         oldestConfig, _ := setupConfig(t, "oldest-record", store)
         err := store.SaveAlertmanagerConfiguration(context.Background(), &models.SaveAlertmanagerConfigurationCmd{
@@ -214,8 +214,8 @@ func TestIntegrationAlertManagerConfigCleanup(t *testing.T) {
         })
         require.NoError(t, err)

-        rowsAffacted, err := store.deleteOldConfigurations(context.Background(), orgID, 100)
-        require.Equal(t, int64(0), rowsAffacted)
+        rowsAffected, err := store.deleteOldConfigurations(context.Background(), orgID, 100)
+        require.Equal(t, int64(0), rowsAffected)
         require.NoError(t, err)

         req := &models.GetLatestAlertmanagerConfigurationQuery{
@@ -275,6 +275,69 @@ func TestIntegrationAlertManagerConfigCleanup(t *testing.T) {
     })
 }

+func TestIntegrationMarkConfigurationAsApplied(t *testing.T) {
+    if testing.Short() {
+        t.Skip("skipping integration test")
+    }
+    sqlStore := db.InitTestDB(t)
+    store := &DBstore{
+        SQLStore: sqlStore,
+        Logger: log.NewNopLogger(),
+    }
+
+    t.Run("attempting to mark a non existent config as applied shouldn't fail", func(tt *testing.T) {
+        cmd := models.MarkConfigurationAsAppliedCmd{
+            OrgID: 10,
+            ConfigurationHash: "test",
+        }
+        err := store.MarkConfigurationAsApplied(context.Background(), &cmd)
+        require.NoError(tt, err)
+    })
+
+    t.Run("marking an existent config should succeed", func(tt *testing.T) {
+        const orgID = 1
+        ctx := context.Background()
+
+        config, _ := setupConfig(t, "test", store)
+        err := store.SaveAlertmanagerConfiguration(ctx, &models.SaveAlertmanagerConfigurationCmd{
+            AlertmanagerConfiguration: config,
+            ConfigurationVersion: "v1",
+            Default: false,
+            OrgID: orgID,
+        })
+        require.NoError(tt, err)
+
+        // Config should be saved but not marked as applied yet.
+        appliedCfgsQuery := models.GetAppliedConfigurationsQuery{
+            OrgID: orgID,
+        }
+        err = store.GetAppliedConfigurations(ctx, &appliedCfgsQuery)
+        require.NoError(tt, err)
+        require.Len(tt, appliedCfgsQuery.Result, 0)
+
+        query := models.GetLatestAlertmanagerConfigurationQuery{
+            OrgID: orgID,
+        }
+        err = store.GetLatestAlertmanagerConfiguration(ctx, &query)
+        require.NoError(tt, err)
+
+        cmd := models.MarkConfigurationAsAppliedCmd{
+            OrgID: orgID,
+            ConfigurationHash: query.Result.ConfigurationHash,
+        }
+        err = store.MarkConfigurationAsApplied(ctx, &cmd)
+        require.NoError(tt, err)
+
+        // Config should now be saved and marked as successfully applied.
+        appliedCfgsQuery = models.GetAppliedConfigurationsQuery{
+            OrgID: orgID,
+        }
+        err = store.GetAppliedConfigurations(ctx, &appliedCfgsQuery)
+        require.NoError(tt, err)
+        require.Len(tt, appliedCfgsQuery.Result, 1)
+    })
+}
+
 func setupConfig(t *testing.T, config string, store *DBstore) (string, string) {
     t.Helper()
     return setupConfigInOrg(t, config, 1, store)
@@ -27,6 +27,7 @@ type AlertingStore interface {
     SaveAlertmanagerConfiguration(ctx context.Context, cmd *models.SaveAlertmanagerConfigurationCmd) error
     SaveAlertmanagerConfigurationWithCallback(ctx context.Context, cmd *models.SaveAlertmanagerConfigurationCmd, callback SaveCallback) error
     UpdateAlertmanagerConfiguration(ctx context.Context, cmd *models.SaveAlertmanagerConfigurationCmd) error
+    MarkConfigurationAsApplied(ctx context.Context, cmd *models.MarkConfigurationAsAppliedCmd) error
 }

 // DBstore stores the alert definitions and instances in the database.
@@ -20,6 +20,10 @@ func AddTablesMigrations(mg *migrator.Migrator) {
     // should come in the form of a new migration appended to the end of AddTablesMigrations
     // instead of modifying an existing one. This ensure that tables are modified in a consistent and correct order.
     historicalTableMigrations(mg)
+
+    mg.AddMigration("add last_applied column to alert_configuration_history", migrator.NewAddColumnMigration(migrator.Table{Name: "alert_configuration_history"}, &migrator.Column{
+        Name: "last_applied", Type: migrator.DB_Int, Nullable: false,
+    }))
 }

 // historicalTableMigrations contains those migrations that existed prior to creating the improved messaging around migration immutability.
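Because the store only treats rows with `last_applied != 0` as applied (see GetAppliedConfigurations above), readers of the history table can interpret a zero value as "never applied". A small hypothetical helper illustrating that convention (not part of the commit):

```go
// AppliedAt returns when a historic configuration was last applied, and false
// if it has never been applied (last_applied is still 0).
func AppliedAt(cfg models.HistoricAlertConfiguration) (time.Time, bool) {
    if cfg.LastApplied == 0 {
        return time.Time{}, false
    }
    return time.Unix(cfg.LastApplied, 0).UTC(), true
}
```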