grafana/pkg/services/ngalert/remote/remote_secondary_forked_alertmanager.go
Matthew Jacobson f79dd7c7f9
Alerting: Persist silence state immediately on Create/Delete (#84705)
* Alerting: Persist silence state immediately on Create/Delete

Persists the silence state to the kvstore immediately instead of waiting for the
 next maintenance run. This is used after Create/Delete to prevent silences from
 being lost when a new Alertmanager is started before the state has persisted.
 This can happen, for example, in a rolling deployment scenario.

* Fix test that requires real data

* Don't error if silence state persist fails, maintenance will correct
2024-04-09 13:39:34 -04:00

201 lines
7.4 KiB
Go

package remote
import (
"context"
"fmt"
"sync"
"time"
alertingNotify "github.com/grafana/alerting/notify"
"github.com/grafana/grafana/pkg/infra/log"
apimodels "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
"github.com/grafana/grafana/pkg/services/ngalert/models"
"github.com/grafana/grafana/pkg/services/ngalert/notifier"
)
// configStore is the storage dependency of the forked Alertmanager. It is used
// to look up the most recent Alertmanager configuration of an organization.
type configStore interface {
	// GetLatestAlertmanagerConfiguration returns the latest stored Alertmanager
	// configuration for the organization identified by orgID.
	GetLatestAlertmanagerConfiguration(ctx context.Context, orgID int64) (*models.AlertConfiguration, error)
}
// remoteAlertmanager is a notifier.Alertmanager that can additionally be asked
// to synchronize its configuration and state.
//
//go:generate mockery --name remoteAlertmanager --structname RemoteAlertmanagerMock --with-expecter --output mock --outpkg alertmanager_mock
type remoteAlertmanager interface {
	notifier.Alertmanager
	// CompareAndSendConfiguration sends the given configuration to the remote
	// Alertmanager.
	// NOTE(review): presumably it only uploads when the remote copy differs - confirm against the implementation.
	CompareAndSendConfiguration(context.Context, *models.AlertConfiguration) error
	// CompareAndSendState sends state to the remote Alertmanager.
	// NOTE(review): presumably it only uploads when the remote copy differs - confirm against the implementation.
	CompareAndSendState(context.Context) error
}
// RemoteSecondaryForkedAlertmanager wraps an internal and a remote Alertmanager.
// Every operation is served by the internal Alertmanager; the remote one is only
// kept in sync (on ApplyConfig and on StopAndWait) and its errors are logged
// rather than returned.
type RemoteSecondaryForkedAlertmanager struct {
	// log records remote-side failures and sync progress.
	log log.Logger
	// orgID is the organization whose configuration is read from store when stopping.
	orgID int64
	// store provides the latest Alertmanager configuration when stopping.
	store configStore
	// internal is the Alertmanager that actually serves every call.
	internal notifier.Alertmanager
	// remote is the Alertmanager that receives configuration and state syncs.
	remote remoteAlertmanager
	// lastSync is the time of the last successful sync with the remote Alertmanager.
	// It starts as the zero time and is updated by ApplyConfig.
	lastSync time.Time
	// syncInterval is the minimum time that must have elapsed since lastSync
	// before ApplyConfig syncs a ready remote Alertmanager again.
	syncInterval time.Duration
}
// RemoteSecondaryConfig holds the settings needed to build a
// RemoteSecondaryForkedAlertmanager.
type RemoteSecondaryConfig struct {
	// Logger is required; Validate rejects a nil Logger.
	Logger log.Logger
	// OrgID is the organization whose configuration is fetched from Store.
	OrgID int64
	// Store is used to fetch the latest Alertmanager configuration.
	Store configStore
	// SyncInterval determines how often we should attempt to synchronize
	// state and configuration on the external Alertmanager.
	SyncInterval time.Duration
}
// Validate reports whether the configuration can be used to build a forked
// Alertmanager. The only requirement checked is that Logger is set.
func (c *RemoteSecondaryConfig) Validate() error {
	if c.Logger != nil {
		return nil
	}
	return fmt.Errorf("logger cannot be nil")
}
// NewRemoteSecondaryForkedAlertmanager builds a forked Alertmanager from cfg and
// the given internal and remote Alertmanagers. It returns an error, and no
// Alertmanager, when cfg does not pass Validate.
func NewRemoteSecondaryForkedAlertmanager(cfg RemoteSecondaryConfig, internal notifier.Alertmanager, remote remoteAlertmanager) (*RemoteSecondaryForkedAlertmanager, error) {
	validationErr := cfg.Validate()
	if validationErr != nil {
		return nil, validationErr
	}

	// lastSync is left at its zero value: no sync has happened yet.
	forked := &RemoteSecondaryForkedAlertmanager{
		internal:     internal,
		remote:       remote,
		log:          cfg.Logger,
		orgID:        cfg.OrgID,
		store:        cfg.Store,
		syncInterval: cfg.SyncInterval,
	}
	return forked, nil
}
// ApplyConfig applies the configuration to the internal Alertmanager and returns
// that call's error. Concurrently, it brings the remote Alertmanager up to date
// when needed; failures on the remote side are only logged, because they are not
// relevant in remote secondary mode. The method returns once both have finished.
func (fam *RemoteSecondaryForkedAlertmanager) ApplyConfig(ctx context.Context, config *models.AlertConfiguration) error {
	// syncRemote decides whether the remote Alertmanager needs to be synced and does so.
	syncRemote := func() {
		switch {
		case !fam.remote.Ready():
			// The remote Alertmanager has not been marked as "ready" yet: delegate
			// the call to it, which performs a readiness check and syncs the Alertmanagers.
			remoteErr := fam.remote.ApplyConfig(ctx, config)
			if remoteErr != nil {
				fam.log.Error("Error applying config to the remote Alertmanager", "err", remoteErr)
				return
			}
			fam.lastSync = time.Now()

		case time.Since(fam.lastSync) >= fam.syncInterval:
			// The remote Alertmanager is ready but the sync interval has elapsed.
			fam.log.Debug("Syncing configuration and state with the remote Alertmanager", "lastSync", fam.lastSync)

			cfgErr := fam.remote.CompareAndSendConfiguration(ctx, config)
			if cfgErr != nil {
				fam.log.Error("Unable to upload the configuration to the remote Alertmanager", "err", cfgErr)
			}

			// The state is sent even if sending the configuration failed.
			stateErr := fam.remote.CompareAndSendState(ctx)
			if stateErr != nil {
				fam.log.Error("Unable to upload the state to the remote Alertmanager", "err", stateErr)
			}

			fam.log.Debug("Finished syncing configuration and state with the remote Alertmanager")

			// Only a fully successful sync moves lastSync forward, so a failed
			// one is retried on the next call.
			if cfgErr != nil || stateErr != nil {
				return
			}
			fam.lastSync = time.Now()
		}
	}

	// Handle the remote Alertmanager in another goroutine while the internal one is configured.
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		syncRemote()
	}()

	// Only the internal Alertmanager's error is reported to the caller.
	internalErr := fam.internal.ApplyConfig(ctx, config)
	wg.Wait()
	return internalErr
}
// SaveAndApplyConfig saves and applies the configuration on the internal
// Alertmanager only; the remote Alertmanager is not involved in remote secondary mode.
func (fam *RemoteSecondaryForkedAlertmanager) SaveAndApplyConfig(ctx context.Context, config *apimodels.PostableUserConfig) error {
	err := fam.internal.SaveAndApplyConfig(ctx, config)
	return err
}
// SaveAndApplyDefaultConfig saves and applies the default configuration on the
// internal Alertmanager only; the remote Alertmanager is not involved in remote
// secondary mode.
func (fam *RemoteSecondaryForkedAlertmanager) SaveAndApplyDefaultConfig(ctx context.Context) error {
	err := fam.internal.SaveAndApplyDefaultConfig(ctx)
	return err
}
// GetStatus returns the status reported by the internal Alertmanager.
func (fam *RemoteSecondaryForkedAlertmanager) GetStatus() apimodels.GettableStatus {
	status := fam.internal.GetStatus()
	return status
}
// CreateSilence creates the silence on the internal Alertmanager and returns
// the string and error it produces.
func (fam *RemoteSecondaryForkedAlertmanager) CreateSilence(ctx context.Context, silence *apimodels.PostableSilence) (string, error) {
	silenceID, err := fam.internal.CreateSilence(ctx, silence)
	return silenceID, err
}
// DeleteSilence deletes the silence with the given id on the internal Alertmanager.
func (fam *RemoteSecondaryForkedAlertmanager) DeleteSilence(ctx context.Context, id string) error {
	err := fam.internal.DeleteSilence(ctx, id)
	return err
}
// GetSilence looks up the silence with the given id on the internal Alertmanager.
func (fam *RemoteSecondaryForkedAlertmanager) GetSilence(ctx context.Context, id string) (apimodels.GettableSilence, error) {
	silence, err := fam.internal.GetSilence(ctx, id)
	return silence, err
}
// ListSilences lists silences from the internal Alertmanager, passing the
// filter through unchanged.
func (fam *RemoteSecondaryForkedAlertmanager) ListSilences(ctx context.Context, filter []string) (apimodels.GettableSilences, error) {
	silences, err := fam.internal.ListSilences(ctx, filter)
	return silences, err
}
// GetAlerts fetches alerts from the internal Alertmanager, passing every
// argument through unchanged.
func (fam *RemoteSecondaryForkedAlertmanager) GetAlerts(ctx context.Context, active, silenced, inhibited bool, filter []string, receiver string) (apimodels.GettableAlerts, error) {
	alerts, err := fam.internal.GetAlerts(ctx, active, silenced, inhibited, filter, receiver)
	return alerts, err
}
// GetAlertGroups fetches alert groups from the internal Alertmanager, passing
// every argument through unchanged.
func (fam *RemoteSecondaryForkedAlertmanager) GetAlertGroups(ctx context.Context, active, silenced, inhibited bool, filter []string, receiver string) (apimodels.AlertGroups, error) {
	groups, err := fam.internal.GetAlertGroups(ctx, active, silenced, inhibited, filter, receiver)
	return groups, err
}
// PutAlerts hands the alerts to the internal Alertmanager only.
func (fam *RemoteSecondaryForkedAlertmanager) PutAlerts(ctx context.Context, alerts apimodels.PostableAlerts) error {
	err := fam.internal.PutAlerts(ctx, alerts)
	return err
}
// GetReceivers returns the receivers reported by the internal Alertmanager.
func (fam *RemoteSecondaryForkedAlertmanager) GetReceivers(ctx context.Context) ([]apimodels.Receiver, error) {
	receivers, err := fam.internal.GetReceivers(ctx)
	return receivers, err
}
// TestReceivers runs the receiver test on the internal Alertmanager and returns
// its result.
func (fam *RemoteSecondaryForkedAlertmanager) TestReceivers(ctx context.Context, c apimodels.TestReceiversConfigBodyParams) (*notifier.TestReceiversResult, error) {
	result, err := fam.internal.TestReceivers(ctx, c)
	return result, err
}
// TestTemplate runs the template test on the internal Alertmanager and returns
// its results.
func (fam *RemoteSecondaryForkedAlertmanager) TestTemplate(ctx context.Context, c apimodels.TestTemplatesConfigBodyParams) (*notifier.TestTemplatesResults, error) {
	results, err := fam.internal.TestTemplate(ctx, c)
	return results, err
}
// SilenceState returns the silence state held by the internal Alertmanager.
func (fam *RemoteSecondaryForkedAlertmanager) SilenceState(ctx context.Context) (alertingNotify.SilenceState, error) {
	state, err := fam.internal.SilenceState(ctx)
	return state, err
}
// StopAndWait stops both Alertmanagers and then makes a final attempt to push
// the state and the latest stored configuration to the remote Alertmanager.
// Failures during that final sync are logged and otherwise ignored.
func (fam *RemoteSecondaryForkedAlertmanager) StopAndWait() {
	// Stop the internal Alertmanager first, then our alert senders.
	fam.internal.StopAndWait()
	fam.remote.StopAndWait()

	// context.TODO() is used on purpose: we think this final sync should be
	// allowed to finish regardless of how long it takes.
	ctx := context.TODO()

	// A failed state upload does not prevent the configuration upload below.
	stateErr := fam.remote.CompareAndSendState(ctx)
	if stateErr != nil {
		fam.log.Error("Error sending state to the remote Alertmanager while stopping", "err", stateErr)
	}

	// Without the latest configuration there is nothing left to send.
	latest, storeErr := fam.store.GetLatestAlertmanagerConfiguration(ctx, fam.orgID)
	if storeErr != nil {
		fam.log.Error("Error getting latest Alertmanager configuration while stopping", "err", storeErr)
		return
	}

	sendErr := fam.remote.CompareAndSendConfiguration(ctx, latest)
	if sendErr != nil {
		fam.log.Error("Error sending configuration to the remote Alertmanager while stopping", "err", sendErr)
	}
}
// Ready reports whether the internal Alertmanager is ready. The readiness of
// the remote Alertmanager is deliberately not taken into account.
func (fam *RemoteSecondaryForkedAlertmanager) Ready() bool {
	internalReady := fam.internal.Ready()
	return internalReady
}