mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Alerting: Add time-based convergence in remote secondary mode (#78809)
* Alerting: Add a sync interval for ApplyConfig in remote secondary mode * add routine to sync states and configs * pass a cancellable context to syncRoutine(), remove tests for ApplyConfig, cache last config in memory * extract logic to update config and state in the remote Alertmanager * get latest config from the database * avoid using separate goroutine for updating state and config * clean up PR * refactor, comments, tests * update tests * add config struct for remote secondary forked Alertmanager * use errgroups for sync operations * use waitgroup instead of errgroup * remove helper method to sync AMs * check for errors instead of bool syncErr
This commit is contained in:
@@ -2,6 +2,9 @@ package remote
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
apimodels "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
|
||||
@@ -9,28 +12,92 @@ import (
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/notifier"
|
||||
)
|
||||
|
||||
//go:generate mockery --name remoteAlertmanager --structname RemoteAlertmanagerMock --with-expecter --output mock --outpkg alertmanager_mock
|
||||
type remoteAlertmanager interface {
|
||||
notifier.Alertmanager
|
||||
CompareAndSendConfiguration(context.Context, *models.AlertConfiguration) error
|
||||
CompareAndSendState(context.Context) error
|
||||
}
|
||||
|
||||
type RemoteSecondaryForkedAlertmanager struct {
|
||||
log log.Logger
|
||||
|
||||
internal notifier.Alertmanager
|
||||
remote notifier.Alertmanager
|
||||
remote remoteAlertmanager
|
||||
|
||||
lastSync time.Time
|
||||
syncInterval time.Duration
|
||||
}
|
||||
|
||||
func NewRemoteSecondaryForkedAlertmanager(l log.Logger, internal, remote notifier.Alertmanager) *RemoteSecondaryForkedAlertmanager {
|
||||
return &RemoteSecondaryForkedAlertmanager{
|
||||
log: l,
|
||||
internal: internal,
|
||||
remote: remote,
|
||||
type RemoteSecondaryConfig struct {
|
||||
// SyncInterval determines how often we should attempt to synchronize
|
||||
// state and configuration on the external Alertmanager.
|
||||
SyncInterval time.Duration
|
||||
Logger log.Logger
|
||||
}
|
||||
|
||||
func (c *RemoteSecondaryConfig) Validate() error {
|
||||
if c.Logger == nil {
|
||||
return fmt.Errorf("logger cannot be nil")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewRemoteSecondaryForkedAlertmanager(cfg RemoteSecondaryConfig, internal notifier.Alertmanager, remote remoteAlertmanager) (*RemoteSecondaryForkedAlertmanager, error) {
|
||||
if err := cfg.Validate(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &RemoteSecondaryForkedAlertmanager{
|
||||
log: cfg.Logger,
|
||||
internal: internal,
|
||||
remote: remote,
|
||||
syncInterval: cfg.SyncInterval,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// ApplyConfig will only log errors for the remote Alertmanager and ensure we delegate the call to the internal Alertmanager.
|
||||
// We don't care about errors in the remote Alertmanager in remote secondary mode.
|
||||
func (fam *RemoteSecondaryForkedAlertmanager) ApplyConfig(ctx context.Context, config *models.AlertConfiguration) error {
|
||||
if err := fam.remote.ApplyConfig(ctx, config); err != nil {
|
||||
fam.log.Error("Error applying config to the remote Alertmanager", "err", err)
|
||||
}
|
||||
return fam.internal.ApplyConfig(ctx, config)
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
// Figure out if we need to sync the external Alertmanager in another goroutine.
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
// If the Alertmanager has not been marked as "ready" yet, delegate the call to the remote Alertmanager.
|
||||
// This will perform a readiness check and sync the Alertmanagers.
|
||||
if !fam.remote.Ready() {
|
||||
if err := fam.remote.ApplyConfig(ctx, config); err != nil {
|
||||
fam.log.Error("Error applying config to the remote Alertmanager", "err", err)
|
||||
return
|
||||
}
|
||||
fam.lastSync = time.Now()
|
||||
return
|
||||
}
|
||||
|
||||
// If the Alertmanager was marked as ready but the sync interval has elapsed, sync the Alertmanagers.
|
||||
if time.Since(fam.lastSync) >= fam.syncInterval {
|
||||
fam.log.Debug("Syncing configuration and state with the remote Alertmanager", "lastSync", fam.lastSync)
|
||||
cfgErr := fam.remote.CompareAndSendConfiguration(ctx, config)
|
||||
if cfgErr != nil {
|
||||
fam.log.Error("Unable to upload the configuration to the remote Alertmanager", "err", cfgErr)
|
||||
}
|
||||
|
||||
stateErr := fam.remote.CompareAndSendState(ctx)
|
||||
if stateErr != nil {
|
||||
fam.log.Error("Unable to upload the state to the remote Alertmanager", "err", stateErr)
|
||||
}
|
||||
fam.log.Debug("Finished syncing configuration and state with the remote Alertmanager")
|
||||
|
||||
if cfgErr == nil && stateErr == nil {
|
||||
fam.lastSync = time.Now()
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Call ApplyConfig on the internal Alertmanager - we only care about errors for this one.
|
||||
err := fam.internal.ApplyConfig(ctx, config)
|
||||
wg.Wait()
|
||||
return err
|
||||
}
|
||||
|
||||
// SaveAndApplyConfig is only called on the internal Alertmanager when running in remote secondary mode.
|
||||
@@ -95,6 +162,7 @@ func (fam *RemoteSecondaryForkedAlertmanager) CleanUp() {
|
||||
func (fam *RemoteSecondaryForkedAlertmanager) StopAndWait() {
|
||||
fam.internal.StopAndWait()
|
||||
fam.remote.StopAndWait()
|
||||
// TODO: send config and state on shutdown.
|
||||
}
|
||||
|
||||
func (fam *RemoteSecondaryForkedAlertmanager) Ready() bool {
|
||||
|
||||
Reference in New Issue
Block a user