Alerting: Rename sender.Sender to sender.ExternalAlertmanagers (#52463)
parent 146c90d59e
commit 79d92aa03e
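As the hunks below show, this is a mechanical rename with no behavior change. The renamed surface, collected from the diff:

    sender.Sender                 -> sender.ExternalAlertmanager
    sender.New()                  -> sender.NewExternalAlertmanagerSender()
    AlertsRouter.senders          -> AlertsRouter.externalAlertmanagers
    AlertsRouter.sendersCfgHash   -> AlertsRouter.externalAlertmanagersCfgHash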
@@ -27,11 +27,11 @@ type AlertsRouter struct {
     clock            clock.Clock
     adminConfigStore store.AdminConfigurationStore
 
-    // senders help us send alerts to external Alertmanagers.
-    adminConfigMtx sync.RWMutex
-    sendAlertsTo   map[int64]models.AlertmanagersChoice
-    senders        map[int64]*Sender
-    sendersCfgHash map[int64]string
+    // externalAlertmanagers help us send alerts to external Alertmanagers.
+    adminConfigMtx               sync.RWMutex
+    sendAlertsTo                 map[int64]models.AlertmanagersChoice
+    externalAlertmanagers        map[int64]*ExternalAlertmanager
+    externalAlertmanagersCfgHash map[int64]string
 
     multiOrgNotifier *notifier.MultiOrgAlertmanager
 
@@ -46,10 +46,10 @@ func NewAlertsRouter(multiOrgNotifier *notifier.MultiOrgAlertmanager, store stor
         clock:            clk,
         adminConfigStore: store,
 
-        adminConfigMtx: sync.RWMutex{},
-        senders:        map[int64]*Sender{},
-        sendersCfgHash: map[int64]string{},
-        sendAlertsTo:   map[int64]models.AlertmanagersChoice{},
+        adminConfigMtx:               sync.RWMutex{},
+        externalAlertmanagers:        map[int64]*ExternalAlertmanager{},
+        externalAlertmanagersCfgHash: map[int64]string{},
+        sendAlertsTo:                 map[int64]models.AlertmanagersChoice{},
 
         multiOrgNotifier: multiOrgNotifier,
 
@@ -83,9 +83,9 @@ func (d *AlertsRouter) SyncAndApplyConfigFromDatabase() error {
         // Update the Alertmanagers choice for the organization.
         d.sendAlertsTo[cfg.OrgID] = cfg.SendAlertsTo
 
-        orgsFound[cfg.OrgID] = struct{}{} // keep track of the which senders we need to keep.
+        orgsFound[cfg.OrgID] = struct{}{} // keep track of the which externalAlertmanagers we need to keep.
 
-        existing, ok := d.senders[cfg.OrgID]
+        existing, ok := d.externalAlertmanagers[cfg.OrgID]
 
         // We have no running sender and no Alertmanager(s) configured, no-op.
         if !ok && len(cfg.Alertmanagers) == 0 {
@@ -107,7 +107,7 @@ func (d *AlertsRouter) SyncAndApplyConfigFromDatabase() error {
 
         // We have a running sender, check if we need to apply a new config.
         if ok {
-            if d.sendersCfgHash[cfg.OrgID] == cfg.AsSHA256() {
+            if d.externalAlertmanagersCfgHash[cfg.OrgID] == cfg.AsSHA256() {
                 d.logger.Debug("sender configuration is the same as the one running, no-op", "org", cfg.OrgID, "alertmanagers", cfg.Alertmanagers)
                 continue
             }
@@ -118,19 +118,19 @@ func (d *AlertsRouter) SyncAndApplyConfigFromDatabase() error {
                 d.logger.Error("failed to apply configuration", "err", err, "org", cfg.OrgID)
                 continue
             }
-            d.sendersCfgHash[cfg.OrgID] = cfg.AsSHA256()
+            d.externalAlertmanagersCfgHash[cfg.OrgID] = cfg.AsSHA256()
             continue
         }
 
         // No sender and have Alertmanager(s) to send to - start a new one.
         d.logger.Info("creating new sender for the external alertmanagers", "org", cfg.OrgID, "alertmanagers", cfg.Alertmanagers)
-        s, err := New()
+        s, err := NewExternalAlertmanagerSender()
         if err != nil {
            d.logger.Error("unable to start the sender", "err", err, "org", cfg.OrgID)
            continue
         }
 
-        d.senders[cfg.OrgID] = s
+        d.externalAlertmanagers[cfg.OrgID] = s
         s.Run()
 
         err = s.ApplyConfig(cfg)
@@ -139,21 +139,21 @@ func (d *AlertsRouter) SyncAndApplyConfigFromDatabase() error {
             continue
         }
 
-        d.sendersCfgHash[cfg.OrgID] = cfg.AsSHA256()
+        d.externalAlertmanagersCfgHash[cfg.OrgID] = cfg.AsSHA256()
     }
 
-    sendersToStop := map[int64]*Sender{}
+    sendersToStop := map[int64]*ExternalAlertmanager{}
 
-    for orgID, s := range d.senders {
+    for orgID, s := range d.externalAlertmanagers {
         if _, exists := orgsFound[orgID]; !exists {
             sendersToStop[orgID] = s
-            delete(d.senders, orgID)
-            delete(d.sendersCfgHash, orgID)
+            delete(d.externalAlertmanagers, orgID)
+            delete(d.externalAlertmanagersCfgHash, orgID)
         }
     }
     d.adminConfigMtx.Unlock()
 
-    // We can now stop these senders w/o having to hold a lock.
+    // We can now stop these externalAlertmanagers w/o having to hold a lock.
     for orgID, s := range sendersToStop {
         d.logger.Info("stopping sender", "org", orgID)
         s.Stop()
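The sync loop above uses a pattern worth noting: entries are removed from the shared maps while adminConfigMtx is held, but the potentially blocking Stop calls happen after the lock is released. A minimal, self-contained sketch of that pattern, with hypothetical names (worker, reap; workers stands in for d.externalAlertmanagers):

package main

import "sync"

type worker struct{ done chan struct{} }

// Stop may block in real code (e.g. waiting on a WaitGroup).
func (w *worker) Stop() { close(w.done) }

var (
    mu      sync.RWMutex
    workers = map[int64]*worker{}
)

func reap(keep map[int64]struct{}) {
    toStop := map[int64]*worker{}

    mu.Lock()
    for id, w := range workers {
        if _, ok := keep[id]; !ok {
            toStop[id] = w // collect under the lock...
            delete(workers, id)
        }
    }
    mu.Unlock()

    // ...but stop outside it, so readers (like Send below) are not
    // blocked for the duration of a slow shutdown.
    for _, w := range toStop {
        w.Stop()
    }
}

func main() {
    workers[1] = &worker{done: make(chan struct{})}
    reap(map[int64]struct{}{}) // keeps nothing: worker 1 is stopped
}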
@@ -197,7 +197,7 @@ func (d *AlertsRouter) Send(key models.AlertRuleKey, alerts definitions.Postable
     // and alerts are not being handled just internally.
     d.adminConfigMtx.RLock()
     defer d.adminConfigMtx.RUnlock()
-    s, ok := d.senders[key.OrgID]
+    s, ok := d.externalAlertmanagers[key.OrgID]
     if ok && d.sendAlertsTo[key.OrgID] != models.InternalAlertmanager {
         logger.Debug("sending alerts to external notifier", "count", len(alerts.PostableAlerts), "alerts", alerts.PostableAlerts)
         s.SendAlerts(alerts)
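The guard above is where the per-org Alertmanagers choice gates external delivery: forwarding is skipped only when the org opted into the internal Alertmanager. A sketch of the predicate (hypothetical helper; the shipped code inlines the comparison, and the three constants all appear in this diff and its tests):

// sendExternally reports whether alerts for an org should be forwarded
// to external Alertmanagers, mirroring the inlined guard above.
func sendExternally(choice models.AlertmanagersChoice) bool {
    // models.AllAlertmanagers and models.ExternalAlertmanagers both route
    // externally; only models.InternalAlertmanager keeps alerts in-process.
    return choice != models.InternalAlertmanager
}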
@@ -213,7 +213,7 @@ func (d *AlertsRouter) Send(key models.AlertRuleKey, alerts definitions.Postable
 func (d *AlertsRouter) AlertmanagersFor(orgID int64) []*url.URL {
     d.adminConfigMtx.RLock()
     defer d.adminConfigMtx.RUnlock()
-    s, ok := d.senders[orgID]
+    s, ok := d.externalAlertmanagers[orgID]
     if !ok {
         return []*url.URL{}
     }
@@ -224,7 +224,7 @@ func (d *AlertsRouter) AlertmanagersFor(orgID int64) []*url.URL {
 func (d *AlertsRouter) DroppedAlertmanagersFor(orgID int64) []*url.URL {
     d.adminConfigMtx.RLock()
     defer d.adminConfigMtx.RUnlock()
-    s, ok := d.senders[orgID]
+    s, ok := d.externalAlertmanagers[orgID]
     if !ok {
         return []*url.URL{}
     }
@@ -243,8 +243,8 @@ func (d *AlertsRouter) Run(ctx context.Context) error {
         case <-ctx.Done():
             // Stop sending alerts to all external Alertmanager(s).
             d.adminConfigMtx.Lock()
-            for orgID, s := range d.senders {
-                delete(d.senders, orgID) // delete before we stop to make sure we don't accept any more alerts.
+            for orgID, s := range d.externalAlertmanagers {
+                delete(d.externalAlertmanagers, orgID) // delete before we stop to make sure we don't accept any more alerts.
                 s.Stop()
             }
             d.adminConfigMtx.Unlock()
@@ -53,8 +53,8 @@ func TestSendingToExternalAlertmanager(t *testing.T) {
     // Make sure we sync the configuration at least once before the evaluation happens to guarantee the sender is running
     // when the first alert triggers.
     require.NoError(t, alertsRouter.SyncAndApplyConfigFromDatabase())
-    require.Equal(t, 1, len(alertsRouter.senders))
-    require.Equal(t, 1, len(alertsRouter.sendersCfgHash))
+    require.Equal(t, 1, len(alertsRouter.externalAlertmanagers))
+    require.Equal(t, 1, len(alertsRouter.externalAlertmanagersCfgHash))
 
     // Then, ensure we've discovered the Alertmanager.
     assertAlertmanagersStatusForOrg(t, alertsRouter, ruleKey.OrgID, 1, 0)
@@ -74,10 +74,10 @@ func TestSendingToExternalAlertmanager(t *testing.T) {
 
     // Now, let's remove the Alertmanager from the admin configuration.
     mockedGetAdminConfigurations.Return(nil, nil)
-    // Again, make sure we sync and verify the senders.
+    // Again, make sure we sync and verify the externalAlertmanagers.
     require.NoError(t, alertsRouter.SyncAndApplyConfigFromDatabase())
-    require.Equal(t, 0, len(alertsRouter.senders))
-    require.Equal(t, 0, len(alertsRouter.sendersCfgHash))
+    require.Equal(t, 0, len(alertsRouter.externalAlertmanagers))
+    require.Equal(t, 0, len(alertsRouter.externalAlertmanagersCfgHash))
 
     // Then, ensure we've dropped the Alertmanager.
     assertAlertmanagersStatusForOrg(t, alertsRouter, ruleKey.OrgID, 0, 0)
@@ -111,8 +111,8 @@ func TestSendingToExternalAlertmanager_WithMultipleOrgs(t *testing.T) {
     // Make sure we sync the configuration at least once before the evaluation happens to guarantee the sender is running
     // when the first alert triggers.
     require.NoError(t, alertsRouter.SyncAndApplyConfigFromDatabase())
-    require.Equal(t, 1, len(alertsRouter.senders))
-    require.Equal(t, 1, len(alertsRouter.sendersCfgHash))
+    require.Equal(t, 1, len(alertsRouter.externalAlertmanagers))
+    require.Equal(t, 1, len(alertsRouter.externalAlertmanagersCfgHash))
 
     // Then, ensure we've discovered the Alertmanager.
     assertAlertmanagersStatusForOrg(t, alertsRouter, ruleKey1.OrgID, 1, 0)
@@ -123,10 +123,10 @@ func TestSendingToExternalAlertmanager_WithMultipleOrgs(t *testing.T) {
         {OrgID: ruleKey2.OrgID, Alertmanagers: []string{fakeAM.Server.URL}},
     }, nil)
 
-    // If we sync again, new senders must have spawned.
+    // If we sync again, new externalAlertmanagers must have spawned.
     require.NoError(t, alertsRouter.SyncAndApplyConfigFromDatabase())
-    require.Equal(t, 2, len(alertsRouter.senders))
-    require.Equal(t, 2, len(alertsRouter.sendersCfgHash))
+    require.Equal(t, 2, len(alertsRouter.externalAlertmanagers))
+    require.Equal(t, 2, len(alertsRouter.externalAlertmanagersCfgHash))
 
     // Then, ensure we've discovered the Alertmanager for the new organization.
     assertAlertmanagersStatusForOrg(t, alertsRouter, ruleKey1.OrgID, 1, 0)
@@ -160,15 +160,15 @@ func TestSendingToExternalAlertmanager_WithMultipleOrgs(t *testing.T) {
     }, nil)
 
     // Before we sync, let's grab the existing hash of this particular org.
-    currentHash := alertsRouter.sendersCfgHash[ruleKey2.OrgID]
+    currentHash := alertsRouter.externalAlertmanagersCfgHash[ruleKey2.OrgID]
 
     // Now, sync again.
     require.NoError(t, alertsRouter.SyncAndApplyConfigFromDatabase())
 
-    // The hash for org two should not be the same and we should still have two senders.
-    require.NotEqual(t, alertsRouter.sendersCfgHash[ruleKey2.OrgID], currentHash)
-    require.Equal(t, 2, len(alertsRouter.senders))
-    require.Equal(t, 2, len(alertsRouter.sendersCfgHash))
+    // The hash for org two should not be the same and we should still have two externalAlertmanagers.
+    require.NotEqual(t, alertsRouter.externalAlertmanagersCfgHash[ruleKey2.OrgID], currentHash)
+    require.Equal(t, 2, len(alertsRouter.externalAlertmanagers))
+    require.Equal(t, 2, len(alertsRouter.externalAlertmanagersCfgHash))
 
     assertAlertmanagersStatusForOrg(t, alertsRouter, ruleKey2.OrgID, 2, 0)
 
@@ -179,13 +179,13 @@ func TestSendingToExternalAlertmanager_WithMultipleOrgs(t *testing.T) {
     }, nil)
 
     // Before we sync, let's get the current config hash.
-    currentHash = alertsRouter.sendersCfgHash[ruleKey1.OrgID]
+    currentHash = alertsRouter.externalAlertmanagersCfgHash[ruleKey1.OrgID]
 
     // Now, sync again.
     require.NoError(t, alertsRouter.SyncAndApplyConfigFromDatabase())
 
    // The old configuration should still be running.
-    require.Equal(t, alertsRouter.sendersCfgHash[ruleKey1.OrgID], currentHash)
+    require.Equal(t, alertsRouter.externalAlertmanagersCfgHash[ruleKey1.OrgID], currentHash)
     require.Equal(t, 1, len(alertsRouter.AlertmanagersFor(ruleKey1.OrgID)))
 
     // If we fix it - it should be applied.
@@ -195,15 +195,15 @@ func TestSendingToExternalAlertmanager_WithMultipleOrgs(t *testing.T) {
     }, nil)
 
     require.NoError(t, alertsRouter.SyncAndApplyConfigFromDatabase())
-    require.NotEqual(t, alertsRouter.sendersCfgHash[ruleKey1.OrgID], currentHash)
+    require.NotEqual(t, alertsRouter.externalAlertmanagersCfgHash[ruleKey1.OrgID], currentHash)
 
     // Finally, remove everything.
     mockedGetAdminConfigurations.Return([]*models.AdminConfiguration{}, nil)
 
     require.NoError(t, alertsRouter.SyncAndApplyConfigFromDatabase())
 
-    require.Equal(t, 0, len(alertsRouter.senders))
-    require.Equal(t, 0, len(alertsRouter.sendersCfgHash))
+    require.Equal(t, 0, len(alertsRouter.externalAlertmanagers))
+    require.Equal(t, 0, len(alertsRouter.externalAlertmanagersCfgHash))
 
     assertAlertmanagersStatusForOrg(t, alertsRouter, ruleKey1.OrgID, 0, 0)
     assertAlertmanagersStatusForOrg(t, alertsRouter, ruleKey2.OrgID, 0, 0)
@@ -236,8 +236,8 @@ func TestChangingAlertmanagersChoice(t *testing.T) {
     // Make sure we sync the configuration at least once before the evaluation happens to guarantee the sender is running
     // when the first alert triggers.
     require.NoError(t, alertsRouter.SyncAndApplyConfigFromDatabase())
-    require.Equal(t, 1, len(alertsRouter.senders))
-    require.Equal(t, 1, len(alertsRouter.sendersCfgHash))
+    require.Equal(t, 1, len(alertsRouter.externalAlertmanagers))
+    require.Equal(t, 1, len(alertsRouter.externalAlertmanagersCfgHash))
     require.Equal(t, models.AllAlertmanagers, alertsRouter.sendAlertsTo[ruleKey.OrgID])
 
     // Then, ensure we've discovered the Alertmanager.
@@ -259,10 +259,10 @@ func TestChangingAlertmanagersChoice(t *testing.T) {
     mockedGetAdminConfigurations.Return([]*models.AdminConfiguration{
         {OrgID: ruleKey.OrgID, Alertmanagers: []string{fakeAM.Server.URL}, SendAlertsTo: models.ExternalAlertmanagers},
     }, nil)
-    // Again, make sure we sync and verify the senders.
+    // Again, make sure we sync and verify the externalAlertmanagers.
     require.NoError(t, alertsRouter.SyncAndApplyConfigFromDatabase())
-    require.Equal(t, 1, len(alertsRouter.senders))
-    require.Equal(t, 1, len(alertsRouter.sendersCfgHash))
+    require.Equal(t, 1, len(alertsRouter.externalAlertmanagers))
+    require.Equal(t, 1, len(alertsRouter.externalAlertmanagersCfgHash))
 
     assertAlertmanagersStatusForOrg(t, alertsRouter, ruleKey.OrgID, 1, 0)
     require.Equal(t, models.ExternalAlertmanagers, alertsRouter.sendAlertsTo[ruleKey.OrgID])
@@ -272,11 +272,11 @@ func TestChangingAlertmanagersChoice(t *testing.T) {
        {OrgID: ruleKey.OrgID, Alertmanagers: []string{fakeAM.Server.URL}, SendAlertsTo: models.InternalAlertmanager},
     }, nil)
 
-    // Again, make sure we sync and verify the senders.
-    // senders should be running even though alerts are being handled externally.
+    // Again, make sure we sync and verify the externalAlertmanagers.
+    // externalAlertmanagers should be running even though alerts are being handled externally.
     require.NoError(t, alertsRouter.SyncAndApplyConfigFromDatabase())
-    require.Equal(t, 1, len(alertsRouter.senders))
-    require.Equal(t, 1, len(alertsRouter.sendersCfgHash))
+    require.Equal(t, 1, len(alertsRouter.externalAlertmanagers))
+    require.Equal(t, 1, len(alertsRouter.externalAlertmanagersCfgHash))
 
     // Then, ensure the Alertmanager is still listed and the Alertmanagers choice has changed.
     assertAlertmanagersStatusForOrg(t, alertsRouter, ruleKey.OrgID, 1, 0)
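assertAlertmanagersStatusForOrg is not part of this diff; from the call sites above, the two trailing integers read as the expected counts of discovered and dropped Alertmanagers for the org. A plausible shape, assuming testify's require.Eventually to allow for discovery lag (signature and timings are guesses, not the shipped helper):

func assertAlertmanagersStatusForOrg(t *testing.T, router *AlertsRouter, orgID int64, active, dropped int) {
    t.Helper()
    require.Eventually(t, func() bool {
        // AlertmanagersFor and DroppedAlertmanagersFor are the accessors
        // renamed in this commit's router hunks.
        return len(router.AlertmanagersFor(orgID)) == active &&
            len(router.DroppedAlertmanagersFor(orgID)) == dropped
    }, 10*time.Second, 200*time.Millisecond)
}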
@@ -26,8 +26,8 @@ const (
     defaultTimeout = 10 * time.Second
 )
 
-// Sender is responsible for dispatching alert notifications to an external Alertmanager service.
-type Sender struct {
+// ExternalAlertmanager is responsible for dispatching alert notifications to an external Alertmanager service.
+type ExternalAlertmanager struct {
     logger log.Logger
     wg     sync.WaitGroup
 
@@ -37,10 +37,10 @@ type Sender struct {
     sdManager *discovery.Manager
 }
 
-func New() (*Sender, error) {
+func NewExternalAlertmanagerSender() (*ExternalAlertmanager, error) {
     l := log.New("sender")
     sdCtx, sdCancel := context.WithCancel(context.Background())
-    s := &Sender{
+    s := &ExternalAlertmanager{
         logger:   l,
         sdCancel: sdCancel,
     }
@@ -58,7 +58,7 @@ func New() (*Sender, error) {
 }
 
 // ApplyConfig syncs a configuration with the sender.
-func (s *Sender) ApplyConfig(cfg *ngmodels.AdminConfiguration) error {
+func (s *ExternalAlertmanager) ApplyConfig(cfg *ngmodels.AdminConfiguration) error {
     notifierCfg, err := buildNotifierConfig(cfg)
     if err != nil {
         return err
@@ -76,7 +76,7 @@ func (s *Sender) ApplyConfig(cfg *ngmodels.AdminConfiguration) error {
     return s.sdManager.ApplyConfig(sdCfgs)
 }
 
-func (s *Sender) Run() {
+func (s *ExternalAlertmanager) Run() {
     s.wg.Add(2)
 
     go func() {
@@ -93,7 +93,7 @@ func (s *Sender) Run() {
 }
 
 // SendAlerts sends a set of alerts to the configured Alertmanager(s).
-func (s *Sender) SendAlerts(alerts apimodels.PostableAlerts) {
+func (s *ExternalAlertmanager) SendAlerts(alerts apimodels.PostableAlerts) {
     if len(alerts.PostableAlerts) == 0 {
         s.logger.Debug("no alerts to send to external Alertmanager(s)")
         return
@@ -109,19 +109,19 @@ func (s *Sender) SendAlerts(alerts apimodels.PostableAlerts) {
 }
 
 // Stop shuts down the sender.
-func (s *Sender) Stop() {
+func (s *ExternalAlertmanager) Stop() {
     s.sdCancel()
     s.manager.Stop()
     s.wg.Wait()
 }
 
 // Alertmanagers returns a list of the discovered Alertmanager(s).
-func (s *Sender) Alertmanagers() []*url.URL {
+func (s *ExternalAlertmanager) Alertmanagers() []*url.URL {
     return s.manager.Alertmanagers()
 }
 
 // DroppedAlertmanagers returns a list of Alertmanager(s) we no longer send alerts to.
-func (s *Sender) DroppedAlertmanagers() []*url.URL {
+func (s *ExternalAlertmanager) DroppedAlertmanagers() []*url.URL {
     return s.manager.DroppedAlertmanagers()
 }
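Putting the renamed pieces together, the caller-side lifecycle, in the order the router uses it above, is: construct, Run, ApplyConfig, SendAlerts while active, Stop on teardown. A hypothetical helper illustrating that order (startExternalAlertmanager is not a real function in this package; the router itself logs and continues on ApplyConfig errors rather than stopping):

func startExternalAlertmanager(cfg *ngmodels.AdminConfiguration) (*ExternalAlertmanager, error) {
    s, err := NewExternalAlertmanagerSender()
    if err != nil {
        return nil, err
    }
    s.Run() // start the notification and service-discovery goroutines
    if err := s.ApplyConfig(cfg); err != nil {
        s.Stop() // cancels discovery and waits on the WaitGroup
        return nil, err
    }
    return s, nil
}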