From 239d94205ae559774c8f70a04331b64bc8e7e4b3 Mon Sep 17 00:00:00 2001 From: George Robinson Date: Tue, 24 Jan 2023 15:41:38 +0000 Subject: [PATCH] Alerting: Return chan <-error for #61811 (#61858) --- .../ngalert/state/historian/annotation.go | 15 +++++++++++---- .../ngalert/state/historian/annotation_test.go | 2 +- pkg/services/ngalert/state/historian/loki.go | 10 +++++++--- pkg/services/ngalert/state/historian/noop.go | 5 ++++- pkg/services/ngalert/state/historian/sql.go | 5 ++++- pkg/services/ngalert/state/persist.go | 6 ++++-- pkg/services/ngalert/state/testing.go | 5 ++++- 7 files changed, 35 insertions(+), 13 deletions(-) diff --git a/pkg/services/ngalert/state/historian/annotation.go b/pkg/services/ngalert/state/historian/annotation.go index 544f6a45a17..03f1b73faaa 100644 --- a/pkg/services/ngalert/state/historian/annotation.go +++ b/pkg/services/ngalert/state/historian/annotation.go @@ -40,12 +40,17 @@ func NewAnnotationBackend(annotations annotations.Repository, dashboards dashboa } // RecordStates writes a number of state transitions for a given rule to state history. -func (h *AnnotationBackend) RecordStatesAsync(ctx context.Context, rule *ngmodels.AlertRule, states []state.StateTransition) { +func (h *AnnotationBackend) RecordStatesAsync(ctx context.Context, rule *ngmodels.AlertRule, states []state.StateTransition) <-chan error { logger := h.log.FromContext(ctx) // Build annotations before starting goroutine, to make sure all data is copied and won't mutate underneath us. annotations := h.buildAnnotations(rule, states, logger) panel := parsePanelKey(rule, logger) - go h.recordAnnotationsSync(ctx, panel, annotations, logger) + errCh := make(chan error, 1) + go func() { + defer close(errCh) + errCh <- h.recordAnnotationsSync(ctx, panel, annotations, logger) + }() + return errCh } func (h *AnnotationBackend) QueryStates(ctx context.Context, query ngmodels.HistoryQuery) (*data.Frame, error) { @@ -156,12 +161,12 @@ func (h *AnnotationBackend) buildAnnotations(rule *ngmodels.AlertRule, states [] return items } -func (h *AnnotationBackend) recordAnnotationsSync(ctx context.Context, panel *panelKey, annotations []annotations.Item, logger log.Logger) { +func (h *AnnotationBackend) recordAnnotationsSync(ctx context.Context, panel *panelKey, annotations []annotations.Item, logger log.Logger) error { if panel != nil { dashID, err := h.dashboards.getID(ctx, panel.orgID, panel.dashUID) if err != nil { logger.Error("Error getting dashboard for alert annotation", "dashboardUID", panel.dashUID, "error", err) - return + return fmt.Errorf("error getting dashboard for alert annotation: %w", err) } for i := range annotations { @@ -172,9 +177,11 @@ func (h *AnnotationBackend) recordAnnotationsSync(ctx context.Context, panel *pa if err := h.annotations.SaveMany(ctx, annotations); err != nil { logger.Error("Error saving alert annotation batch", "error", err) + return fmt.Errorf("error saving alert annotation batch: %w", err) } logger.Debug("Done saving alert annotation batch") + return nil } func buildAnnotationTextAndData(rule *ngmodels.AlertRule, currentState *state.State) (string, *simplejson.Json) { diff --git a/pkg/services/ngalert/state/historian/annotation_test.go b/pkg/services/ngalert/state/historian/annotation_test.go index bee69be9d56..25497a6d6b6 100644 --- a/pkg/services/ngalert/state/historian/annotation_test.go +++ b/pkg/services/ngalert/state/historian/annotation_test.go @@ -19,7 +19,7 @@ func TestAnnotationHistorian_Integration(t *testing.T) { t.Run("alert annotations are queryable", func(t *testing.T) { anns := createTestAnnotationBackendSut(t) items := []annotations.Item{createAnnotation()} - anns.recordAnnotationsSync(context.Background(), nil, items, log.NewNopLogger()) + require.NoError(t, anns.recordAnnotationsSync(context.Background(), nil, items, log.NewNopLogger())) q := models.HistoryQuery{ RuleUID: "my-rule", diff --git a/pkg/services/ngalert/state/historian/loki.go b/pkg/services/ngalert/state/historian/loki.go index 4b1c64c41bd..dcb6dd2da67 100644 --- a/pkg/services/ngalert/state/historian/loki.go +++ b/pkg/services/ngalert/state/historian/loki.go @@ -43,10 +43,10 @@ func (h *RemoteLokiBackend) TestConnection() error { return h.client.ping() } -func (h *RemoteLokiBackend) RecordStatesAsync(ctx context.Context, rule *models.AlertRule, states []state.StateTransition) { +func (h *RemoteLokiBackend) RecordStatesAsync(ctx context.Context, rule *models.AlertRule, states []state.StateTransition) <-chan error { logger := h.log.FromContext(ctx) streams := h.statesToStreams(rule, states, logger) - h.recordStreamsAsync(ctx, streams, logger) + return h.recordStreamsAsync(ctx, streams, logger) } func (h *RemoteLokiBackend) QueryStates(ctx context.Context, query models.HistoryQuery) (*data.Frame, error) { @@ -102,12 +102,16 @@ func (h *RemoteLokiBackend) statesToStreams(rule *models.AlertRule, states []sta return result } -func (h *RemoteLokiBackend) recordStreamsAsync(ctx context.Context, streams []stream, logger log.Logger) { +func (h *RemoteLokiBackend) recordStreamsAsync(ctx context.Context, streams []stream, logger log.Logger) <-chan error { + errCh := make(chan error, 1) go func() { + defer close(errCh) if err := h.recordStreams(ctx, streams, logger); err != nil { logger.Error("Failed to save alert state history batch", "error", err) + errCh <- fmt.Errorf("failed to save alert state history batch: %w", err) } }() + return errCh } func (h *RemoteLokiBackend) recordStreams(ctx context.Context, streams []stream, logger log.Logger) error { diff --git a/pkg/services/ngalert/state/historian/noop.go b/pkg/services/ngalert/state/historian/noop.go index 76c40643772..a4b7ce1126b 100644 --- a/pkg/services/ngalert/state/historian/noop.go +++ b/pkg/services/ngalert/state/historian/noop.go @@ -14,5 +14,8 @@ func NewNopHistorian() *NoOpHistorian { return &NoOpHistorian{} } -func (f *NoOpHistorian) RecordStatesAsync(ctx context.Context, _ *models.AlertRule, _ []state.StateTransition) { +func (f *NoOpHistorian) RecordStatesAsync(ctx context.Context, _ *models.AlertRule, _ []state.StateTransition) <-chan error { + errCh := make(chan error) + close(errCh) + return errCh } diff --git a/pkg/services/ngalert/state/historian/sql.go b/pkg/services/ngalert/state/historian/sql.go index 09cfe436ac7..8e5d0cf2294 100644 --- a/pkg/services/ngalert/state/historian/sql.go +++ b/pkg/services/ngalert/state/historian/sql.go @@ -19,7 +19,10 @@ func NewSqlBackend() *SqlBackend { } } -func (h *SqlBackend) RecordStatesAsync(ctx context.Context, _ *models.AlertRule, _ []state.StateTransition) { +func (h *SqlBackend) RecordStatesAsync(ctx context.Context, _ *models.AlertRule, _ []state.StateTransition) <-chan error { + errCh := make(chan error) + close(errCh) + return errCh } func (h *SqlBackend) QueryStates(ctx context.Context, query models.HistoryQuery) (*data.Frame, error) { diff --git a/pkg/services/ngalert/state/persist.go b/pkg/services/ngalert/state/persist.go index 1dd8a0366ab..1665444d248 100644 --- a/pkg/services/ngalert/state/persist.go +++ b/pkg/services/ngalert/state/persist.go @@ -22,8 +22,10 @@ type RuleReader interface { // Historian maintains an audit log of alert state history. type Historian interface { - // RecordStates writes a number of state transitions for a given rule to state history. - RecordStatesAsync(ctx context.Context, rule *models.AlertRule, states []StateTransition) + // RecordStates writes a number of state transitions for a given rule to state history. It returns a channel that + // is closed when writing the state transitions has completed. If an error has occurred, the channel will contain a + // non-nil error. + RecordStatesAsync(ctx context.Context, rule *models.AlertRule, states []StateTransition) <-chan error } // ImageCapturer captures images. diff --git a/pkg/services/ngalert/state/testing.go b/pkg/services/ngalert/state/testing.go index 917c785eed4..199d07d7060 100644 --- a/pkg/services/ngalert/state/testing.go +++ b/pkg/services/ngalert/state/testing.go @@ -62,7 +62,10 @@ func (f *FakeRuleReader) ListAlertRules(_ context.Context, q *models.ListAlertRu type FakeHistorian struct{} -func (f *FakeHistorian) RecordStatesAsync(ctx context.Context, rule *models.AlertRule, states []StateTransition) { +func (f *FakeHistorian) RecordStatesAsync(ctx context.Context, rule *models.AlertRule, states []StateTransition) <-chan error { + errCh := make(chan error) + close(errCh) + return errCh } // NotAvailableImageService is a service that returns ErrScreenshotsUnavailable.