Alerting: Use a completely isolated context for state history writes (#64989)

* Add fresh context with timeout and same log properties, re-derive logger

* Unify timeout constants

* Move ctx creation after a shortcut that was added during rebasing

* Unify timeouts

* Port opentracing's SpanFromContext and ContextWithSpan to the grafana tracing package

* Support both opentracing and otel variants

* Better document why we're creating a new ctx

* Add the new func to FakeSpan; it was added to the interface after rebase

* Support grafana-specific traceID key in both tracer implementations
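
Taken together, the commits above implement one pattern: derive the write context from context.Background() rather than the caller's ctx, bound it with its own timeout, and hand the caller's span across explicitly. A minimal, self-contained sketch of that pattern (record and doWrite are illustrative names, not the committed code):

package main

import (
	"context"
	"fmt"
	"time"
)

// doWrite stands in for the real state-history write; it only reports
// whether its context is still alive.
func doWrite(ctx context.Context) error {
	return ctx.Err()
}

// record mirrors the shape of the Record methods in this commit.
func record(requestCtx context.Context) <-chan error {
	errCh := make(chan error, 1)

	// Derive the write context from context.Background(), NOT from
	// requestCtx, so a cancelled request cannot interrupt the write.
	writeCtx, cancel := context.WithTimeout(context.Background(), time.Minute)

	go func(ctx context.Context) {
		defer cancel()
		defer close(errCh)
		errCh <- doWrite(ctx)
	}(writeCtx)

	return errCh
}

func main() {
	requestCtx, cancel := context.WithCancel(context.Background())
	cancel() // simulate the caller going away immediately

	// The write still succeeds and prints <nil>, because writeCtx is isolated.
	fmt.Println(<-record(requestCtx))
}
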
Alexander Weaver 2023-04-04 16:41:46 -05:00 committed by GitHub
parent f1eddbad06
commit fb520edd72
8 changed files with 95 additions and 9 deletions

View File

@@ -320,3 +320,15 @@ func (s OpentelemetrySpan) AddEvents(keys []string, values []EventValue) {
 		}
 	}
 }
+
+func (s OpentelemetrySpan) contextWithSpan(ctx context.Context) context.Context {
+	if s.span != nil {
+		ctx = trace.ContextWithSpan(ctx, s.span)
+		// Grafana also manages its own separate traceID in the context in addition to what opentracing handles.
+		// It's derived from the span. Ensure that we propagate this too.
+		if traceID := s.span.SpanContext().TraceID(); traceID.IsValid() {
+			ctx = context.WithValue(ctx, traceKey{}, traceValue{traceID.String(), s.span.SpanContext().IsSampled()})
+		}
+	}
+	return ctx
+}
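
A note on the IsValid() guard above: spans that don't come from a real SDK-backed tracer (a no-op tracer, for example) carry an all-zero trace ID, which isn't worth copying under Grafana's own context key. A small sketch against the public OpenTelemetry API showing the case the guard filters out:

package main

import (
	"context"
	"fmt"

	"go.opentelemetry.io/otel/trace"
)

func main() {
	ctx := context.Background()
	_, span := trace.NewNoopTracerProvider().Tracer("demo").Start(ctx, "op")

	sc := span.SpanContext()
	fmt.Println(sc.TraceID().IsValid()) // false: all-zero ID, so the guard skips the copy
	fmt.Println(sc.IsSampled())         // false

	// Only a real, SDK-backed tracer yields a valid trace ID worth propagating.
	ctx = trace.ContextWithSpan(ctx, span)
	_ = ctx
}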

View File

@@ -92,6 +92,10 @@ func (t *FakeSpan) AddEvents(keys []string, values []EventValue) {
 	}
 }
 
+func (t *FakeSpan) contextWithSpan(ctx context.Context) context.Context {
+	return ctx
+}
+
 type FakeTracer struct {
 	Spans []*FakeSpan
 }

View File

@@ -76,6 +76,10 @@ type Span interface {
 	//
 	// Panics if the length of keys is shorter than the length of values.
 	AddEvents(keys []string, values []EventValue)
+
+	// contextWithSpan returns a context.Context that holds the parent
+	// context plus a reference to this span.
+	contextWithSpan(ctx context.Context) context.Context
 }
 
 func ProvideService(cfg *setting.Cfg) (Tracer, error) {
@@ -146,6 +150,29 @@ func TraceIDFromContext(c context.Context, requireSampled bool) string {
 	return ""
 }
 
+// SpanFromContext returns the Span previously associated with ctx, or nil, if no such span could be found.
+// It is the equivalent of opentracing.SpanFromContext and trace.SpanFromContext.
+func SpanFromContext(ctx context.Context) Span {
+	// Look for both opentracing and opentelemetry spans.
+	if span := opentracing.SpanFromContext(ctx); span != nil {
+		return OpentracingSpan{span: span}
+	}
+	if span := trace.SpanFromContext(ctx); span != nil {
+		return OpentelemetrySpan{span: span}
+	}
+	return nil
+}
+
+// ContextWithSpan returns a new context.Context that holds a reference to the given span.
+// If span is nil, a new context without an active span is returned.
+// It is the equivalent of opentracing.ContextWithSpan and trace.ContextWithSpan.
+func ContextWithSpan(ctx context.Context, span Span) context.Context {
+	if span != nil {
+		return span.contextWithSpan(ctx)
+	}
+	return ctx
+}
+
 type Opentracing struct {
 	enabled bool
 	address string
@@ -324,6 +351,18 @@ func (s OpentracingSpan) AddEvents(keys []string, values []EventValue) {
 	s.span.LogFields(fields...)
 }
 
+func (s OpentracingSpan) contextWithSpan(ctx context.Context) context.Context {
+	if s.span != nil {
+		ctx = opentracing.ContextWithSpan(ctx, s.span)
+		// Grafana also manages its own separate traceID in the context in addition to what opentracing handles.
+		// It's derived from the span. Ensure that we propagate this too.
+		if sctx, ok := s.span.Context().(jaeger.SpanContext); ok {
+			ctx = context.WithValue(ctx, traceKey{}, traceValue{sctx.TraceID().String(), sctx.IsSampled()})
+		}
+	}
+	return ctx
+}
+
 func splitTagSettings(input string) map[string]string {
 	res := map[string]string{}
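
Taken together, the two exported helpers let callers lift a span out of one context and graft it onto another without caring which tracer implementation produced it. A sketch of the intended composition (the detach helper is illustrative, not part of this commit):

package historian // illustrative placement, not part of the commit

import (
	"context"

	"github.com/grafana/grafana/pkg/infra/tracing"
)

// detach moves the active span (from either tracer implementation) onto a
// fresh context with no inherited deadline or cancellation.
func detach(requestCtx context.Context) context.Context {
	span := tracing.SpanFromContext(requestCtx)  // opentracing or otel, whichever is active
	jobCtx := context.Background()               // clean slate: no deadlines, no stale values
	return tracing.ContextWithSpan(jobCtx, span) // a nil span simply returns jobCtx unchanged
}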

View File

@@ -14,6 +14,7 @@ import (
 	"github.com/grafana/grafana/pkg/components/simplejson"
 	"github.com/grafana/grafana/pkg/infra/log"
+	"github.com/grafana/grafana/pkg/infra/tracing"
 	"github.com/grafana/grafana/pkg/services/annotations"
 	"github.com/grafana/grafana/pkg/services/dashboards"
 	"github.com/grafana/grafana/pkg/services/ngalert/eval"
@@ -67,10 +68,23 @@ func (h *AnnotationBackend) Record(ctx context.Context, rule history_model.RuleMeta
 		return errCh
 	}
 
-	go func() {
+	// This is a new background job, so let's create a brand new context for it.
+	// We want it to be isolated, i.e. we don't want grafana shutdowns to interrupt this work
+	// immediately but rather try to flush writes.
+	// This also prevents timeouts or other lingering objects (like transactions) from being
+	// incorrectly propagated here from other areas.
+	writeCtx := context.Background()
+	writeCtx, cancel := context.WithTimeout(writeCtx, StateHistoryWriteTimeout)
+	writeCtx = history_model.WithRuleData(writeCtx, rule)
+	writeCtx = tracing.ContextWithSpan(writeCtx, tracing.SpanFromContext(ctx))
+
+	go func(ctx context.Context) {
+		defer cancel()
 		defer close(errCh)
 		logger := h.log.FromContext(ctx)
 		errCh <- h.recordAnnotations(ctx, panel, annotations, rule.OrgID, logger)
-	}()
+	}(writeCtx)
 	return errCh
 }
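
Note how writeCtx is passed as the goroutine's parameter and named ctx rather than captured by the closure: the parameter shadows the request-scoped ctx, so nothing in the body can accidentally reach the wrong context. A compilable sketch of just that shadowing behavior:

package main

import (
	"context"
	"fmt"
)

type ctxKey struct{}

func main() {
	ctx := context.WithValue(context.Background(), ctxKey{}, "request-scoped")
	writeCtx := context.WithValue(context.Background(), ctxKey{}, "isolated")

	done := make(chan struct{})
	go func(ctx context.Context) { // the parameter shadows main's ctx on purpose
		defer close(done)
		fmt.Println(ctx.Value(ctxKey{})) // always prints "isolated"
	}(writeCtx)
	<-done
	_ = ctx // main's ctx is untouched and unreachable inside the goroutine
}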

View File

@@ -2,6 +2,7 @@ package historian
 
 import (
 	"strings"
+	"time"
 
 	"github.com/grafana/grafana-plugin-sdk-go/data"
@@ -12,6 +13,8 @@ import (
 	history_model "github.com/grafana/grafana/pkg/services/ngalert/state/historian/model"
 )
 
+const StateHistoryWriteTimeout = time.Minute
+
 func shouldRecord(transition state.StateTransition) bool {
 	if !transition.Changed() {
 		return false

View File

@@ -14,6 +14,7 @@ import (
 	"github.com/grafana/grafana/pkg/components/simplejson"
 	"github.com/grafana/grafana/pkg/infra/log"
+	"github.com/grafana/grafana/pkg/infra/tracing"
 	"github.com/grafana/grafana/pkg/services/ngalert/eval"
 	"github.com/grafana/grafana/pkg/services/ngalert/metrics"
 	"github.com/grafana/grafana/pkg/services/ngalert/models"
@@ -80,8 +81,20 @@ func (h *RemoteLokiBackend) Record(ctx context.Context, rule history_model.RuleMeta
 		return errCh
 	}
 
-	go func() {
+	// This is a new background job, so let's create a brand new context for it.
+	// We want it to be isolated, i.e. we don't want grafana shutdowns to interrupt this work
+	// immediately but rather try to flush writes.
+	// This also prevents timeouts or other lingering objects (like transactions) from being
+	// incorrectly propagated here from other areas.
+	writeCtx := context.Background()
+	writeCtx, cancel := context.WithTimeout(writeCtx, StateHistoryWriteTimeout)
+	writeCtx = history_model.WithRuleData(writeCtx, rule)
+	writeCtx = tracing.ContextWithSpan(writeCtx, tracing.SpanFromContext(ctx))
+
+	go func(ctx context.Context) {
+		defer cancel()
 		defer close(errCh)
 		logger := h.log.FromContext(ctx)
 		org := fmt.Sprint(rule.OrgID)
 		h.metrics.WritesTotal.WithLabelValues(org, "loki").Inc()
@@ -93,7 +106,7 @@ func (h *RemoteLokiBackend) Record(ctx context.Context, rule history_model.RuleMeta
 			h.metrics.TransitionsFailed.WithLabelValues(org).Add(float64(len(logStream.Values)))
 			errCh <- fmt.Errorf("failed to save alert state history batch: %w", err)
 		}
-	}()
+	}(writeCtx)
 	return errCh
 }

View File

@@ -17,12 +17,8 @@ import (
 	"github.com/weaveworks/common/http/client"
 )
 
-const defaultClientTimeout = 30 * time.Second
-
 func NewRequester() client.Requester {
-	return &http.Client{
-		Timeout: defaultClientTimeout,
-	}
+	return &http.Client{}
 }
 
 // encoder serializes log streams to some byte format.
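
Dropping the client-level Timeout lines up with the "Unify timeouts" items in the commit message: every state-history request now travels with writeCtx, so the one-minute StateHistoryWriteTimeout is the single deadline, enforced end to end by the request context. A sketch of a context-bounded request without http.Client.Timeout (the URL is illustrative):

package main

import (
	"context"
	"fmt"
	"net/http"
	"time"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, "http://localhost:3100/ready", nil)
	if err != nil {
		panic(err)
	}

	// No Timeout field on the client: the request is bounded by ctx's deadline
	// instead, so one knob governs the whole write.
	resp, err := (&http.Client{}).Do(req)
	if err != nil {
		fmt.Println("request failed or deadline hit:", err)
		return
	}
	defer resp.Body.Close()
	fmt.Println(resp.Status)
}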

View File

@@ -1,6 +1,7 @@
 package model
 
 import (
+	"context"
 	"strconv"
 
 	"github.com/grafana/grafana/pkg/cmd/grafana-cli/logger"
@@ -44,3 +45,7 @@ func NewRuleMeta(r *models.AlertRule, log log.Logger) RuleMeta {
 		PanelID: panelID,
 	}
 }
+
+func WithRuleData(ctx context.Context, rule RuleMeta) context.Context {
+	return models.WithRuleKey(ctx, models.AlertRuleKey{OrgID: rule.OrgID, UID: rule.UID})
+}
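
WithRuleData delegates to models.WithRuleKey, whose retrieval side isn't part of this diff. For readers unfamiliar with the idiom, here is the standard context-key pattern such a helper presumably follows (hypothetical names, not Grafana's actual API):

package main

import (
	"context"
	"fmt"
)

type ruleKeyCtxKey struct{} // unexported key type prevents collisions with other packages

type AlertRuleKey struct {
	OrgID int64
	UID   string
}

func WithRuleKey(ctx context.Context, key AlertRuleKey) context.Context {
	return context.WithValue(ctx, ruleKeyCtxKey{}, key)
}

func RuleKeyFromContext(ctx context.Context) (AlertRuleKey, bool) {
	key, ok := ctx.Value(ruleKeyCtxKey{}).(AlertRuleKey)
	return key, ok
}

func main() {
	ctx := WithRuleKey(context.Background(), AlertRuleKey{OrgID: 1, UID: "abc123"})
	if key, ok := RuleKeyFromContext(ctx); ok {
		fmt.Printf("org=%d uid=%s\n", key.OrgID, key.UID) // e.g. for enriching logs
	}
}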