Alerting: refactor scheduler and separate notification logic (#48144)

* Introduce AlertsRouter in the sender package and move all notification-related fields and methods out of the scheduler and into this router.
* Introduce a new interface AlertsSender in the schedule package and replace calls to the anonymous `notify` function inside ruleRoutine with calls to methods of that interface.
* Rename the interface Scheduler in the api package to ExternalAlertmanagerProvider, and make AlertsRouter, rather than the scheduler, the struct that implements it.
Yuriy Tseretyan 2022-07-12 15:13:04 -04:00 committed by GitHub
parent ededf1dd6f
commit a6b1090879
11 changed files with 538 additions and 462 deletions
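
Before the diffs, a minimal, self-contained Go sketch of the wiring this commit introduces. The names AlertsSender, AlertsRouter, and Send mirror the ones added below, but the rule-key and alert types are simplified stand-ins for models.AlertRuleKey and definitions.PostableAlerts, and the routing body is reduced to a print statement; it is an illustration, not the real implementation.

package main

import "fmt"

// Simplified stand-ins for models.AlertRuleKey and definitions.PostableAlerts;
// they exist only to keep this sketch self-contained.
type AlertRuleKey struct {
	OrgID int64
	UID   string
}

type PostableAlerts struct {
	PostableAlerts []string
}

// AlertsSender mirrors the new interface in the schedule package: the scheduler
// only hands alerts off, it no longer knows about senders or notifiers.
type AlertsSender interface {
	Send(key AlertRuleKey, alerts PostableAlerts)
}

// AlertsRouter stands in for sender.AlertsRouter, which decides whether alerts
// go to the internal notifier, external Alertmanagers, or both.
type AlertsRouter struct{}

func (r *AlertsRouter) Send(key AlertRuleKey, alerts PostableAlerts) {
	fmt.Printf("org %d, rule %s: routing %d alert(s)\n", key.OrgID, key.UID, len(alerts.PostableAlerts))
}

// schedule keeps only the narrow AlertsSender dependency, as in the refactored
// SchedulerCfg and schedule struct in the diffs below.
type schedule struct {
	alertsSender AlertsSender
}

func (sch *schedule) ruleRoutineStep(key AlertRuleKey, alerts PostableAlerts) {
	// What used to be an anonymous notify() closure inside ruleRoutine is now a
	// single call on the injected interface.
	sch.alertsSender.Send(key, alerts)
}

func main() {
	sch := &schedule{alertsSender: &AlertsRouter{}}
	sch.ruleRoutineStep(AlertRuleKey{OrgID: 1, UID: "abc"}, PostableAlerts{PostableAlerts: []string{"firing"}})
}
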


@ -18,6 +18,7 @@ import (
"github.com/grafana/grafana/pkg/services/ngalert/notifier"
"github.com/grafana/grafana/pkg/services/ngalert/provisioning"
"github.com/grafana/grafana/pkg/services/ngalert/schedule"
"github.com/grafana/grafana/pkg/services/ngalert/sender"
"github.com/grafana/grafana/pkg/services/ngalert/state"
"github.com/grafana/grafana/pkg/services/ngalert/store"
"github.com/grafana/grafana/pkg/services/quota"
@ -28,7 +29,7 @@ import (
// timeNow makes it possible to test usage of time
var timeNow = time.Now
type Scheduler interface {
type ExternalAlertmanagerProvider interface {
AlertmanagersFor(orgID int64) []*url.URL
DroppedAlertmanagersFor(orgID int64) []*url.URL
}
@ -81,6 +82,7 @@ type API struct {
Templates *provisioning.TemplateService
MuteTimings *provisioning.MuteTimingService
AlertRules *provisioning.AlertRuleService
AlertsRouter *sender.AlertsRouter
}
// RegisterAPIEndpoints registers API handlers
@ -128,9 +130,9 @@ func (api *API) RegisterAPIEndpoints(m *metrics.API) {
}), m)
api.RegisterConfigurationApiEndpoints(NewForkedConfiguration(
&AdminSrv{
store: api.AdminConfigStore,
log: logger,
scheduler: api.Schedule,
store: api.AdminConfigStore,
log: logger,
alertmanagerProvider: api.AlertsRouter,
},
), m)


@ -16,14 +16,14 @@ import (
)
type AdminSrv struct {
scheduler Scheduler
store store.AdminConfigurationStore
log log.Logger
alertmanagerProvider ExternalAlertmanagerProvider
store store.AdminConfigurationStore
log log.Logger
}
func (srv AdminSrv) RouteGetAlertmanagers(c *models.ReqContext) response.Response {
urls := srv.scheduler.AlertmanagersFor(c.OrgId)
droppedURLs := srv.scheduler.DroppedAlertmanagersFor(c.OrgId)
urls := srv.alertmanagerProvider.AlertmanagersFor(c.OrgId)
droppedURLs := srv.alertmanagerProvider.DroppedAlertmanagersFor(c.OrgId)
ams := v1.AlertManagersResult{Active: make([]v1.AlertManager, len(urls)), Dropped: make([]v1.AlertManager, len(droppedURLs))}
for i, url := range urls {
ams.Active[i].URL = url.String()


@ -2,6 +2,7 @@ package ngalert
import (
"context"
"fmt"
"net/url"
"github.com/benbjohnson/clock"
@ -23,6 +24,7 @@ import (
"github.com/grafana/grafana/pkg/services/ngalert/notifier"
"github.com/grafana/grafana/pkg/services/ngalert/provisioning"
"github.com/grafana/grafana/pkg/services/ngalert/schedule"
"github.com/grafana/grafana/pkg/services/ngalert/sender"
"github.com/grafana/grafana/pkg/services/ngalert/state"
"github.com/grafana/grafana/pkg/services/ngalert/store"
"github.com/grafana/grafana/pkg/services/notifications"
@ -92,6 +94,7 @@ type AlertNG struct {
// Alerting notification services
MultiOrgAlertmanager *notifier.MultiOrgAlertmanager
AlertsRouter *sender.AlertsRouter
accesscontrol accesscontrol.AccessControl
bus bus.Bus
@ -125,20 +128,7 @@ func (ng *AlertNG) init() error {
// Let's make sure we're able to complete an initial sync of Alertmanagers before we start the alerting components.
if err := ng.MultiOrgAlertmanager.LoadAndSyncAlertmanagersForOrgs(context.Background()); err != nil {
return err
}
schedCfg := schedule.SchedulerCfg{
Cfg: ng.Cfg.UnifiedAlerting,
C: clock.New(),
Logger: ng.Log,
Evaluator: eval.NewEvaluator(ng.Cfg, ng.Log, ng.DataSourceCache, ng.SecretsService, ng.ExpressionService),
InstanceStore: store,
RuleStore: store,
AdminConfigStore: store,
OrgStore: store,
MultiOrgNotifier: ng.MultiOrgAlertmanager,
Metrics: ng.Metrics.GetSchedulerMetrics(),
return fmt.Errorf("failed to initialize alerting because multiorg alertmanager manager failed to warm up: %w", err)
}
appUrl, err := url.Parse(ng.Cfg.AppURL)
@ -147,7 +137,30 @@ func (ng *AlertNG) init() error {
appUrl = nil
}
stateManager := state.NewManager(ng.Log, ng.Metrics.GetStateMetrics(), appUrl, store, store, ng.dashboardService, ng.imageService, clock.New())
clk := clock.New()
alertsRouter := sender.NewAlertsRouter(ng.MultiOrgAlertmanager, store, clk, appUrl, ng.Cfg.UnifiedAlerting.DisabledOrgs, ng.Cfg.UnifiedAlerting.AdminConfigPollInterval)
// Make sure we sync at least once as Grafana starts to get the router up and running before we start sending any alerts.
if err := alertsRouter.SyncAndApplyConfigFromDatabase(); err != nil {
return fmt.Errorf("failed to initialize alerting because alert notifications router failed to warm up: %w", err)
}
ng.AlertsRouter = alertsRouter
schedCfg := schedule.SchedulerCfg{
Cfg: ng.Cfg.UnifiedAlerting,
C: clk,
Logger: ng.Log,
Evaluator: eval.NewEvaluator(ng.Cfg, ng.Log, ng.DataSourceCache, ng.SecretsService, ng.ExpressionService),
InstanceStore: store,
RuleStore: store,
OrgStore: store,
Metrics: ng.Metrics.GetSchedulerMetrics(),
AlertSender: alertsRouter,
}
stateManager := state.NewManager(ng.Log, ng.Metrics.GetStateMetrics(), appUrl, store, store, ng.dashboardService, ng.imageService, clk)
scheduler := schedule.NewScheduler(schedCfg, appUrl, stateManager, ng.bus)
ng.stateManager = stateManager
@ -185,6 +198,7 @@ func (ng *AlertNG) init() error {
Templates: templateService,
MuteTimings: muteTimingService,
AlertRules: alertRuleService,
AlertsRouter: alertsRouter,
}
api.RegisterAPIEndpoints(ng.Metrics.GetAPIMetrics())
@ -198,14 +212,18 @@ func (ng *AlertNG) Run(ctx context.Context) error {
children, subCtx := errgroup.WithContext(ctx)
children.Go(func() error {
return ng.MultiOrgAlertmanager.Run(subCtx)
})
children.Go(func() error {
return ng.AlertsRouter.Run(subCtx)
})
if ng.Cfg.UnifiedAlerting.ExecuteAlerts {
children.Go(func() error {
return ng.schedule.Run(subCtx)
})
}
children.Go(func() error {
return ng.MultiOrgAlertmanager.Run(subCtx)
})
return children.Wait()
}


@ -0,0 +1,52 @@
// Code generated by mockery v2.10.0. DO NOT EDIT.
package schedule
import (
definitions "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
mock "github.com/stretchr/testify/mock"
models "github.com/grafana/grafana/pkg/services/ngalert/models"
)
// AlertsSenderMock is an autogenerated mock type for the AlertsSender type
type AlertsSenderMock struct {
mock.Mock
}
type AlertsSenderMock_Expecter struct {
mock *mock.Mock
}
func (_m *AlertsSenderMock) EXPECT() *AlertsSenderMock_Expecter {
return &AlertsSenderMock_Expecter{mock: &_m.Mock}
}
// Send provides a mock function with given fields: key, alerts
func (_m *AlertsSenderMock) Send(key models.AlertRuleKey, alerts definitions.PostableAlerts) {
_m.Called(key, alerts)
}
// AlertsSenderMock_Send_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Send'
type AlertsSenderMock_Send_Call struct {
*mock.Call
}
// Send is a helper method to define mock.On call
// - key models.AlertRuleKey
// - alerts definitions.PostableAlerts
func (_e *AlertsSenderMock_Expecter) Send(key interface{}, alerts interface{}) *AlertsSenderMock_Send_Call {
return &AlertsSenderMock_Send_Call{Call: _e.mock.On("Send", key, alerts)}
}
func (_c *AlertsSenderMock_Send_Call) Run(run func(key models.AlertRuleKey, alerts definitions.PostableAlerts)) *AlertsSenderMock_Send_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(models.AlertRuleKey), args[1].(definitions.PostableAlerts))
})
return _c
}
func (_c *AlertsSenderMock_Send_Call) Return() *AlertsSenderMock_Send_Call {
_c.Call.Return()
return _c
}


@ -2,10 +2,8 @@ package schedule
import (
"context"
"errors"
"fmt"
"net/url"
"sync"
"time"
"github.com/grafana/grafana/pkg/bus"
@ -17,8 +15,6 @@ import (
"github.com/grafana/grafana/pkg/services/ngalert/eval"
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
ngmodels "github.com/grafana/grafana/pkg/services/ngalert/models"
"github.com/grafana/grafana/pkg/services/ngalert/notifier"
"github.com/grafana/grafana/pkg/services/ngalert/sender"
"github.com/grafana/grafana/pkg/services/ngalert/state"
"github.com/grafana/grafana/pkg/services/ngalert/store"
"github.com/grafana/grafana/pkg/setting"
@ -34,14 +30,6 @@ type ScheduleService interface {
// Run the scheduler until the context is canceled or the scheduler returns
// an error. The scheduler is terminated when this function returns.
Run(context.Context) error
// AlertmanagersFor returns all the discovered Alertmanager URLs for the
// organization.
AlertmanagersFor(orgID int64) []*url.URL
// DroppedAlertmanagersFor returns all the dropped Alertmanager URLs for the
// organization.
DroppedAlertmanagersFor(orgID int64) []*url.URL
// UpdateAlertRule notifies scheduler that a rule has been changed
UpdateAlertRule(key ngmodels.AlertRuleKey)
// UpdateAlertRulesByNamespaceUID notifies scheduler that all rules in a namespace should be updated.
@ -56,6 +44,12 @@ type ScheduleService interface {
folderUpdateHandler(ctx context.Context, evt *events.FolderUpdated) error
}
//go:generate mockery --name AlertsSender --structname AlertsSenderMock --inpackage --filename alerts_sender_mock.go --with-expecter
// AlertsSender is an interface for a service that is responsible for sending notifications to the end-user.
type AlertsSender interface {
Send(key ngmodels.AlertRuleKey, alerts definitions.PostableAlerts)
}
type schedule struct {
// base tick rate (fastest possible configured check)
baseInterval time.Duration
@ -83,27 +77,20 @@ type schedule struct {
evaluator eval.Evaluator
ruleStore store.RuleStore
instanceStore store.InstanceStore
adminConfigStore store.AdminConfigurationStore
orgStore store.OrgStore
ruleStore store.RuleStore
instanceStore store.InstanceStore
orgStore store.OrgStore
stateManager *state.Manager
appURL *url.URL
disableGrafanaFolder bool
multiOrgNotifier *notifier.MultiOrgAlertmanager
metrics *metrics.Scheduler
metrics *metrics.Scheduler
// Senders help us send alerts to external Alertmanagers.
adminConfigMtx sync.RWMutex
sendAlertsTo map[int64]ngmodels.AlertmanagersChoice
sendersCfgHash map[int64]string
senders map[int64]*sender.Sender
adminConfigPollInterval time.Duration
disabledOrgs map[int64]struct{}
minRuleInterval time.Duration
alertsSender AlertsSender
disabledOrgs map[int64]struct{}
minRuleInterval time.Duration
// schedulableAlertRules contains the alert rules that are considered for
// evaluation in the current tick. The evaluation of an alert rule in the
@ -117,18 +104,17 @@ type schedule struct {
// SchedulerCfg is the scheduler configuration.
type SchedulerCfg struct {
Cfg setting.UnifiedAlertingSettings
C clock.Clock
Logger log.Logger
EvalAppliedFunc func(ngmodels.AlertRuleKey, time.Time)
StopAppliedFunc func(ngmodels.AlertRuleKey)
Evaluator eval.Evaluator
RuleStore store.RuleStore
OrgStore store.OrgStore
InstanceStore store.InstanceStore
AdminConfigStore store.AdminConfigurationStore
MultiOrgNotifier *notifier.MultiOrgAlertmanager
Metrics *metrics.Scheduler
Cfg setting.UnifiedAlertingSettings
C clock.Clock
Logger log.Logger
EvalAppliedFunc func(ngmodels.AlertRuleKey, time.Time)
StopAppliedFunc func(ngmodels.AlertRuleKey)
Evaluator eval.Evaluator
RuleStore store.RuleStore
OrgStore store.OrgStore
InstanceStore store.InstanceStore
Metrics *metrics.Scheduler
AlertSender AlertsSender
}
// NewScheduler returns a new schedule.
@ -136,32 +122,27 @@ func NewScheduler(cfg SchedulerCfg, appURL *url.URL, stateManager *state.Manager
ticker := alerting.NewTicker(cfg.C, cfg.Cfg.BaseInterval, cfg.Metrics.Ticker)
sch := schedule{
registry: alertRuleInfoRegistry{alertRuleInfo: make(map[ngmodels.AlertRuleKey]*alertRuleInfo)},
maxAttempts: cfg.Cfg.MaxAttempts,
clock: cfg.C,
baseInterval: cfg.Cfg.BaseInterval,
log: cfg.Logger,
ticker: ticker,
evalAppliedFunc: cfg.EvalAppliedFunc,
stopAppliedFunc: cfg.StopAppliedFunc,
evaluator: cfg.Evaluator,
ruleStore: cfg.RuleStore,
instanceStore: cfg.InstanceStore,
orgStore: cfg.OrgStore,
adminConfigStore: cfg.AdminConfigStore,
multiOrgNotifier: cfg.MultiOrgNotifier,
metrics: cfg.Metrics,
appURL: appURL,
disableGrafanaFolder: cfg.Cfg.ReservedLabels.IsReservedLabelDisabled(ngmodels.FolderTitleLabel),
stateManager: stateManager,
sendAlertsTo: map[int64]ngmodels.AlertmanagersChoice{},
senders: map[int64]*sender.Sender{},
sendersCfgHash: map[int64]string{},
adminConfigPollInterval: cfg.Cfg.AdminConfigPollInterval,
disabledOrgs: cfg.Cfg.DisabledOrgs,
minRuleInterval: cfg.Cfg.MinInterval,
schedulableAlertRules: schedulableAlertRulesRegistry{rules: make(map[ngmodels.AlertRuleKey]*ngmodels.SchedulableAlertRule)},
bus: bus,
registry: alertRuleInfoRegistry{alertRuleInfo: make(map[ngmodels.AlertRuleKey]*alertRuleInfo)},
maxAttempts: cfg.Cfg.MaxAttempts,
clock: cfg.C,
baseInterval: cfg.Cfg.BaseInterval,
log: cfg.Logger,
ticker: ticker,
evalAppliedFunc: cfg.EvalAppliedFunc,
stopAppliedFunc: cfg.StopAppliedFunc,
evaluator: cfg.Evaluator,
ruleStore: cfg.RuleStore,
instanceStore: cfg.InstanceStore,
orgStore: cfg.OrgStore,
metrics: cfg.Metrics,
appURL: appURL,
disableGrafanaFolder: cfg.Cfg.ReservedLabels.IsReservedLabelDisabled(ngmodels.FolderTitleLabel),
stateManager: stateManager,
disabledOrgs: cfg.Cfg.DisabledOrgs,
minRuleInterval: cfg.Cfg.MinInterval,
schedulableAlertRules: schedulableAlertRulesRegistry{rules: make(map[ngmodels.AlertRuleKey]*ngmodels.SchedulableAlertRule)},
bus: bus,
alertsSender: cfg.AlertSender,
}
bus.AddEventListener(sch.folderUpdateHandler)
@ -170,158 +151,14 @@ func NewScheduler(cfg SchedulerCfg, appURL *url.URL, stateManager *state.Manager
}
func (sch *schedule) Run(ctx context.Context) error {
var wg sync.WaitGroup
wg.Add(2)
defer sch.ticker.Stop()
go func() {
defer wg.Done()
if err := sch.schedulePeriodic(ctx); err != nil {
sch.log.Error("failure while running the rule evaluation loop", "err", err)
}
}()
go func() {
defer wg.Done()
if err := sch.adminConfigSync(ctx); err != nil {
sch.log.Error("failure while running the admin configuration sync", "err", err)
}
}()
wg.Wait()
if err := sch.schedulePeriodic(ctx); err != nil {
sch.log.Error("failure while running the rule evaluation loop", "err", err)
}
return nil
}
// SyncAndApplyConfigFromDatabase looks for the admin configuration in the database
// and adjusts the sender(s) and alert handling mechanism accordingly.
func (sch *schedule) SyncAndApplyConfigFromDatabase() error {
sch.log.Debug("start of admin configuration sync")
cfgs, err := sch.adminConfigStore.GetAdminConfigurations()
if err != nil {
return err
}
sch.log.Debug("found admin configurations", "count", len(cfgs))
orgsFound := make(map[int64]struct{}, len(cfgs))
sch.adminConfigMtx.Lock()
for _, cfg := range cfgs {
_, isDisabledOrg := sch.disabledOrgs[cfg.OrgID]
if isDisabledOrg {
sch.log.Debug("skipping starting sender for disabled org", "org", cfg.OrgID)
continue
}
// Update the Alertmanagers choice for the organization.
sch.sendAlertsTo[cfg.OrgID] = cfg.SendAlertsTo
orgsFound[cfg.OrgID] = struct{}{} // keep track of the which senders we need to keep.
existing, ok := sch.senders[cfg.OrgID]
// We have no running sender and no Alertmanager(s) configured, no-op.
if !ok && len(cfg.Alertmanagers) == 0 {
sch.log.Debug("no external alertmanagers configured", "org", cfg.OrgID)
continue
}
// We have no running sender and alerts are handled internally, no-op.
if !ok && cfg.SendAlertsTo == ngmodels.InternalAlertmanager {
sch.log.Debug("alerts are handled internally", "org", cfg.OrgID)
continue
}
// We have a running sender but no Alertmanager(s) configured, shut it down.
if ok && len(cfg.Alertmanagers) == 0 {
sch.log.Debug("no external alertmanager(s) configured, sender will be stopped", "org", cfg.OrgID)
delete(orgsFound, cfg.OrgID)
continue
}
// We have a running sender, check if we need to apply a new config.
if ok {
if sch.sendersCfgHash[cfg.OrgID] == cfg.AsSHA256() {
sch.log.Debug("sender configuration is the same as the one running, no-op", "org", cfg.OrgID, "alertmanagers", cfg.Alertmanagers)
continue
}
sch.log.Debug("applying new configuration to sender", "org", cfg.OrgID, "alertmanagers", cfg.Alertmanagers)
err := existing.ApplyConfig(cfg)
if err != nil {
sch.log.Error("failed to apply configuration", "err", err, "org", cfg.OrgID)
continue
}
sch.sendersCfgHash[cfg.OrgID] = cfg.AsSHA256()
continue
}
// No sender and have Alertmanager(s) to send to - start a new one.
sch.log.Info("creating new sender for the external alertmanagers", "org", cfg.OrgID, "alertmanagers", cfg.Alertmanagers)
s, err := sender.New(sch.metrics)
if err != nil {
sch.log.Error("unable to start the sender", "err", err, "org", cfg.OrgID)
continue
}
sch.senders[cfg.OrgID] = s
s.Run()
err = s.ApplyConfig(cfg)
if err != nil {
sch.log.Error("failed to apply configuration", "err", err, "org", cfg.OrgID)
continue
}
sch.sendersCfgHash[cfg.OrgID] = cfg.AsSHA256()
}
sendersToStop := map[int64]*sender.Sender{}
for orgID, s := range sch.senders {
if _, exists := orgsFound[orgID]; !exists {
sendersToStop[orgID] = s
delete(sch.senders, orgID)
delete(sch.sendersCfgHash, orgID)
}
}
sch.adminConfigMtx.Unlock()
// We can now stop these senders w/o having to hold a lock.
for orgID, s := range sendersToStop {
sch.log.Info("stopping sender", "org", orgID)
s.Stop()
sch.log.Info("stopped sender", "org", orgID)
}
sch.log.Debug("finish of admin configuration sync")
return nil
}
// AlertmanagersFor returns all the discovered Alertmanager(s) for a particular organization.
func (sch *schedule) AlertmanagersFor(orgID int64) []*url.URL {
sch.adminConfigMtx.RLock()
defer sch.adminConfigMtx.RUnlock()
s, ok := sch.senders[orgID]
if !ok {
return []*url.URL{}
}
return s.Alertmanagers()
}
// DroppedAlertmanagersFor returns all the dropped Alertmanager(s) for a particular organization.
func (sch *schedule) DroppedAlertmanagersFor(orgID int64) []*url.URL {
sch.adminConfigMtx.RLock()
defer sch.adminConfigMtx.RUnlock()
s, ok := sch.senders[orgID]
if !ok {
return []*url.URL{}
}
return s.DroppedAlertmanagers()
}
// UpdateAlertRule looks for the active rule evaluation and commands it to update the rule
func (sch *schedule) UpdateAlertRule(key ngmodels.AlertRuleKey) {
ruleInfo, err := sch.registry.get(key)
@ -375,27 +212,6 @@ func (sch *schedule) DeleteAlertRule(key ngmodels.AlertRuleKey) {
sch.metrics.SchedulableAlertRulesHash.Set(float64(hashUIDs(alertRules)))
}
func (sch *schedule) adminConfigSync(ctx context.Context) error {
for {
select {
case <-time.After(sch.adminConfigPollInterval):
if err := sch.SyncAndApplyConfigFromDatabase(); err != nil {
sch.log.Error("unable to sync admin configuration", "err", err)
}
case <-ctx.Done():
// Stop sending alerts to all external Alertmanager(s).
sch.adminConfigMtx.Lock()
for orgID, s := range sch.senders {
delete(sch.senders, orgID) // delete before we stop to make sure we don't accept any more alerts.
s.Stop()
}
sch.adminConfigMtx.Unlock()
return nil
}
}
}
func (sch *schedule) schedulePeriodic(ctx context.Context) error {
dispatcherGroup, ctx := errgroup.WithContext(ctx)
for {
@ -521,7 +337,6 @@ func (sch *schedule) schedulePeriodic(ctx context.Context) error {
}
}
//nolint: gocyclo
func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertRuleKey, evalCh <-chan *evaluation, updateCh <-chan struct{}) error {
logger := sch.log.New("uid", key.UID, "org", key.OrgID)
logger.Debug("alert rule routine started")
@ -531,55 +346,11 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR
evalDuration := sch.metrics.EvalDuration.WithLabelValues(orgID)
evalTotalFailures := sch.metrics.EvalFailures.WithLabelValues(orgID)
notify := func(alerts definitions.PostableAlerts, logger log.Logger) {
if len(alerts.PostableAlerts) == 0 {
logger.Debug("no alerts to put in the notifier or to send to external Alertmanager(s)")
return
}
// Send alerts to local notifier if they need to be handled internally
// or if no external AMs have been discovered yet.
var localNotifierExist, externalNotifierExist bool
if sch.sendAlertsTo[key.OrgID] == ngmodels.ExternalAlertmanagers && len(sch.AlertmanagersFor(key.OrgID)) > 0 {
logger.Debug("no alerts to put in the notifier")
} else {
logger.Debug("sending alerts to local notifier", "count", len(alerts.PostableAlerts), "alerts", alerts.PostableAlerts)
n, err := sch.multiOrgNotifier.AlertmanagerFor(key.OrgID)
if err == nil {
localNotifierExist = true
if err := n.PutAlerts(alerts); err != nil {
logger.Error("failed to put alerts in the local notifier", "count", len(alerts.PostableAlerts), "err", err)
}
} else {
if errors.Is(err, notifier.ErrNoAlertmanagerForOrg) {
logger.Debug("local notifier was not found")
} else {
logger.Error("local notifier is not available", "err", err)
}
}
}
// Send alerts to external Alertmanager(s) if we have a sender for this organization
// and alerts are not being handled just internally.
sch.adminConfigMtx.RLock()
defer sch.adminConfigMtx.RUnlock()
s, ok := sch.senders[key.OrgID]
if ok && sch.sendAlertsTo[key.OrgID] != ngmodels.InternalAlertmanager {
logger.Debug("sending alerts to external notifier", "count", len(alerts.PostableAlerts), "alerts", alerts.PostableAlerts)
s.SendAlerts(alerts)
externalNotifierExist = true
}
if !localNotifierExist && !externalNotifierExist {
logger.Error("no external or internal notifier - alerts not delivered!", "count", len(alerts.PostableAlerts))
}
}
clearState := func() {
states := sch.stateManager.GetStatesForRuleUID(key.OrgID, key.UID)
expiredAlerts := FromAlertsStateToStoppedAlert(states, sch.appURL, sch.clock)
sch.stateManager.RemoveByRuleUID(key.OrgID, key.UID)
notify(expiredAlerts, logger)
sch.alertsSender.Send(key, expiredAlerts)
}
updateRule := func(ctx context.Context, oldRule *ngmodels.AlertRule) (*ngmodels.AlertRule, error) {
@ -641,8 +412,7 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR
processedStates := sch.stateManager.ProcessEvalResults(ctx, e.scheduledAt, r, results)
sch.saveAlertStates(ctx, processedStates)
alerts := FromAlertStateToPostableAlerts(processedStates, sch.stateManager, sch.appURL)
notify(alerts, logger)
sch.alertsSender.Send(key, alerts)
return nil
}


@ -1,4 +1,4 @@
// Code generated by mockery v2.10.2. DO NOT EDIT.
// Code generated by mockery v2.10.0. DO NOT EDIT.
package schedule
@ -11,8 +11,6 @@ import (
models "github.com/grafana/grafana/pkg/services/ngalert/models"
time "time"
url "net/url"
)
// FakeScheduleService is an autogenerated mock type for the ScheduleService type
@ -20,43 +18,11 @@ type FakeScheduleService struct {
mock.Mock
}
// AlertmanagersFor provides a mock function with given fields: orgID
func (_m *FakeScheduleService) AlertmanagersFor(orgID int64) []*url.URL {
ret := _m.Called(orgID)
var r0 []*url.URL
if rf, ok := ret.Get(0).(func(int64) []*url.URL); ok {
r0 = rf(orgID)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]*url.URL)
}
}
return r0
}
// DeleteAlertRule provides a mock function with given fields: key
func (_m *FakeScheduleService) DeleteAlertRule(key models.AlertRuleKey) {
_m.Called(key)
}
// DroppedAlertmanagersFor provides a mock function with given fields: orgID
func (_m *FakeScheduleService) DroppedAlertmanagersFor(orgID int64) []*url.URL {
ret := _m.Called(orgID)
var r0 []*url.URL
if rf, ok := ret.Get(0).(func(int64) []*url.URL); ok {
r0 = rf(orgID)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]*url.URL)
}
}
return r0
}
// Run provides a mock function with given fields: _a0
func (_m *FakeScheduleService) Run(_a0 context.Context) error {
ret := _m.Called(_a0)


@ -15,6 +15,7 @@ import (
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
busmock "github.com/grafana/grafana/pkg/bus/mock"
@ -153,6 +154,9 @@ func TestAlertingTicker(t *testing.T) {
},
}
notifier := &schedule.AlertsSenderMock{}
notifier.EXPECT().Send(mock.Anything, mock.Anything).Return()
schedCfg := schedule.SchedulerCfg{
Cfg: cfg,
C: mockedClock,
@ -166,6 +170,7 @@ func TestAlertingTicker(t *testing.T) {
InstanceStore: dbstore,
Logger: log.New("ngalert schedule test"),
Metrics: testMetrics.GetSchedulerMetrics(),
AlertSender: notifier,
}
st := state.NewManager(schedCfg.Logger, testMetrics.GetStateMetrics(), nil, dbstore, dbstore, &dashboards.FakeDashboardService{}, &image.NoopImageService{}, clock.NewMock())
appUrl := &url.URL{


@ -53,19 +53,19 @@ func TestSendingToExternalAlertmanager(t *testing.T) {
cmd := store.UpdateAdminConfigurationCmd{AdminConfiguration: adminConfig}
require.NoError(t, fakeAdminConfigStore.UpdateAdminConfiguration(cmd))
sched, mockedClock := setupScheduler(t, fakeRuleStore, fakeInstanceStore, fakeAdminConfigStore, nil)
sched, mockedClock, alertsRouter := setupScheduler(t, fakeRuleStore, fakeInstanceStore, fakeAdminConfigStore, nil)
// Make sure we sync the configuration at least once before the evaluation happens to guarantee the sender is running
// when the first alert triggers.
require.NoError(t, sched.SyncAndApplyConfigFromDatabase())
sched.adminConfigMtx.Lock()
require.Equal(t, 1, len(sched.senders))
require.Equal(t, 1, len(sched.sendersCfgHash))
sched.adminConfigMtx.Unlock()
require.NoError(t, alertsRouter.SyncAndApplyConfigFromDatabase())
alertsRouter.AdminConfigMtx.Lock()
require.Equal(t, 1, len(alertsRouter.Senders))
require.Equal(t, 1, len(alertsRouter.SendersCfgHash))
alertsRouter.AdminConfigMtx.Unlock()
// Then, ensure we've discovered the Alertmanager.
require.Eventually(t, func() bool {
return len(sched.AlertmanagersFor(1)) == 1 && len(sched.DroppedAlertmanagersFor(1)) == 0
return len(alertsRouter.AlertmanagersFor(1)) == 1 && len(alertsRouter.DroppedAlertmanagersFor(1)) == 0
}, 10*time.Second, 200*time.Millisecond)
ctx, cancel := context.WithCancel(context.Background())
@ -91,15 +91,15 @@ func TestSendingToExternalAlertmanager(t *testing.T) {
require.NoError(t, fakeAdminConfigStore.UpdateAdminConfiguration(cmd))
// Again, make sure we sync and verify the senders.
require.NoError(t, sched.SyncAndApplyConfigFromDatabase())
sched.adminConfigMtx.Lock()
require.Equal(t, 0, len(sched.senders))
require.Equal(t, 0, len(sched.sendersCfgHash))
sched.adminConfigMtx.Unlock()
require.NoError(t, alertsRouter.SyncAndApplyConfigFromDatabase())
alertsRouter.AdminConfigMtx.Lock()
require.Equal(t, 0, len(alertsRouter.Senders))
require.Equal(t, 0, len(alertsRouter.SendersCfgHash))
alertsRouter.AdminConfigMtx.Unlock()
// Then, ensure we've dropped the Alertmanager.
require.Eventually(t, func() bool {
return len(sched.AlertmanagersFor(1)) == 0 && len(sched.DroppedAlertmanagersFor(1)) == 0
return len(alertsRouter.AlertmanagersFor(1)) == 0 && len(alertsRouter.DroppedAlertmanagersFor(1)) == 0
}, 10*time.Second, 200*time.Millisecond)
}
@ -115,19 +115,19 @@ func TestSendingToExternalAlertmanager_WithMultipleOrgs(t *testing.T) {
cmd := store.UpdateAdminConfigurationCmd{AdminConfiguration: adminConfig}
require.NoError(t, fakeAdminConfigStore.UpdateAdminConfiguration(cmd))
sched, mockedClock := setupScheduler(t, fakeRuleStore, fakeInstanceStore, fakeAdminConfigStore, nil)
sched, mockedClock, alertsRouter := setupScheduler(t, fakeRuleStore, fakeInstanceStore, fakeAdminConfigStore, nil)
// Make sure we sync the configuration at least once before the evaluation happens to guarantee the sender is running
// when the first alert triggers.
require.NoError(t, sched.SyncAndApplyConfigFromDatabase())
sched.adminConfigMtx.Lock()
require.Equal(t, 1, len(sched.senders))
require.Equal(t, 1, len(sched.sendersCfgHash))
sched.adminConfigMtx.Unlock()
require.NoError(t, alertsRouter.SyncAndApplyConfigFromDatabase())
alertsRouter.AdminConfigMtx.Lock()
require.Equal(t, 1, len(alertsRouter.Senders))
require.Equal(t, 1, len(alertsRouter.SendersCfgHash))
alertsRouter.AdminConfigMtx.Unlock()
// Then, ensure we've discovered the Alertmanager.
require.Eventuallyf(t, func() bool {
return len(sched.AlertmanagersFor(1)) == 1 && len(sched.DroppedAlertmanagersFor(1)) == 0
return len(alertsRouter.AlertmanagersFor(1)) == 1 && len(alertsRouter.DroppedAlertmanagersFor(1)) == 0
}, 10*time.Second, 200*time.Millisecond, "Alertmanager for org 1 was never discovered")
ctx, cancel := context.WithCancel(context.Background())
@ -145,15 +145,15 @@ func TestSendingToExternalAlertmanager_WithMultipleOrgs(t *testing.T) {
require.NoError(t, fakeAdminConfigStore.UpdateAdminConfiguration(cmd))
// If we sync again, new senders must have spawned.
require.NoError(t, sched.SyncAndApplyConfigFromDatabase())
sched.adminConfigMtx.Lock()
require.Equal(t, 2, len(sched.senders))
require.Equal(t, 2, len(sched.sendersCfgHash))
sched.adminConfigMtx.Unlock()
require.NoError(t, alertsRouter.SyncAndApplyConfigFromDatabase())
alertsRouter.AdminConfigMtx.Lock()
require.Equal(t, 2, len(alertsRouter.Senders))
require.Equal(t, 2, len(alertsRouter.SendersCfgHash))
alertsRouter.AdminConfigMtx.Unlock()
// Then, ensure we've discovered the Alertmanager for the new organization.
require.Eventuallyf(t, func() bool {
return len(sched.AlertmanagersFor(2)) == 1 && len(sched.DroppedAlertmanagersFor(2)) == 0
return len(alertsRouter.AlertmanagersFor(2)) == 1 && len(alertsRouter.DroppedAlertmanagersFor(2)) == 0
}, 10*time.Second, 200*time.Millisecond, "Alertmanager for org 2 was never discovered")
// With everything up and running, let's advance the time to make sure we get at least one alert iteration.
@ -180,23 +180,23 @@ func TestSendingToExternalAlertmanager_WithMultipleOrgs(t *testing.T) {
require.NoError(t, fakeAdminConfigStore.UpdateAdminConfiguration(cmd))
// Before we sync, let's grab the existing hash of this particular org.
sched.adminConfigMtx.Lock()
currentHash := sched.sendersCfgHash[2]
sched.adminConfigMtx.Unlock()
alertsRouter.AdminConfigMtx.Lock()
currentHash := alertsRouter.SendersCfgHash[2]
alertsRouter.AdminConfigMtx.Unlock()
// Now, sync again.
require.NoError(t, sched.SyncAndApplyConfigFromDatabase())
require.NoError(t, alertsRouter.SyncAndApplyConfigFromDatabase())
// The hash for org two should not be the same and we should still have two senders.
sched.adminConfigMtx.Lock()
require.NotEqual(t, sched.sendersCfgHash[2], currentHash)
require.Equal(t, 2, len(sched.senders))
require.Equal(t, 2, len(sched.sendersCfgHash))
sched.adminConfigMtx.Unlock()
alertsRouter.AdminConfigMtx.Lock()
require.NotEqual(t, alertsRouter.SendersCfgHash[2], currentHash)
require.Equal(t, 2, len(alertsRouter.Senders))
require.Equal(t, 2, len(alertsRouter.SendersCfgHash))
alertsRouter.AdminConfigMtx.Unlock()
// Wait for the discovery of the new Alertmanager for orgID = 2.
require.Eventuallyf(t, func() bool {
return len(sched.AlertmanagersFor(2)) == 2 && len(sched.DroppedAlertmanagersFor(2)) == 0
return len(alertsRouter.AlertmanagersFor(2)) == 2 && len(alertsRouter.DroppedAlertmanagersFor(2)) == 0
}, 10*time.Second, 200*time.Millisecond, "Alertmanager for org 2 was never re-discovered after fix")
// 3. Now, let's provide a configuration that fails for OrgID = 1.
@ -205,40 +205,40 @@ func TestSendingToExternalAlertmanager_WithMultipleOrgs(t *testing.T) {
require.NoError(t, fakeAdminConfigStore.UpdateAdminConfiguration(cmd))
// Before we sync, let's get the current config hash.
sched.adminConfigMtx.Lock()
currentHash = sched.sendersCfgHash[1]
sched.adminConfigMtx.Unlock()
alertsRouter.AdminConfigMtx.Lock()
currentHash = alertsRouter.SendersCfgHash[1]
alertsRouter.AdminConfigMtx.Unlock()
// Now, sync again.
require.NoError(t, sched.SyncAndApplyConfigFromDatabase())
require.NoError(t, alertsRouter.SyncAndApplyConfigFromDatabase())
// The old configuration should still be running.
sched.adminConfigMtx.Lock()
require.Equal(t, sched.sendersCfgHash[1], currentHash)
sched.adminConfigMtx.Unlock()
require.Equal(t, 1, len(sched.AlertmanagersFor(1)))
alertsRouter.AdminConfigMtx.Lock()
require.Equal(t, alertsRouter.SendersCfgHash[1], currentHash)
alertsRouter.AdminConfigMtx.Unlock()
require.Equal(t, 1, len(alertsRouter.AlertmanagersFor(1)))
// If we fix it - it should be applied.
adminConfig2 = &models.AdminConfiguration{OrgID: 1, Alertmanagers: []string{"notarealalertmanager:3030"}}
cmd = store.UpdateAdminConfigurationCmd{AdminConfiguration: adminConfig2}
require.NoError(t, fakeAdminConfigStore.UpdateAdminConfiguration(cmd))
require.NoError(t, sched.SyncAndApplyConfigFromDatabase())
sched.adminConfigMtx.Lock()
require.NotEqual(t, sched.sendersCfgHash[1], currentHash)
sched.adminConfigMtx.Unlock()
require.NoError(t, alertsRouter.SyncAndApplyConfigFromDatabase())
alertsRouter.AdminConfigMtx.Lock()
require.NotEqual(t, alertsRouter.SendersCfgHash[1], currentHash)
alertsRouter.AdminConfigMtx.Unlock()
// Finally, remove everything.
require.NoError(t, fakeAdminConfigStore.DeleteAdminConfiguration(1))
require.NoError(t, fakeAdminConfigStore.DeleteAdminConfiguration(2))
require.NoError(t, sched.SyncAndApplyConfigFromDatabase())
sched.adminConfigMtx.Lock()
require.Equal(t, 0, len(sched.senders))
require.Equal(t, 0, len(sched.sendersCfgHash))
sched.adminConfigMtx.Unlock()
require.NoError(t, alertsRouter.SyncAndApplyConfigFromDatabase())
alertsRouter.AdminConfigMtx.Lock()
require.Equal(t, 0, len(alertsRouter.Senders))
require.Equal(t, 0, len(alertsRouter.SendersCfgHash))
alertsRouter.AdminConfigMtx.Unlock()
require.Eventuallyf(t, func() bool {
NoAlertmanagerOrgOne := len(sched.AlertmanagersFor(1)) == 0 && len(sched.DroppedAlertmanagersFor(1)) == 0
NoAlertmanagerOrgTwo := len(sched.AlertmanagersFor(2)) == 0 && len(sched.DroppedAlertmanagersFor(2)) == 0
NoAlertmanagerOrgOne := len(alertsRouter.AlertmanagersFor(1)) == 0 && len(alertsRouter.DroppedAlertmanagersFor(1)) == 0
NoAlertmanagerOrgTwo := len(alertsRouter.AlertmanagersFor(2)) == 0 && len(alertsRouter.DroppedAlertmanagersFor(2)) == 0
return NoAlertmanagerOrgOne && NoAlertmanagerOrgTwo
}, 10*time.Second, 200*time.Millisecond, "Alertmanager for org 1 and 2 were never removed")
@ -260,21 +260,21 @@ func TestChangingAlertmanagersChoice(t *testing.T) {
cmd := store.UpdateAdminConfigurationCmd{AdminConfiguration: adminConfig}
require.NoError(t, fakeAdminConfigStore.UpdateAdminConfiguration(cmd))
sched, mockedClock := setupScheduler(t, fakeRuleStore, fakeInstanceStore, fakeAdminConfigStore, nil)
sched, mockedClock, alertsRouter := setupScheduler(t, fakeRuleStore, fakeInstanceStore, fakeAdminConfigStore, nil)
// Make sure we sync the configuration at least once before the evaluation happens to guarantee the sender is running
// when the first alert triggers.
require.NoError(t, sched.SyncAndApplyConfigFromDatabase())
sched.adminConfigMtx.Lock()
require.Equal(t, 1, len(sched.senders))
require.Equal(t, 1, len(sched.sendersCfgHash))
sched.adminConfigMtx.Unlock()
require.NoError(t, alertsRouter.SyncAndApplyConfigFromDatabase())
alertsRouter.AdminConfigMtx.Lock()
require.Equal(t, 1, len(alertsRouter.Senders))
require.Equal(t, 1, len(alertsRouter.SendersCfgHash))
alertsRouter.AdminConfigMtx.Unlock()
// Then, ensure we've discovered the Alertmanager and the Alertmanagers choice is correct.
require.Eventually(t, func() bool {
return len(sched.AlertmanagersFor(1)) == 1 &&
len(sched.DroppedAlertmanagersFor(1)) == 0 &&
sched.sendAlertsTo[1] == adminConfig.SendAlertsTo
return len(alertsRouter.AlertmanagersFor(1)) == 1 &&
len(alertsRouter.DroppedAlertmanagersFor(1)) == 0 &&
alertsRouter.SendAlertsTo[1] == adminConfig.SendAlertsTo
}, 10*time.Second, 200*time.Millisecond)
ctx, cancel := context.WithCancel(context.Background())
@ -290,10 +290,10 @@ func TestChangingAlertmanagersChoice(t *testing.T) {
mockedClock.Add(2 * time.Second)
// Eventually, our Alertmanager should have received alerts.
require.Eventually(t, func() bool {
require.Eventuallyf(t, func() bool {
return fakeAM.AlertsCount() >= 1 &&
fakeAM.AlertNamesCompare([]string{alertRule.Title})
}, 10*time.Second, 200*time.Millisecond)
}, 10*time.Second, 200*time.Millisecond, "expected at least one alert to be received and the title of the first one to be '%s'. but got [%d]: [%v]", alertRule.Title, fakeAM.AlertsCount(), fakeAM.Alerts())
// Now, let's change the Alertmanagers choice to send only to the external Alertmanager.
adminConfig.SendAlertsTo = models.ExternalAlertmanagers
@ -301,17 +301,17 @@ func TestChangingAlertmanagersChoice(t *testing.T) {
require.NoError(t, fakeAdminConfigStore.UpdateAdminConfiguration(cmd))
// Again, make sure we sync and verify the senders.
require.NoError(t, sched.SyncAndApplyConfigFromDatabase())
sched.adminConfigMtx.Lock()
require.Equal(t, 1, len(sched.senders))
require.Equal(t, 1, len(sched.sendersCfgHash))
sched.adminConfigMtx.Unlock()
require.NoError(t, alertsRouter.SyncAndApplyConfigFromDatabase())
alertsRouter.AdminConfigMtx.Lock()
require.Equal(t, 1, len(alertsRouter.Senders))
require.Equal(t, 1, len(alertsRouter.SendersCfgHash))
alertsRouter.AdminConfigMtx.Unlock()
// Then, ensure we still have the Alertmanager but the Alertmanagers choice has changed.
require.Eventually(t, func() bool {
return len(sched.AlertmanagersFor(1)) == 1 &&
len(sched.DroppedAlertmanagersFor(1)) == 0 &&
sched.sendAlertsTo[1] == adminConfig.SendAlertsTo
return len(alertsRouter.AlertmanagersFor(1)) == 1 &&
len(alertsRouter.DroppedAlertmanagersFor(1)) == 0 &&
alertsRouter.SendAlertsTo[1] == adminConfig.SendAlertsTo
}, 10*time.Second, 200*time.Millisecond)
// Finally, let's change the Alertmanagers choice to send only to the internal Alertmanager.
@ -321,34 +321,34 @@ func TestChangingAlertmanagersChoice(t *testing.T) {
// Again, make sure we sync and verify the senders.
// Senders should be running even though alerts are being handled internally.
require.NoError(t, sched.SyncAndApplyConfigFromDatabase())
sched.adminConfigMtx.Lock()
require.Equal(t, 1, len(sched.senders))
require.Equal(t, 1, len(sched.sendersCfgHash))
sched.adminConfigMtx.Unlock()
require.NoError(t, alertsRouter.SyncAndApplyConfigFromDatabase())
alertsRouter.AdminConfigMtx.Lock()
require.Equal(t, 1, len(alertsRouter.Senders))
require.Equal(t, 1, len(alertsRouter.SendersCfgHash))
alertsRouter.AdminConfigMtx.Unlock()
// Then, ensure the Alertmanager is still listed and the Alertmanagers choice has changed.
require.Eventually(t, func() bool {
return len(sched.AlertmanagersFor(1)) == 1 &&
len(sched.DroppedAlertmanagersFor(1)) == 0 &&
sched.sendAlertsTo[1] == adminConfig.SendAlertsTo
return len(alertsRouter.AlertmanagersFor(1)) == 1 &&
len(alertsRouter.DroppedAlertmanagersFor(1)) == 0 &&
alertsRouter.SendAlertsTo[1] == adminConfig.SendAlertsTo
}, 10*time.Second, 200*time.Millisecond)
}
func TestSchedule_ruleRoutine(t *testing.T) {
createSchedule := func(
evalAppliedChan chan time.Time,
) (*schedule, *store.FakeRuleStore, *store.FakeInstanceStore, *store.FakeAdminConfigStore, prometheus.Gatherer) {
) (*schedule, *store.FakeRuleStore, *store.FakeInstanceStore, *store.FakeAdminConfigStore, prometheus.Gatherer, *sender.AlertsRouter) {
ruleStore := store.NewFakeRuleStore(t)
instanceStore := &store.FakeInstanceStore{}
adminConfigStore := store.NewFakeAdminConfigStore(t)
registry := prometheus.NewPedanticRegistry()
sch, _ := setupScheduler(t, ruleStore, instanceStore, adminConfigStore, registry)
sch, _, alertsRouter := setupScheduler(t, ruleStore, instanceStore, adminConfigStore, registry)
sch.evalAppliedFunc = func(key models.AlertRuleKey, t time.Time) {
evalAppliedChan <- t
}
return sch, ruleStore, instanceStore, adminConfigStore, registry
return sch, ruleStore, instanceStore, adminConfigStore, registry, alertsRouter
}
// normal states do not include NoData and Error because currently it is not possible to perform any sensible test
@ -364,7 +364,7 @@ func TestSchedule_ruleRoutine(t *testing.T) {
t.Run(fmt.Sprintf("when rule evaluation happens (evaluation state %s)", evalState), func(t *testing.T) {
evalChan := make(chan *evaluation)
evalAppliedChan := make(chan time.Time)
sch, ruleStore, instanceStore, _, reg := createSchedule(evalAppliedChan)
sch, ruleStore, instanceStore, _, reg, _ := createSchedule(evalAppliedChan)
rule := CreateTestAlertRule(t, ruleStore, 10, rand.Int63(), evalState)
@ -491,7 +491,7 @@ func TestSchedule_ruleRoutine(t *testing.T) {
t.Run("should exit", func(t *testing.T) {
t.Run("when context is cancelled", func(t *testing.T) {
stoppedChan := make(chan error)
sch, _, _, _, _ := createSchedule(make(chan time.Time))
sch, _, _, _, _, _ := createSchedule(make(chan time.Time))
ctx, cancel := context.WithCancel(context.Background())
go func() {
@ -510,7 +510,7 @@ func TestSchedule_ruleRoutine(t *testing.T) {
evalAppliedChan := make(chan time.Time)
ctx := context.Background()
sch, ruleStore, _, _, _ := createSchedule(evalAppliedChan)
sch, ruleStore, _, _, _, _ := createSchedule(evalAppliedChan)
rule := CreateTestAlertRule(t, ruleStore, 10, rand.Int63(), randomNormalState())
@ -562,7 +562,7 @@ func TestSchedule_ruleRoutine(t *testing.T) {
evalChan := make(chan *evaluation)
evalAppliedChan := make(chan time.Time)
sch, ruleStore, _, _, _ := createSchedule(evalAppliedChan)
sch, ruleStore, _, _, _, _ := createSchedule(evalAppliedChan)
rule := CreateTestAlertRule(t, ruleStore, 10, rand.Int63(), randomNormalState())
@ -614,7 +614,7 @@ func TestSchedule_ruleRoutine(t *testing.T) {
evalAppliedChan := make(chan time.Time)
updateChan := make(chan struct{})
sch, ruleStore, _, _, _ := createSchedule(evalAppliedChan)
sch, ruleStore, _, _, _, _ := createSchedule(evalAppliedChan)
rule := CreateTestAlertRule(t, ruleStore, 10, rand.Int63(), eval.Alerting) // we want the alert to fire
@ -657,7 +657,7 @@ func TestSchedule_ruleRoutine(t *testing.T) {
evalAppliedChan := make(chan time.Time)
updateChan := make(chan struct{})
sch, ruleStore, _, _, _ := createSchedule(evalAppliedChan)
sch, ruleStore, _, _, _, _ := createSchedule(evalAppliedChan)
sch.maxAttempts = rand.Int63n(4) + 1
rule := CreateTestAlertRule(t, ruleStore, 10, rand.Int63(), randomNormalState())
@ -693,7 +693,7 @@ func TestSchedule_ruleRoutine(t *testing.T) {
defer fakeAM.Close()
orgID := rand.Int63()
s, err := sender.New(nil)
s, err := sender.New()
require.NoError(t, err)
adminConfig := &models.AdminConfiguration{OrgID: orgID, Alertmanagers: []string{fakeAM.Server.URL}}
err = s.ApplyConfig(adminConfig)
@ -710,8 +710,8 @@ func TestSchedule_ruleRoutine(t *testing.T) {
updateChan := make(chan struct{})
ctx := context.Background()
sch, ruleStore, _, _, _ := createSchedule(evalAppliedChan)
sch.senders[orgID] = s
sch, ruleStore, _, _, _, alertsRouter := createSchedule(evalAppliedChan)
alertsRouter.Senders[orgID] = s
var rulePtr = CreateTestAlertRule(t, ruleStore, 10, orgID, eval.Alerting) // we want the alert to fire
var rule = *rulePtr
@ -733,8 +733,15 @@ func TestSchedule_ruleRoutine(t *testing.T) {
}
sch.stateManager.Put(states)
states = sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID)
expectedToBeSent := FromAlertsStateToStoppedAlert(states, sch.appURL, sch.clock)
require.NotEmptyf(t, expectedToBeSent.PostableAlerts, "State manger was expected to return at least one state that can be expired")
expectedToBeSent := 0
for _, s := range states {
if s.State == eval.Normal || s.State == eval.Pending {
continue
}
expectedToBeSent++
}
require.Greaterf(t, expectedToBeSent, 0, "State manager was expected to return at least one state that can be expired")
go func() {
ctx, cancel := context.WithCancel(context.Background())
@ -769,8 +776,8 @@ func TestSchedule_ruleRoutine(t *testing.T) {
var count int
require.Eventuallyf(t, func() bool {
count = fakeAM.AlertsCount()
return count == len(expectedToBeSent.PostableAlerts)
}, 20*time.Second, 200*time.Millisecond, "Alertmanager was expected to receive %d alerts, but received only %d", len(expectedToBeSent.PostableAlerts), count)
return count == expectedToBeSent
}, 20*time.Second, 200*time.Millisecond, "Alertmanager was expected to receive %d alerts, but received only %d", expectedToBeSent, count)
for _, alert := range fakeAM.Alerts() {
require.Equalf(t, sch.clock.Now().UTC(), time.Time(alert.EndsAt).UTC(), "Alert received by Alertmanager should be expired as of now")
@ -799,7 +806,7 @@ func TestSchedule_ruleRoutine(t *testing.T) {
defer fakeAM.Close()
orgID := rand.Int63()
s, err := sender.New(nil)
s, err := sender.New()
require.NoError(t, err)
adminConfig := &models.AdminConfiguration{OrgID: orgID, Alertmanagers: []string{fakeAM.Server.URL}}
err = s.ApplyConfig(adminConfig)
@ -814,8 +821,8 @@ func TestSchedule_ruleRoutine(t *testing.T) {
evalChan := make(chan *evaluation)
evalAppliedChan := make(chan time.Time)
sch, ruleStore, _, _, _ := createSchedule(evalAppliedChan)
sch.senders[orgID] = s
sch, ruleStore, _, _, _, alertsRouter := createSchedule(evalAppliedChan)
alertsRouter.Senders[orgID] = s
// eval.Alerting makes the state manager create notifications for alertmanagers
rule := CreateTestAlertRule(t, ruleStore, 10, orgID, eval.Alerting)
@ -925,11 +932,11 @@ func setupSchedulerWithFakeStores(t *testing.T) *schedule {
ruleStore := store.NewFakeRuleStore(t)
instanceStore := &store.FakeInstanceStore{}
adminConfigStore := store.NewFakeAdminConfigStore(t)
sch, _ := setupScheduler(t, ruleStore, instanceStore, adminConfigStore, nil)
sch, _, _ := setupScheduler(t, ruleStore, instanceStore, adminConfigStore, nil)
return sch
}
func setupScheduler(t *testing.T, rs store.RuleStore, is store.InstanceStore, acs store.AdminConfigurationStore, registry *prometheus.Registry) (*schedule, *clock.Mock) {
func setupScheduler(t *testing.T, rs store.RuleStore, is store.InstanceStore, acs store.AdminConfigurationStore, registry *prometheus.Registry) (*schedule, *clock.Mock, *sender.AlertsRouter) {
t.Helper()
fakeAnnoRepo := store.NewFakeAnnotationsRepo()
@ -945,6 +952,13 @@ func setupScheduler(t *testing.T, rs store.RuleStore, is store.InstanceStore, ac
moa, err := notifier.NewMultiOrgAlertmanager(&setting.Cfg{}, &notifier.FakeConfigStore{}, &notifier.FakeOrgStore{}, &notifier.FakeKVStore{}, provisioning.NewFakeProvisioningStore(), decryptFn, m.GetMultiOrgAlertmanagerMetrics(), nil, log.New("testlogger"), secretsService)
require.NoError(t, err)
appUrl := &url.URL{
Scheme: "http",
Host: "localhost",
}
alertsRouter := sender.NewAlertsRouter(moa, acs, mockedClock, appUrl, map[int64]struct{}{}, 10*time.Minute) // do not poll in unit tests.
cfg := setting.UnifiedAlertingSettings{
BaseInterval: time.Second,
MaxAttempts: 1,
@ -952,22 +966,17 @@ func setupScheduler(t *testing.T, rs store.RuleStore, is store.InstanceStore, ac
}
schedCfg := SchedulerCfg{
Cfg: cfg,
C: mockedClock,
Evaluator: eval.NewEvaluator(&setting.Cfg{ExpressionsEnabled: true}, logger, nil, secretsService, expr.ProvideService(&setting.Cfg{ExpressionsEnabled: true}, nil, nil)),
RuleStore: rs,
InstanceStore: is,
AdminConfigStore: acs,
MultiOrgNotifier: moa,
Logger: logger,
Metrics: m.GetSchedulerMetrics(),
Cfg: cfg,
C: mockedClock,
Evaluator: eval.NewEvaluator(&setting.Cfg{ExpressionsEnabled: true}, logger, nil, secretsService, expr.ProvideService(&setting.Cfg{ExpressionsEnabled: true}, nil, nil)),
RuleStore: rs,
InstanceStore: is,
Logger: logger,
Metrics: m.GetSchedulerMetrics(),
AlertSender: alertsRouter,
}
st := state.NewManager(schedCfg.Logger, m.GetStateMetrics(), nil, rs, is, &dashboards.FakeDashboardService{}, &image.NoopImageService{}, clock.NewMock())
appUrl := &url.URL{
Scheme: "http",
Host: "localhost",
}
return NewScheduler(schedCfg, appUrl, st, busmock.New()), mockedClock
return NewScheduler(schedCfg, appUrl, st, busmock.New()), mockedClock, alertsRouter
}
// createTestAlertRule creates a dummy alert definition to be used by the tests.


@ -0,0 +1,255 @@
package sender
import (
"context"
"errors"
"net/url"
"sync"
"time"
"github.com/benbjohnson/clock"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
"github.com/grafana/grafana/pkg/services/ngalert/models"
"github.com/grafana/grafana/pkg/services/ngalert/notifier"
"github.com/grafana/grafana/pkg/services/ngalert/store"
)
// AlertsRouter handles alerts generated during alert rule evaluation.
// Based on the rule's orgID and the configuration for that organization,
// it determines whether an alert needs to be sent to an external Alertmanager and/or the internal notifier.Alertmanager.
//
// After creating an AlertsRouter, you must call Run to keep the AlertsRouter's
// state synchronized with the alerting configuration.
type AlertsRouter struct {
logger log.Logger
clock clock.Clock
adminConfigStore store.AdminConfigurationStore
// Senders help us send alerts to external Alertmanagers.
AdminConfigMtx sync.RWMutex
SendAlertsTo map[int64]models.AlertmanagersChoice
Senders map[int64]*Sender
SendersCfgHash map[int64]string
MultiOrgNotifier *notifier.MultiOrgAlertmanager
appURL *url.URL
disabledOrgs map[int64]struct{}
adminConfigPollInterval time.Duration
}
func NewAlertsRouter(multiOrgNotifier *notifier.MultiOrgAlertmanager, store store.AdminConfigurationStore, clk clock.Clock, appURL *url.URL, disabledOrgs map[int64]struct{}, configPollInterval time.Duration) *AlertsRouter {
d := &AlertsRouter{
logger: log.New("alerts-router"),
clock: clk,
adminConfigStore: store,
AdminConfigMtx: sync.RWMutex{},
Senders: map[int64]*Sender{},
SendersCfgHash: map[int64]string{},
SendAlertsTo: map[int64]models.AlertmanagersChoice{},
MultiOrgNotifier: multiOrgNotifier,
appURL: appURL,
disabledOrgs: disabledOrgs,
adminConfigPollInterval: configPollInterval,
}
return d
}
// SyncAndApplyConfigFromDatabase looks for the admin configuration in the database
// and adjusts the sender(s) and alert handling mechanism accordingly.
func (d *AlertsRouter) SyncAndApplyConfigFromDatabase() error {
d.logger.Debug("start of admin configuration sync")
cfgs, err := d.adminConfigStore.GetAdminConfigurations()
if err != nil {
return err
}
d.logger.Debug("found admin configurations", "count", len(cfgs))
orgsFound := make(map[int64]struct{}, len(cfgs))
d.AdminConfigMtx.Lock()
for _, cfg := range cfgs {
_, isDisabledOrg := d.disabledOrgs[cfg.OrgID]
if isDisabledOrg {
d.logger.Debug("skipping starting sender for disabled org", "org", cfg.OrgID)
continue
}
// Update the Alertmanagers choice for the organization.
d.SendAlertsTo[cfg.OrgID] = cfg.SendAlertsTo
orgsFound[cfg.OrgID] = struct{}{} // keep track of which senders we need to keep.
existing, ok := d.Senders[cfg.OrgID]
// We have no running sender and no Alertmanager(s) configured, no-op.
if !ok && len(cfg.Alertmanagers) == 0 {
d.logger.Debug("no external alertmanagers configured", "org", cfg.OrgID)
continue
}
// We have no running sender and alerts are handled internally, no-op.
if !ok && cfg.SendAlertsTo == models.InternalAlertmanager {
d.logger.Debug("alerts are handled internally", "org", cfg.OrgID)
continue
}
// We have a running sender but no Alertmanager(s) configured, shut it down.
if ok && len(cfg.Alertmanagers) == 0 {
d.logger.Debug("no external alertmanager(s) configured, sender will be stopped", "org", cfg.OrgID)
delete(orgsFound, cfg.OrgID)
continue
}
// We have a running sender, check if we need to apply a new config.
if ok {
if d.SendersCfgHash[cfg.OrgID] == cfg.AsSHA256() {
d.logger.Debug("sender configuration is the same as the one running, no-op", "org", cfg.OrgID, "alertmanagers", cfg.Alertmanagers)
continue
}
d.logger.Debug("applying new configuration to sender", "org", cfg.OrgID, "alertmanagers", cfg.Alertmanagers)
err := existing.ApplyConfig(cfg)
if err != nil {
d.logger.Error("failed to apply configuration", "err", err, "org", cfg.OrgID)
continue
}
d.SendersCfgHash[cfg.OrgID] = cfg.AsSHA256()
continue
}
// No sender and have Alertmanager(s) to send to - start a new one.
d.logger.Info("creating new sender for the external alertmanagers", "org", cfg.OrgID, "alertmanagers", cfg.Alertmanagers)
s, err := New()
if err != nil {
d.logger.Error("unable to start the sender", "err", err, "org", cfg.OrgID)
continue
}
d.Senders[cfg.OrgID] = s
s.Run()
err = s.ApplyConfig(cfg)
if err != nil {
d.logger.Error("failed to apply configuration", "err", err, "org", cfg.OrgID)
continue
}
d.SendersCfgHash[cfg.OrgID] = cfg.AsSHA256()
}
sendersToStop := map[int64]*Sender{}
for orgID, s := range d.Senders {
if _, exists := orgsFound[orgID]; !exists {
sendersToStop[orgID] = s
delete(d.Senders, orgID)
delete(d.SendersCfgHash, orgID)
}
}
d.AdminConfigMtx.Unlock()
// We can now stop these senders w/o having to hold a lock.
for orgID, s := range sendersToStop {
d.logger.Info("stopping sender", "org", orgID)
s.Stop()
d.logger.Info("stopped sender", "org", orgID)
}
d.logger.Debug("finish of admin configuration sync")
return nil
}
func (d *AlertsRouter) Send(key models.AlertRuleKey, alerts definitions.PostableAlerts) {
logger := d.logger.New("rule_uid", key.UID, "org", key.OrgID)
if len(alerts.PostableAlerts) == 0 {
logger.Debug("no alerts to notify about")
return
}
// Send alerts to local notifier if they need to be handled internally
// or if no external AMs have been discovered yet.
var localNotifierExist, externalNotifierExist bool
if d.SendAlertsTo[key.OrgID] == models.ExternalAlertmanagers && len(d.AlertmanagersFor(key.OrgID)) > 0 {
logger.Debug("no alerts to put in the notifier")
} else {
logger.Debug("sending alerts to local notifier", "count", len(alerts.PostableAlerts), "alerts", alerts.PostableAlerts)
n, err := d.MultiOrgNotifier.AlertmanagerFor(key.OrgID)
if err == nil {
localNotifierExist = true
if err := n.PutAlerts(alerts); err != nil {
logger.Error("failed to put alerts in the local notifier", "count", len(alerts.PostableAlerts), "err", err)
}
} else {
if errors.Is(err, notifier.ErrNoAlertmanagerForOrg) {
logger.Debug("local notifier was not found")
} else {
logger.Error("local notifier is not available", "err", err)
}
}
}
// Send alerts to external Alertmanager(s) if we have a sender for this organization
// and alerts are not being handled just internally.
d.AdminConfigMtx.RLock()
defer d.AdminConfigMtx.RUnlock()
s, ok := d.Senders[key.OrgID]
if ok && d.SendAlertsTo[key.OrgID] != models.InternalAlertmanager {
logger.Debug("sending alerts to external notifier", "count", len(alerts.PostableAlerts), "alerts", alerts.PostableAlerts)
s.SendAlerts(alerts)
externalNotifierExist = true
}
if !localNotifierExist && !externalNotifierExist {
logger.Error("no external or internal notifier - [%d] alerts not delivered", len(alerts.PostableAlerts))
}
}
// AlertmanagersFor returns all the discovered Alertmanager(s) for a particular organization.
func (d *AlertsRouter) AlertmanagersFor(orgID int64) []*url.URL {
d.AdminConfigMtx.RLock()
defer d.AdminConfigMtx.RUnlock()
s, ok := d.Senders[orgID]
if !ok {
return []*url.URL{}
}
return s.Alertmanagers()
}
// DroppedAlertmanagersFor returns all the dropped Alertmanager(s) for a particular organization.
func (d *AlertsRouter) DroppedAlertmanagersFor(orgID int64) []*url.URL {
d.AdminConfigMtx.RLock()
defer d.AdminConfigMtx.RUnlock()
s, ok := d.Senders[orgID]
if !ok {
return []*url.URL{}
}
return s.DroppedAlertmanagers()
}
// Run starts regular updates of the configuration.
func (d *AlertsRouter) Run(ctx context.Context) error {
for {
select {
case <-time.After(d.adminConfigPollInterval):
if err := d.SyncAndApplyConfigFromDatabase(); err != nil {
d.logger.Error("unable to sync admin configuration", "err", err)
}
case <-ctx.Done():
// Stop sending alerts to all external Alertmanager(s).
d.AdminConfigMtx.Lock()
for orgID, s := range d.Senders {
delete(d.Senders, orgID) // delete before we stop to make sure we don't accept any more alerts.
s.Stop()
}
d.AdminConfigMtx.Unlock()
return nil
}
}
}


@ -9,7 +9,6 @@ import (
"github.com/grafana/grafana/pkg/infra/log"
apimodels "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
ngmodels "github.com/grafana/grafana/pkg/services/ngalert/models"
"github.com/prometheus/alertmanager/api/v2/models"
@ -38,7 +37,7 @@ type Sender struct {
sdManager *discovery.Manager
}
func New(_ *metrics.Scheduler) (*Sender, error) {
func New() (*Sender, error) {
l := log.New("sender")
sdCtx, sdCancel := context.WithCancel(context.Background())
s := &Sender{


@ -342,7 +342,7 @@ func (st *Manager) annotateState(ctx context.Context, alertRule *ngModels.AlertR
panelId, err := strconv.ParseInt(panelUid, 10, 64)
if err != nil {
st.log.Error("error parsing panelUID for alert annotation", "panelUID", panelUid, "alertRuleUID", alertRule.UID, "error", err.Error())
st.log.Error("error parsing panelUID for alert annotation", "panelUID", panelUid, "alertRuleUID", alertRule.UID, "err", err.Error())
return
}