extracts alertmanager from DI, including migrations (#34071)

* extracts alertmanager from DI, including migrations

* includes alertmanager Run method in ngalert

* removes 3s test shutdown timeout

* lint
This commit is contained in:
Owen Diehl 2021-05-13 14:01:38 -04:00 committed by GitHub
parent ec3214bac2
commit baca873a84
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 77 additions and 79 deletions

View File

@ -387,7 +387,7 @@ func (hs *HTTPServer) registerRoutes() {
}) })
apiRoute.Get("/alert-notifiers", reqEditorRole, routing.Wrap( apiRoute.Get("/alert-notifiers", reqEditorRole, routing.Wrap(
GetAlertNotifiers(hs.Alertmanager != nil && !hs.Alertmanager.IsDisabled())), GetAlertNotifiers(hs.Alertmanager != nil && hs.Cfg.IsNgAlertEnabled())),
) )
apiRoute.Group("/alert-notifications", func(alertNotifications routing.RouteRegister) { apiRoute.Group("/alert-notifications", func(alertNotifications routing.RouteRegister) {

View File

@ -5,6 +5,7 @@ import (
"time" "time"
"github.com/grafana/grafana/pkg/services/quota" "github.com/grafana/grafana/pkg/services/quota"
"golang.org/x/sync/errgroup"
"github.com/grafana/grafana/pkg/services/ngalert/metrics" "github.com/grafana/grafana/pkg/services/ngalert/metrics"
"github.com/grafana/grafana/pkg/services/ngalert/state" "github.com/grafana/grafana/pkg/services/ngalert/state"
@ -45,10 +46,10 @@ type AlertNG struct {
RouteRegister routing.RouteRegister `inject:""` RouteRegister routing.RouteRegister `inject:""`
SQLStore *sqlstore.SQLStore `inject:""` SQLStore *sqlstore.SQLStore `inject:""`
DataService *tsdb.Service `inject:""` DataService *tsdb.Service `inject:""`
Alertmanager *notifier.Alertmanager `inject:""`
DataProxy *datasourceproxy.DatasourceProxyService `inject:""` DataProxy *datasourceproxy.DatasourceProxyService `inject:""`
QuotaService *quota.QuotaService `inject:""` QuotaService *quota.QuotaService `inject:""`
Metrics *metrics.Metrics `inject:""` Metrics *metrics.Metrics `inject:""`
Alertmanager *notifier.Alertmanager
Log log.Logger Log log.Logger
schedule schedule.ScheduleService schedule schedule.ScheduleService
stateManager *state.Manager stateManager *state.Manager
@ -64,7 +65,13 @@ func (ng *AlertNG) Init() error {
ng.stateManager = state.NewManager(ng.Log, ng.Metrics) ng.stateManager = state.NewManager(ng.Log, ng.Metrics)
baseInterval := baseIntervalSeconds * time.Second baseInterval := baseIntervalSeconds * time.Second
store := store.DBstore{BaseInterval: baseInterval, DefaultIntervalSeconds: defaultIntervalSeconds, SQLStore: ng.SQLStore} store := &store.DBstore{BaseInterval: baseInterval, DefaultIntervalSeconds: defaultIntervalSeconds, SQLStore: ng.SQLStore}
var err error
ng.Alertmanager, err = notifier.New(ng.Cfg, store, ng.Metrics)
if err != nil {
return err
}
schedCfg := schedule.SchedulerCfg{ schedCfg := schedule.SchedulerCfg{
C: clock.New(), C: clock.New(),
@ -101,7 +108,15 @@ func (ng *AlertNG) Init() error {
func (ng *AlertNG) Run(ctx context.Context) error { func (ng *AlertNG) Run(ctx context.Context) error {
ng.Log.Debug("ngalert starting") ng.Log.Debug("ngalert starting")
ng.schedule.WarmStateCache(ng.stateManager) ng.schedule.WarmStateCache(ng.stateManager)
return ng.schedule.Ticker(ctx, ng.stateManager)
children, subCtx := errgroup.WithContext(ctx)
children.Go(func() error {
return ng.schedule.Ticker(subCtx, ng.stateManager)
})
children.Go(func() error {
return ng.Alertmanager.Run(subCtx)
})
return children.Wait()
} }
// IsDisabled returns true if the alerting service is disabled for this instance. // IsDisabled returns true if the alerting service is disabled for this instance.

View File

@ -28,14 +28,12 @@ import (
"github.com/grafana/grafana/pkg/components/securejsondata" "github.com/grafana/grafana/pkg/components/securejsondata"
"github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/models" "github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/registry"
apimodels "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions" apimodels "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
"github.com/grafana/grafana/pkg/services/ngalert/metrics" "github.com/grafana/grafana/pkg/services/ngalert/metrics"
ngmodels "github.com/grafana/grafana/pkg/services/ngalert/models" ngmodels "github.com/grafana/grafana/pkg/services/ngalert/models"
"github.com/grafana/grafana/pkg/services/ngalert/notifier/channels" "github.com/grafana/grafana/pkg/services/ngalert/notifier/channels"
"github.com/grafana/grafana/pkg/services/ngalert/store" "github.com/grafana/grafana/pkg/services/ngalert/store"
"github.com/grafana/grafana/pkg/services/sqlstore" "github.com/grafana/grafana/pkg/services/sqlstore"
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
"github.com/grafana/grafana/pkg/setting" "github.com/grafana/grafana/pkg/setting"
) )
@ -106,41 +104,28 @@ type Alertmanager struct {
config []byte config []byte
} }
func init() { func New(cfg *setting.Cfg, store store.AlertingStore, m *metrics.Metrics) (*Alertmanager, error) {
registry.RegisterService(&Alertmanager{}) am := &Alertmanager{
} Settings: cfg,
stopc: make(chan struct{}),
func (am *Alertmanager) IsDisabled() bool { logger: log.New("alertmanager"),
if am.Settings == nil { marker: types.NewMarker(m.Registerer),
return true stageMetrics: notify.NewMetrics(m.Registerer),
dispatcherMetrics: dispatch.NewDispatcherMetrics(m.Registerer),
Store: store,
Metrics: m,
} }
return !am.Settings.IsNgAlertEnabled()
}
func (am *Alertmanager) Init() error {
return am.InitWithMetrics(am.Metrics)
}
// InitWithMetrics uses the supplied metrics for instantiation and
// allows testware to circumvent duplicate registration errors.
func (am *Alertmanager) InitWithMetrics(m *metrics.Metrics) (err error) {
am.stopc = make(chan struct{})
am.logger = log.New("alertmanager")
am.marker = types.NewMarker(m.Registerer)
am.stageMetrics = notify.NewMetrics(m.Registerer)
am.dispatcherMetrics = dispatch.NewDispatcherMetrics(m.Registerer)
am.Metrics = m
am.Store = store.DBstore{SQLStore: am.SQLStore}
// Initialize the notification log // Initialize the notification log
am.wg.Add(1) am.wg.Add(1)
var err error
am.notificationLog, err = nflog.New( am.notificationLog, err = nflog.New(
nflog.WithRetention(retentionNotificationsAndSilences), nflog.WithRetention(retentionNotificationsAndSilences),
nflog.WithSnapshot(filepath.Join(am.WorkingDirPath(), "notifications")), nflog.WithSnapshot(filepath.Join(am.WorkingDirPath(), "notifications")),
nflog.WithMaintenance(maintenanceNotificationAndSilences, am.stopc, am.wg.Done), nflog.WithMaintenance(maintenanceNotificationAndSilences, am.stopc, am.wg.Done),
) )
if err != nil { if err != nil {
return fmt.Errorf("unable to initialize the notification log component of alerting: %w", err) return nil, fmt.Errorf("unable to initialize the notification log component of alerting: %w", err)
} }
// Initialize silences // Initialize silences
am.silences, err = silence.New(silence.Options{ am.silences, err = silence.New(silence.Options{
@ -149,7 +134,7 @@ func (am *Alertmanager) InitWithMetrics(m *metrics.Metrics) (err error) {
Retention: retentionNotificationsAndSilences, Retention: retentionNotificationsAndSilences,
}) })
if err != nil { if err != nil {
return fmt.Errorf("unable to initialize the silencing component of alerting: %w", err) return nil, fmt.Errorf("unable to initialize the silencing component of alerting: %w", err)
} }
am.wg.Add(1) am.wg.Add(1)
@ -161,10 +146,10 @@ func (am *Alertmanager) InitWithMetrics(m *metrics.Metrics) (err error) {
// Initialize in-memory alerts // Initialize in-memory alerts
am.alerts, err = mem.NewAlerts(context.Background(), am.marker, memoryAlertsGCInterval, gokit_log.NewNopLogger()) am.alerts, err = mem.NewAlerts(context.Background(), am.marker, memoryAlertsGCInterval, gokit_log.NewNopLogger())
if err != nil { if err != nil {
return fmt.Errorf("unable to initialize the alert provider component of alerting: %w", err) return nil, fmt.Errorf("unable to initialize the alert provider component of alerting: %w", err)
} }
return nil return am, nil
} }
func (am *Alertmanager) Run(ctx context.Context) error { func (am *Alertmanager) Run(ctx context.Context) error {
@ -185,11 +170,6 @@ func (am *Alertmanager) Run(ctx context.Context) error {
} }
} }
// AddMigration runs the database migrations as the service starts.
func (am *Alertmanager) AddMigration(mg *migrator.Migrator) {
alertmanagerConfigurationMigration(mg)
}
func (am *Alertmanager) StopAndWait() error { func (am *Alertmanager) StopAndWait() error {
if am.dispatcher != nil { if am.dispatcher != nil {
am.dispatcher.Stop() am.dispatcher.Stop()

View File

@ -20,39 +20,41 @@ import (
apimodels "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions" apimodels "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
"github.com/grafana/grafana/pkg/services/ngalert/metrics" "github.com/grafana/grafana/pkg/services/ngalert/metrics"
"github.com/grafana/grafana/pkg/services/ngalert/store"
"github.com/grafana/grafana/pkg/services/sqlstore" "github.com/grafana/grafana/pkg/services/sqlstore"
"github.com/grafana/grafana/pkg/setting" "github.com/grafana/grafana/pkg/setting"
) )
func TestAlertmanager_ShouldUseDefaultConfigurationWhenNoConfiguration(t *testing.T) { func setupAMTest(t *testing.T) *Alertmanager {
am := &Alertmanager{}
dir, err := ioutil.TempDir("", "") dir, err := ioutil.TempDir("", "")
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(func() { t.Cleanup(func() {
require.NoError(t, os.RemoveAll(dir)) require.NoError(t, os.RemoveAll(dir))
}) })
am.Settings = &setting.Cfg{ cfg := &setting.Cfg{
DataPath: dir, DataPath: dir,
} }
am.SQLStore = sqlstore.InitTestDB(t)
require.NoError(t, am.InitWithMetrics(metrics.NewMetrics(prometheus.NewRegistry()))) sqlStore := sqlstore.InitTestDB(t)
store := &store.DBstore{
BaseInterval: 10 * time.Second,
DefaultIntervalSeconds: 60,
SQLStore: sqlStore,
}
am, err := New(cfg, store, metrics.NewMetrics(prometheus.NewRegistry()))
require.NoError(t, err)
return am
}
func TestAlertmanager_ShouldUseDefaultConfigurationWhenNoConfiguration(t *testing.T) {
am := setupAMTest(t)
require.NoError(t, am.SyncAndApplyConfigFromDatabase()) require.NoError(t, am.SyncAndApplyConfigFromDatabase())
require.NotNil(t, am.config) require.NotNil(t, am.config)
} }
func TestPutAlert(t *testing.T) { func TestPutAlert(t *testing.T) {
am := &Alertmanager{} am := setupAMTest(t)
dir, err := ioutil.TempDir("", "")
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, os.RemoveAll(dir))
})
am.Settings = &setting.Cfg{
DataPath: dir,
}
require.NoError(t, am.InitWithMetrics(metrics.NewMetrics(prometheus.NewRegistry())))
startTime := time.Now() startTime := time.Now()
endTime := startTime.Add(2 * time.Hour) endTime := startTime.Add(2 * time.Hour)
@ -269,6 +271,7 @@ func TestPutAlert(t *testing.T) {
} }
for _, c := range cases { for _, c := range cases {
var err error
t.Run(c.title, func(t *testing.T) { t.Run(c.title, func(t *testing.T) {
r := prometheus.NewRegistry() r := prometheus.NewRegistry()
am.marker = types.NewMarker(r) am.marker = types.NewMarker(r)

View File

@ -1,17 +0,0 @@
package notifier
import "github.com/grafana/grafana/pkg/services/sqlstore/migrator"
func alertmanagerConfigurationMigration(mg *migrator.Migrator) {
alertConfiguration := migrator.Table{
Name: "alert_configuration",
Columns: []*migrator.Column{
{Name: "id", Type: migrator.DB_BigInt, IsPrimaryKey: true, IsAutoIncrement: true},
{Name: "alertmanager_configuration", Type: migrator.DB_Text, Nullable: false},
{Name: "configuration_version", Type: migrator.DB_NVarchar, Length: 3}, // In a format of vXX e.g. v1, v2, v10, etc
{Name: "created_at", Type: migrator.DB_Int, Nullable: false},
},
}
mg.AddMigration("create_alert_configuration_table", migrator.NewAddTableMigration(alertConfiguration))
}

View File

@ -44,7 +44,7 @@ func getLatestAlertmanagerConfiguration(sess *sqlstore.DBSession) (*models.Alert
// GetLatestAlertmanagerConfiguration returns the latest version of the alertmanager configuration. // GetLatestAlertmanagerConfiguration returns the latest version of the alertmanager configuration.
// It returns ErrNoAlertmanagerConfiguration if no configuration is found. // It returns ErrNoAlertmanagerConfiguration if no configuration is found.
func (st DBstore) GetLatestAlertmanagerConfiguration(query *models.GetLatestAlertmanagerConfigurationQuery) error { func (st *DBstore) GetLatestAlertmanagerConfiguration(query *models.GetLatestAlertmanagerConfigurationQuery) error {
return st.SQLStore.WithDbSession(context.Background(), func(sess *sqlstore.DBSession) error { return st.SQLStore.WithDbSession(context.Background(), func(sess *sqlstore.DBSession) error {
c, err := getLatestAlertmanagerConfiguration(sess) c, err := getLatestAlertmanagerConfiguration(sess)
if err != nil { if err != nil {
@ -57,7 +57,7 @@ func (st DBstore) GetLatestAlertmanagerConfiguration(query *models.GetLatestAler
// GetAlertmanagerConfiguration returns the alertmanager configuration identified by the query. // GetAlertmanagerConfiguration returns the alertmanager configuration identified by the query.
// It returns ErrNoAlertmanagerConfiguration if no such configuration is found. // It returns ErrNoAlertmanagerConfiguration if no such configuration is found.
func (st DBstore) GetAlertmanagerConfiguration(query *models.GetAlertmanagerConfigurationQuery) error { func (st *DBstore) GetAlertmanagerConfiguration(query *models.GetAlertmanagerConfigurationQuery) error {
return st.SQLStore.WithDbSession(context.Background(), func(sess *sqlstore.DBSession) error { return st.SQLStore.WithDbSession(context.Background(), func(sess *sqlstore.DBSession) error {
c, err := getAlertmanagerConfigurationByID(sess, query.ID) c, err := getAlertmanagerConfigurationByID(sess, query.ID)
if err != nil { if err != nil {
@ -69,7 +69,7 @@ func (st DBstore) GetAlertmanagerConfiguration(query *models.GetAlertmanagerConf
} }
// SaveAlertmanagerConfiguration creates an alertmanager configuration. // SaveAlertmanagerConfiguration creates an alertmanager configuration.
func (st DBstore) SaveAlertmanagerConfiguration(cmd *models.SaveAlertmanagerConfigurationCmd) error { func (st *DBstore) SaveAlertmanagerConfiguration(cmd *models.SaveAlertmanagerConfigurationCmd) error {
return st.SQLStore.WithDbSession(context.Background(), func(sess *sqlstore.DBSession) error { return st.SQLStore.WithDbSession(context.Background(), func(sess *sqlstore.DBSession) error {
config := models.AlertConfiguration{ config := models.AlertConfiguration{
AlertmanagerConfiguration: cmd.AlertmanagerConfiguration, AlertmanagerConfiguration: cmd.AlertmanagerConfiguration,

View File

@ -8,6 +8,8 @@ import (
"time" "time"
apimodels "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions" apimodels "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/grafana/grafana/pkg/services/ngalert/models" "github.com/grafana/grafana/pkg/services/ngalert/models"
@ -44,6 +46,7 @@ func overrideAlertNGInRegistry(t *testing.T, cfg *setting.Cfg) ngalert.AlertNG {
Cfg: cfg, Cfg: cfg,
RouteRegister: routing.NewRouteRegister(), RouteRegister: routing.NewRouteRegister(),
Log: log.New("ngalert-test"), Log: log.New("ngalert-test"),
Metrics: metrics.NewMetrics(prometheus.NewRegistry()),
} }
// hook for initialising the service after the Cfg is populated // hook for initialising the service after the Cfg is populated

View File

@ -17,6 +17,9 @@ func AddTablesMigrations(mg *migrator.Migrator) {
// Create alert_rule // Create alert_rule
AddAlertRuleMigrations(mg, 60) AddAlertRuleMigrations(mg, 60)
AddAlertRuleVersionMigrations(mg) AddAlertRuleVersionMigrations(mg)
// Create Alertmanager configurations
AddAlertmanagerConfigMigrations(mg)
} }
// AddAlertDefinitionMigrations should not be modified. // AddAlertDefinitionMigrations should not be modified.
@ -235,3 +238,17 @@ func AddAlertRuleVersionMigrations(mg *migrator.Migrator) {
// add labels column // add labels column
mg.AddMigration("add column labels to alert_rule_version", migrator.NewAddColumnMigration(alertRuleVersion, &migrator.Column{Name: "labels", Type: migrator.DB_Text, Nullable: true})) mg.AddMigration("add column labels to alert_rule_version", migrator.NewAddColumnMigration(alertRuleVersion, &migrator.Column{Name: "labels", Type: migrator.DB_Text, Nullable: true}))
} }
// AddAlertmanagerConfigMigrations registers the migration that creates the
// alert_configuration table on the supplied migrator.
func AddAlertmanagerConfigMigrations(mg *migrator.Migrator) {
// Table definition: an auto-incrementing primary key, the configuration text,
// its version tag, and a creation timestamp column.
alertConfiguration := migrator.Table{
Name: "alert_configuration",
Columns: []*migrator.Column{
{Name: "id", Type: migrator.DB_BigInt, IsPrimaryKey: true, IsAutoIncrement: true},
{Name: "alertmanager_configuration", Type: migrator.DB_Text, Nullable: false},
{Name: "configuration_version", Type: migrator.DB_NVarchar, Length: 3}, // In a format of vXX e.g. v1, v2, v10, etc
{Name: "created_at", Type: migrator.DB_Int, Nullable: false},
},
}
// Register the table creation under a fixed migration ID.
mg.AddMigration("create_alert_configuration_table", migrator.NewAddTableMigration(alertConfiguration))
}

View File

@ -10,7 +10,6 @@ import (
"path/filepath" "path/filepath"
"strings" "strings"
"testing" "testing"
"time"
"github.com/grafana/grafana/pkg/infra/fs" "github.com/grafana/grafana/pkg/infra/fs"
"github.com/grafana/grafana/pkg/models" "github.com/grafana/grafana/pkg/models"
@ -66,8 +65,6 @@ func StartGrafana(t *testing.T, grafDir, cfgPath string, sqlStore *sqlstore.SQLS
} }
}() }()
t.Cleanup(func() { t.Cleanup(func() {
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()
if err := server.Shutdown(ctx, "test cleanup"); err != nil { if err := server.Shutdown(ctx, "test cleanup"); err != nil {
t.Error("Timed out waiting on server to shut down") t.Error("Timed out waiting on server to shut down")
} }