Alerting: Improve logging in scheduler and states (#91003)
* handle metadata map nil
* remove double context
* clean up logging in scheduler
* do not reuse loggers from previous ticks
* log the dropped tick
* log tick instead of ticknum
* replace with processing tick logs
* log sending notifications
* update logging in persister to fetch context
* logs to historian: moved them upstream to be able to log when store is overridden
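The central change is that the rule routine no longer reuses a logger built on an earlier tick: each evaluation derives a fresh logger carrying that tick's version, fingerprint, and scheduled time, and downstream helpers receive that logger instead of logging through the component-level one. Below is a minimal sketch of the per-tick pattern, using the standard library's log/slog rather than Grafana's internal log package; the struct and field values are made up for illustration.

package main

import (
	"log/slog"
	"os"
	"time"
)

// evaluation stands in for the scheduler's per-tick Evaluation; the field
// names here are illustrative, not Grafana's actual types.
type evaluation struct {
	version     int64
	fingerprint uint64
	scheduledAt time.Time
}

func main() {
	base := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelDebug}))

	ticks := []evaluation{
		{version: 3, fingerprint: 0xa1b2, scheduledAt: time.Now()},
		{version: 4, fingerprint: 0xc3d4, scheduledAt: time.Now().Add(10 * time.Second)},
	}

	for _, ev := range ticks {
		// Derive a fresh, tick-scoped logger instead of reusing one from a
		// previous tick; every later line then carries the tick's identity.
		logger := base.With("version", ev.version, "fingerprint", ev.fingerprint, "now", ev.scheduledAt)
		logger.Debug("Processing tick")
		logger.Error("Failed to evaluate rule", "attempt", 1, "error", "example error")
	}
}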
parent 1b4b1af9b7
commit 8323b688c6
@@ -333,8 +333,10 @@ func ParseStateString(repr string) (State, error) {
 func buildDatasourceHeaders(ctx context.Context, metadata map[string]string) map[string]string {
 	headers := make(map[string]string, len(metadata)+3)
 
-	for key, value := range metadata {
-		headers[fmt.Sprintf("http_X-Rule-%s", key)] = url.QueryEscape(value)
+	if len(metadata) > 0 {
+		for key, value := range metadata {
+			headers[fmt.Sprintf("http_X-Rule-%s", key)] = url.QueryEscape(value)
+		}
 	}
 
 	// Many data sources check this in query method as sometimes alerting needs special considerations.
@@ -1,7 +1,7 @@
 package schedule
 
 import (
-	context "context"
+	"context"
 	"errors"
 	"fmt"
 	"net/url"
@@ -227,8 +227,7 @@ func (a *alertRule) Stop(reason error) {
 
 func (a *alertRule) Run() error {
 	grafanaCtx := a.ctx
-	logger := a.logger
-	logger.Debug("Alert rule routine started")
+	a.logger.Debug("Alert rule routine started")
 
 	var currentFingerprint fingerprint
 	defer a.stopApplied()
@@ -237,20 +236,23 @@ func (a *alertRule) Run() error {
 		// used by external services (API) to notify that rule is updated.
 		case ctx := <-a.updateCh:
 			if currentFingerprint == ctx.Fingerprint {
-				logger.Info("Rule's fingerprint has not changed. Skip resetting the state", "currentFingerprint", currentFingerprint)
+				a.logger.Info("Rule's fingerprint has not changed. Skip resetting the state", "currentFingerprint", currentFingerprint)
 				continue
 			}
 
-			logger.Info("Clearing the state of the rule because it was updated", "isPaused", ctx.IsPaused, "fingerprint", ctx.Fingerprint)
+			a.logger.Info("Clearing the state of the rule because it was updated", "isPaused", ctx.IsPaused, "fingerprint", ctx.Fingerprint)
 			// clear the state. So the next evaluation will start from the scratch.
 			a.resetState(grafanaCtx, ctx.IsPaused)
 			currentFingerprint = ctx.Fingerprint
 		// evalCh - used by the scheduler to signal that evaluation is needed.
 		case ctx, ok := <-a.evalCh:
 			if !ok {
-				logger.Debug("Evaluation channel has been closed. Exiting")
+				a.logger.Debug("Evaluation channel has been closed. Exiting")
 				return nil
 			}
+			f := ctx.Fingerprint()
+			logger := a.logger.New("version", ctx.rule.Version, "fingerprint", f, "now", ctx.scheduledAt)
+			logger.Debug("Processing tick")
 
 			func() {
 				orgID := fmt.Sprint(a.key.OrgID)
@@ -265,11 +267,11 @@ func (a *alertRule) Run() error {
 
 				for attempt := int64(1); attempt <= a.maxAttempts; attempt++ {
 					isPaused := ctx.rule.IsPaused
-					f := ctx.Fingerprint()
 
 					// Do not clean up state if the eval loop has just started.
 					var needReset bool
 					if currentFingerprint != 0 && currentFingerprint != f {
-						logger.Debug("Got a new version of alert rule. Clear up the state", "fingerprint", f)
+						logger.Debug("Got a new version of alert rule. Clear up the state", "current_fingerprint", currentFingerprint, "fingerprint", f)
 						needReset = true
 					}
 					// We need to reset state if the loop has started and the alert is already paused. It can happen,
@@ -307,20 +309,20 @@ func (a *alertRule) Run() error {
 						logger.Error("Skip evaluation and updating the state because the context has been cancelled", "version", ctx.rule.Version, "fingerprint", f, "attempt", attempt, "now", ctx.scheduledAt)
 						return
 					}
 
 					retry := attempt < a.maxAttempts
-					err := a.evaluate(tracingCtx, f, attempt, ctx, span, retry)
+					err := a.evaluate(tracingCtx, ctx, span, retry, logger)
 					// This is extremely confusing - when we exhaust all retry attempts, or we have no retryable errors
 					// we return nil - so technically, this is meaningless to know whether the evaluation has errors or not.
 					span.End()
 					if err == nil {
 						logger.Debug("Tick processed", "attempt", attempt, "duration", a.clock.Now().Sub(evalStart))
 						return
 					}
 
-					logger.Error("Failed to evaluate rule", "version", ctx.rule.Version, "fingerprint", f, "attempt", attempt, "now", ctx.scheduledAt, "error", err)
+					logger.Error("Failed to evaluate rule", "attempt", attempt, "error", err)
 					select {
 					case <-tracingCtx.Done():
-						logger.Error("Context has been cancelled while backing off", "version", ctx.rule.Version, "fingerprint", f, "attempt", attempt, "now", ctx.scheduledAt)
+						logger.Error("Context has been cancelled while backing off", "attempt", attempt)
 						return
 					case <-time.After(retryDelay):
 						continue
@@ -339,13 +341,13 @@ func (a *alertRule) Run() error {
 				states := a.stateManager.DeleteStateByRuleUID(ngmodels.WithRuleKey(ctx, a.key), a.key, ngmodels.StateReasonRuleDeleted)
 				a.expireAndSend(grafanaCtx, states)
 			}
-			logger.Debug("Stopping alert rule routine")
+			a.logger.Debug("Stopping alert rule routine")
 			return nil
 		}
 	}
 }
 
-func (a *alertRule) evaluate(ctx context.Context, f fingerprint, attempt int64, e *Evaluation, span trace.Span, retry bool) error {
+func (a *alertRule) evaluate(ctx context.Context, e *Evaluation, span trace.Span, retry bool, logger log.Logger) error {
 	orgID := fmt.Sprint(a.key.OrgID)
 	evalAttemptTotal := a.metrics.EvalAttemptTotal.WithLabelValues(orgID)
 	evalAttemptFailures := a.metrics.EvalAttemptFailures.WithLabelValues(orgID)
@@ -353,7 +355,6 @@ func (a *alertRule) evaluate(ctx context.Context, f fingerprint, attempt int64,
 	processDuration := a.metrics.ProcessDuration.WithLabelValues(orgID)
 	sendDuration := a.metrics.SendDuration.WithLabelValues(orgID)
 
-	logger := a.logger.FromContext(ctx).New("version", e.rule.Version, "fingerprint", f, "attempt", attempt, "now", e.scheduledAt).FromContext(ctx)
 	start := a.clock.Now()
 
 	evalCtx := eval.NewContextWithPreviousResults(ctx, SchedulerUserFor(e.rule.OrgID), a.newLoadedMetricsReader(e.rule))
@@ -430,7 +431,7 @@ func (a *alertRule) evaluate(ctx context.Context, f fingerprint, attempt int64,
 		state.GetRuleExtraLabels(logger, e.rule, e.folderTitle, !a.disableGrafanaFolder),
 		func(ctx context.Context, statesToSend state.StateTransitions) {
 			start := a.clock.Now()
-			alerts := a.send(ctx, statesToSend)
+			alerts := a.send(ctx, logger, statesToSend)
 			span.AddEvent("results sent", trace.WithAttributes(
 				attribute.Int64("alerts_sent", int64(len(alerts.PostableAlerts))),
 			))
@@ -443,13 +444,14 @@ func (a *alertRule) evaluate(ctx context.Context, f fingerprint, attempt int64,
 }
 
 // send sends alerts for the given state transitions.
-func (a *alertRule) send(ctx context.Context, states state.StateTransitions) definitions.PostableAlerts {
+func (a *alertRule) send(ctx context.Context, logger log.Logger, states state.StateTransitions) definitions.PostableAlerts {
 	alerts := definitions.PostableAlerts{PostableAlerts: make([]models.PostableAlert, 0, len(states))}
 	for _, alertState := range states {
 		alerts.PostableAlerts = append(alerts.PostableAlerts, *state.StateToPostableAlert(alertState, a.appURL))
 	}
 
 	if len(alerts.PostableAlerts) > 0 {
+		logger.Debug("Sending transitions to notifier", "transitions", len(alerts.PostableAlerts))
 		a.sender.Send(ctx, a.key, alerts)
 	}
 	return alerts
@@ -309,7 +309,7 @@ func (sch *schedule) processTick(ctx context.Context, dispatcherGroup *errgroup.
 		}
 
 		if isReadyToRun {
-			logger.Debug("Rule is ready to run on the current tick", "tick", tickNum, "frequency", itemFrequency, "offset", offset)
+			logger.Debug("Rule is ready to run on the current tick", "tick", tick, "frequency", itemFrequency, "offset", offset)
 			readyToRun = append(readyToRun, readyToRunItem{ruleRoutine: ruleRoutine, Evaluation: Evaluation{
 				scheduledAt: tick,
 				rule: item,
@@ -358,7 +358,7 @@ func (sch *schedule) processTick(ctx context.Context, dispatcherGroup *errgroup.
 			return
 		}
 		if dropped != nil {
-			sch.log.Warn("Tick dropped because alert rule evaluation is too slow", append(key.LogContext(), "time", tick)...)
+			sch.log.Warn("Tick dropped because alert rule evaluation is too slow", append(key.LogContext(), "time", tick, "droppedTick", dropped.scheduledAt)...)
 			orgID := fmt.Sprint(key.OrgID)
 			sch.metrics.EvaluationMissed.WithLabelValues(orgID, item.rule.Title).Inc()
 		}
@@ -95,8 +95,15 @@ func (h *AnnotationBackend) Record(ctx context.Context, rule history_model.RuleM
 		defer cancel()
 		defer close(errCh)
 		logger := h.log.FromContext(ctx)
+		logger.Debug("Saving state history batch", "samples", len(annotations))
 
-		errCh <- h.store.Save(ctx, panel, annotations, rule.OrgID, logger)
+		err := h.store.Save(ctx, panel, annotations, rule.OrgID, logger)
+		if err != nil {
+			logger.Error("Failed to save history batch", len(annotations), "err", err)
+			errCh <- err
+			return
+		}
+		logger.Debug("Done saving history batch", "samples", len(annotations))
 	}(writeCtx)
 	return errCh
 }
@@ -47,13 +47,10 @@ func (s *AnnotationServiceStore) Save(ctx context.Context, panel *PanelKey, anno
 	s.metrics.WritesTotal.WithLabelValues(org, "annotations").Inc()
 	s.metrics.TransitionsTotal.WithLabelValues(org).Add(float64(len(annotations)))
 	if err := s.svc.SaveMany(ctx, annotations); err != nil {
-		logger.Error("Error saving alert annotation batch", "error", err)
 		s.metrics.WritesFailed.WithLabelValues(org, "annotations").Inc()
 		s.metrics.TransitionsFailed.WithLabelValues(org).Add(float64(len(annotations)))
 		return fmt.Errorf("error saving alert annotation batch: %w", err)
 	}
-
-	logger.Debug("Done saving alert annotation batch")
 	return nil
 }
 
@@ -123,12 +123,12 @@ func (h *RemoteLokiBackend) Record(ctx context.Context, rule history_model.RuleM
 		defer cancel()
 		defer close(errCh)
 		logger := h.log.FromContext(ctx)
 
 		logger.Debug("Saving state history batch", "samples", len(logStream.Values))
 		org := fmt.Sprint(rule.OrgID)
 		h.metrics.WritesTotal.WithLabelValues(org, "loki").Inc()
 		h.metrics.TransitionsTotal.WithLabelValues(org).Add(float64(len(logStream.Values)))
 
-		if err := h.recordStreams(ctx, []Stream{logStream}, logger); err != nil {
+		if err := h.recordStreams(ctx, logStream, logger); err != nil {
 			logger.Error("Failed to save alert state history batch", "error", err)
 			h.metrics.WritesFailed.WithLabelValues(org, "loki").Inc()
 			h.metrics.TransitionsFailed.WithLabelValues(org).Add(float64(len(logStream.Values)))
@@ -336,12 +336,12 @@ func StatesToStream(rule history_model.RuleMeta, states []state.StateTransition,
 	}
 }
 
-func (h *RemoteLokiBackend) recordStreams(ctx context.Context, streams []Stream, logger log.Logger) error {
-	if err := h.client.Push(ctx, streams); err != nil {
+func (h *RemoteLokiBackend) recordStreams(ctx context.Context, stream Stream, logger log.Logger) error {
+	if err := h.client.Push(ctx, []Stream{stream}); err != nil {
 		return err
 	}
 
-	logger.Debug("Done saving alert state history batch")
+	logger.Debug("Done saving alert state history batch", "samples", len(stream.Values))
 	return nil
 }
 
@@ -52,8 +52,8 @@ func (a *SyncStatePersister) deleteAlertStates(ctx context.Context, states []Sta
 	if a.store == nil || len(states) == 0 {
 		return
 	}
 
-	a.log.Debug("Deleting alert states", "count", len(states))
+	logger := a.log.FromContext(ctx)
+	logger.Debug("Deleting alert states", "count", len(states))
 	toDelete := make([]ngModels.AlertInstanceKey, 0, len(states))
 
 	for _, s := range states {
@@ -67,7 +67,7 @@ func (a *SyncStatePersister) deleteAlertStates(ctx context.Context, states []Sta
 
 	err := a.store.DeleteAlertInstances(ctx, toDelete...)
 	if err != nil {
-		a.log.Error("Failed to delete stale states", "error", err)
+		logger.Error("Failed to delete stale states", "error", err)
 	}
 }
 
@@ -75,7 +75,7 @@ func (a *SyncStatePersister) saveAlertStates(ctx context.Context, states ...Stat
 	if a.store == nil || len(states) == 0 {
 		return
 	}
 
+	logger := a.log.FromContext(ctx)
 	saveState := func(ctx context.Context, idx int) error {
 		s := states[idx]
 
@@ -91,7 +91,7 @@ func (a *SyncStatePersister) saveAlertStates(ctx context.Context, states ...Stat
 
 		key, err := s.GetAlertInstanceKey()
 		if err != nil {
-			a.log.Error("Failed to create a key for alert state to save it to database. The state will be ignored ", "cacheID", s.CacheID, "error", err, "labels", s.Labels.String())
+			logger.Error("Failed to create a key for alert state to save it to database. The state will be ignored ", "cacheID", s.CacheID, "error", err, "labels", s.Labels.String())
 			return nil
 		}
 		instance := ngModels.AlertInstance{
@@ -108,14 +108,14 @@ func (a *SyncStatePersister) saveAlertStates(ctx context.Context, states ...Stat
 
 		err = a.store.SaveAlertInstance(ctx, instance)
 		if err != nil {
-			a.log.Error("Failed to save alert state", "labels", s.Labels.String(), "state", s.State, "error", err)
+			logger.Error("Failed to save alert state", "labels", s.Labels.String(), "state", s.State, "error", err)
 			return nil
 		}
 		return nil
 	}
 
 	start := time.Now()
-	a.log.Debug("Saving alert states", "count", len(states), "max_state_save_concurrency", a.maxStateSaveConcurrency)
+	logger.Debug("Saving alert states", "count", len(states), "max_state_save_concurrency", a.maxStateSaveConcurrency)
 	_ = concurrency.ForEachJob(ctx, len(states), a.maxStateSaveConcurrency, saveState)
-	a.log.Debug("Saving alert states done", "count", len(states), "max_state_save_concurrency", a.maxStateSaveConcurrency, "duration", time.Since(start))
+	logger.Debug("Saving alert states done", "count", len(states), "max_state_save_concurrency", a.maxStateSaveConcurrency, "duration", time.Since(start))
 }
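Several of the hunks above also stop logging through the component-level logger (a.log, h.log) and instead resolve a logger from the incoming context via FromContext, so that rule-scoped fields bound upstream travel into the persister and historian logs. A rough sketch of that lookup pattern follows, again using the standard library's log/slog and a hypothetical context key rather than Grafana's infra/log implementation.

package main

import (
	"context"
	"log/slog"
	"os"
)

// ctxLoggerKey is a hypothetical key; Grafana's log package hides this detail
// behind its own FromContext helper.
type ctxLoggerKey struct{}

// withLogger stores an enriched logger in the context for downstream callers.
func withLogger(ctx context.Context, logger *slog.Logger) context.Context {
	return context.WithValue(ctx, ctxLoggerKey{}, logger)
}

// loggerFromContext mirrors the "fetch the logger from the context" idea:
// use the enriched logger when present, otherwise fall back to the component one.
func loggerFromContext(ctx context.Context, fallback *slog.Logger) *slog.Logger {
	if l, ok := ctx.Value(ctxLoggerKey{}).(*slog.Logger); ok {
		return l
	}
	return fallback
}

func main() {
	componentLogger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelDebug}))

	// Upstream (e.g. the scheduler) binds rule-scoped fields and puts the logger in the context.
	ctx := withLogger(context.Background(), componentLogger.With("rule_uid", "example-uid", "org_id", 1))

	// Downstream (e.g. a persister) picks it up instead of using its own bare logger.
	logger := loggerFromContext(ctx, componentLogger)
	logger.Debug("Saving alert states", "count", 2)
}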