Alerting: Optimization of fetching data in multiorg alertmanager (#39237)

* Add method GetAllLatestAlertmanagerConfiguration to DBStore
* Add method ApplyConfig to Alertmanager
* Update multi-org Alertmanager to load all Alertmanager configs at once
This commit is contained in:
Yuriy Tseretyan
2021-09-21 11:01:23 -04:00
committed by GitHub
parent 428f32bcf9
commit 1910d85ae0
6 changed files with 85 additions and 49 deletions

View File

@@ -306,44 +306,20 @@ func (am *Alertmanager) SaveAndApplyConfig(cfg *apimodels.PostableUserConfig) er
return nil
}
// SyncAndApplyConfigFromDatabase picks the latest config from database and restarts
// the components with the new config.
func (am *Alertmanager) SyncAndApplyConfigFromDatabase() error {
// ApplyConfig applies the configuration to the Alertmanager.
func (am *Alertmanager) ApplyConfig(dbCfg *ngmodels.AlertConfiguration) error {
var err error
cfg, err := Load([]byte(dbCfg.AlertmanagerConfiguration))
if err != nil {
return fmt.Errorf("failed to parse Alertmanager config: %w", err)
}
am.reloadConfigMtx.Lock()
defer am.reloadConfigMtx.Unlock()
// First, let's get the configuration we need from the database.
q := &ngmodels.GetLatestAlertmanagerConfigurationQuery{OrgID: am.orgID}
if err := am.Store.GetLatestAlertmanagerConfiguration(q); err != nil {
// If there's no configuration in the database, let's use the default configuration.
if errors.Is(err, store.ErrNoAlertmanagerConfiguration) {
// First, let's save it to the database. We don't need to use a transaction here as we'll always succeed.
am.logger.Info("no Alertmanager configuration found, saving and applying a default")
savecmd := &ngmodels.SaveAlertmanagerConfigurationCmd{
AlertmanagerConfiguration: alertmanagerDefaultConfiguration,
Default: true,
ConfigurationVersion: fmt.Sprintf("v%d", ngmodels.AlertConfigurationVersion),
OrgID: am.orgID,
}
if err := am.Store.SaveAlertmanagerConfiguration(savecmd); err != nil {
return err
}
q.Result = &ngmodels.AlertConfiguration{AlertmanagerConfiguration: alertmanagerDefaultConfiguration, Default: true}
} else {
return fmt.Errorf("unable to get Alertmanager configuration from the database: %w", err)
}
if err = am.applyConfig(cfg, nil); err != nil {
return fmt.Errorf("unable to apply configuration: %w", err)
}
cfg, err := Load([]byte(q.Result.AlertmanagerConfiguration))
if err != nil {
return err
}
if err := am.applyConfig(cfg, nil); err != nil {
return fmt.Errorf("unable to reload configuration: %w", err)
}
return nil
}

View File

@@ -53,12 +53,6 @@ func setupAMTest(t *testing.T) *Alertmanager {
return am
}
// TestAlertmanager_ShouldUseDefaultConfigurationWhenNoConfiguration checks that syncing
// from the database succeeds and leaves the Alertmanager with a non-nil config.
func TestAlertmanager_ShouldUseDefaultConfigurationWhenNoConfiguration(t *testing.T) {
	am := setupAMTest(t)

	err := am.SyncAndApplyConfigFromDatabase()
	require.NoError(t, err)

	require.NotNil(t, am.config)
}
func TestPutAlert(t *testing.T) {
am := setupAMTest(t)

View File

@@ -15,6 +15,7 @@ import (
"github.com/grafana/grafana/pkg/infra/kvstore"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
"github.com/grafana/grafana/pkg/services/ngalert/models"
"github.com/grafana/grafana/pkg/services/ngalert/store"
"github.com/grafana/grafana/pkg/setting"
)
@@ -116,20 +117,41 @@ func (moa *MultiOrgAlertmanager) LoadAndSyncAlertmanagersForOrgs(ctx context.Con
// Then, sync them by creating or deleting Alertmanagers as necessary.
moa.metrics.DiscoveredConfigurations.Set(float64(len(orgIDs)))
moa.SyncAlertmanagersForOrgs(orgIDs)
moa.SyncAlertmanagersForOrgs(ctx, orgIDs)
moa.logger.Debug("done synchronizing Alertmanagers for orgs")
return nil
}
func (moa *MultiOrgAlertmanager) SyncAlertmanagersForOrgs(orgIDs []int64) {
// getLatestConfigs fetches the latest Alertmanager configuration of every organization
// with one call to the config store and indexes the results by organization ID.
// If the store returns an error, it is passed through unchanged and the map is nil.
func (moa *MultiOrgAlertmanager) getLatestConfigs(ctx context.Context) (map[int64]*models.AlertConfiguration, error) {
	latest, err := moa.configStore.GetAllLatestAlertmanagerConfiguration(ctx)
	if err != nil {
		return nil, err
	}

	byOrg := make(map[int64]*models.AlertConfiguration, len(latest))
	for i := range latest {
		byOrg[latest[i].OrgID] = latest[i]
	}
	return byOrg, nil
}
// SyncAlertmanagersForOrgs syncs configuration of the Alertmanager required by each organization.
func (moa *MultiOrgAlertmanager) SyncAlertmanagersForOrgs(ctx context.Context, orgIDs []int64) {
orgsFound := make(map[int64]struct{}, len(orgIDs))
dbConfigs, err := moa.getLatestConfigs(ctx)
if err != nil {
moa.logger.Error("failed to load Alertmanager configurations", "err", err)
return
}
moa.alertmanagersMtx.Lock()
for _, orgID := range orgIDs {
orgsFound[orgID] = struct{}{}
existing, found := moa.alertmanagers[orgID]
alertmanager, found := moa.alertmanagers[orgID]
if !found {
// These metrics are not exported by Grafana and are mostly a placeholder.
// To export them, we need to translate the metrics from each individual registry and,
@@ -139,14 +161,30 @@ func (moa *MultiOrgAlertmanager) SyncAlertmanagersForOrgs(orgIDs []int64) {
if err != nil {
moa.logger.Error("unable to create Alertmanager for org", "org", orgID, "err", err)
}
moa.alertmanagers[orgID] = am
existing = am
alertmanager = am
}
//TODO: This will create an N+1 query
if err := existing.SyncAndApplyConfigFromDatabase(); err != nil {
moa.logger.Error("failed to apply Alertmanager config for org", "org", orgID, "err", err)
dbConfig, cfgFound := dbConfigs[orgID]
if !cfgFound {
if found {
// This means that the configuration is gone but the organization, as well as the Alertmanager, exists.
moa.logger.Warn("Alertmanager exists for org but the configuration is gone. Applying the default configuration", "org", orgID)
}
err := alertmanager.SaveAndApplyDefaultConfig()
if err != nil {
moa.logger.Error("failed to apply the default Alertmanager configuration", "org", orgID)
continue
}
moa.alertmanagers[orgID] = alertmanager
continue
}
err := alertmanager.ApplyConfig(dbConfig)
if err != nil {
moa.logger.Error("failed to apply Alertmanager config for org", "org", orgID, "id", dbConfig.ID, "err", err)
continue
}
moa.alertmanagers[orgID] = alertmanager
}
amsToStop := map[int64]*Alertmanager{}

View File

@@ -13,6 +13,14 @@ type FakeConfigStore struct {
configs map[int64]*models.AlertConfiguration
}
// GetAllLatestAlertmanagerConfiguration returns every configuration held by the fake store.
// The slice is built by ranging over a map, so its order is unspecified. The error is always nil.
func (f *FakeConfigStore) GetAllLatestAlertmanagerConfiguration(context.Context) ([]*models.AlertConfiguration, error) {
	all := make([]*models.AlertConfiguration, 0, len(f.configs))
	for orgID := range f.configs {
		all = append(all, f.configs[orgID])
	}
	return all, nil
}
func (f *FakeConfigStore) GetLatestAlertmanagerConfiguration(query *models.GetLatestAlertmanagerConfigurationQuery) error {
var ok bool
query.Result, ok = f.configs[query.OrgID]

View File

@@ -4,6 +4,8 @@ import (
"context"
"fmt"
"xorm.io/builder"
"github.com/grafana/grafana/pkg/services/ngalert/models"
"github.com/grafana/grafana/pkg/services/sqlstore"
)
@@ -33,6 +35,22 @@ func (st *DBstore) GetLatestAlertmanagerConfiguration(query *models.GetLatestAle
})
}
// GetAllLatestAlertmanagerConfiguration returns the latest configuration of every organization.
// "Latest" means the row whose id is the MAX(id) within its org_id group in the
// alert_configuration table. On error the returned slice is nil.
func (st *DBstore) GetAllLatestAlertmanagerConfiguration(ctx context.Context) ([]*models.AlertConfiguration, error) {
	var configs []*models.AlertConfiguration

	query := func(sess *sqlstore.DBSession) error {
		// Sub-select that yields one id (the highest) per organization.
		latestIDs := builder.Select("MAX(id)").From("alert_configuration").GroupBy("org_id")
		return sess.Table("alert_configuration").Where(builder.In("id", latestIDs)).Find(&configs)
	}
	if err := st.SQLStore.WithDbSession(ctx, query); err != nil {
		return nil, err
	}
	return configs, nil
}
// SaveAlertmanagerConfiguration creates an alertmanager configuration.
func (st DBstore) SaveAlertmanagerConfiguration(cmd *models.SaveAlertmanagerConfigurationCmd) error {
return st.SaveAlertmanagerConfigurationWithCallback(cmd, func() error { return nil })
@@ -41,7 +59,7 @@ func (st DBstore) SaveAlertmanagerConfiguration(cmd *models.SaveAlertmanagerConf
type SaveCallback func() error
// SaveAlertmanagerConfigurationWithCallback creates an alertmanager configuration version and then executes a callback.
// If the callback results in error in rollsback the transaction.
// If the callback results in error it rolls back the transaction.
func (st DBstore) SaveAlertmanagerConfigurationWithCallback(cmd *models.SaveAlertmanagerConfigurationCmd, callback SaveCallback) error {
return st.SQLStore.WithTransactionalDbSession(context.Background(), func(sess *sqlstore.DBSession) error {
config := models.AlertConfiguration{

View File

@@ -1,6 +1,7 @@
package store
import (
"context"
"time"
"github.com/grafana/grafana/pkg/infra/log"
@@ -17,6 +18,7 @@ const AlertDefinitionMaxTitleLength = 190
// AlertingStore is the database interface used by the Alertmanager service.
// It covers reading and saving Alertmanager configurations.
type AlertingStore interface {
// GetLatestAlertmanagerConfiguration looks up the latest configuration for the given query.
// NOTE(review): presumably the result is written to the query's Result field for the
// query's OrgID — confirm against the DBstore implementation.
GetLatestAlertmanagerConfiguration(*models.GetLatestAlertmanagerConfigurationQuery) error
// GetAllLatestAlertmanagerConfiguration returns the latest configuration of every organization.
GetAllLatestAlertmanagerConfiguration(ctx context.Context) ([]*models.AlertConfiguration, error)
// SaveAlertmanagerConfiguration creates an alertmanager configuration.
SaveAlertmanagerConfiguration(*models.SaveAlertmanagerConfigurationCmd) error
// SaveAlertmanagerConfigurationWithCallback creates an alertmanager configuration version
// and then executes the callback; if the callback returns an error the transaction is rolled back.
SaveAlertmanagerConfigurationWithCallback(*models.SaveAlertmanagerConfigurationCmd, SaveCallback) error
}