Prometheus: Fix concurrency issue with exemplar sampler initialization (#61281)

This commit is contained in:
Todd Treece 2023-01-11 08:27:47 -05:00 committed by GitHub
parent f2ffce4351
commit 19ca93d5ce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 7 additions and 7 deletions

View File

@ -43,7 +43,7 @@ type QueryData struct {
URL string URL string
TimeInterval string TimeInterval string
enableWideSeries bool enableWideSeries bool
exemplarSampler exemplar.Sampler exemplarSampler func() exemplar.Sampler
} }
func New( func New(
@ -67,10 +67,10 @@ func New(
promClient := client.NewClient(httpClient, httpMethod, settings.URL) promClient := client.NewClient(httpClient, httpMethod, settings.URL)
// standard deviation sampler is the default for backwards compatibility // standard deviation sampler is the default for backwards compatibility
exemplarSampler := exemplar.NewStandardDeviationSampler() exemplarSampler := exemplar.NewStandardDeviationSampler
if features.IsEnabled(featuremgmt.FlagDisablePrometheusExemplarSampling) { if features.IsEnabled(featuremgmt.FlagDisablePrometheusExemplarSampling) {
exemplarSampler = exemplar.NewNoOpSampler() exemplarSampler = exemplar.NewNoOpSampler
} }
return &QueryData{ return &QueryData{

View File

@ -47,14 +47,14 @@ func (s *QueryData) parseResponse(ctx context.Context, q *models.Query, res *htt
} }
func (s *QueryData) processExemplars(q *models.Query, dr *backend.DataResponse) *backend.DataResponse { func (s *QueryData) processExemplars(q *models.Query, dr *backend.DataResponse) *backend.DataResponse {
s.exemplarSampler.Reset() sampler := s.exemplarSampler()
labelTracker := exemplar.NewLabelTracker() labelTracker := exemplar.NewLabelTracker()
// we are moving from a multi-frame response returned // we are moving from a multi-frame response returned
// by the converter to a single exemplar frame, // by the converter to a single exemplar frame,
// so we need to build a new frame array with the // so we need to build a new frame array with the
// old exemplar frames filtered out // old exemplar frames filtered out
framer := exemplar.NewFramer(s.exemplarSampler, labelTracker) framer := exemplar.NewFramer(sampler, labelTracker)
for _, frame := range dr.Frames { for _, frame := range dr.Frames {
// we don't need to process non-exemplar frames // we don't need to process non-exemplar frames
@ -69,7 +69,7 @@ func (s *QueryData) processExemplars(q *models.Query, dr *backend.DataResponse)
framer.SetRefID(frame.RefID) framer.SetRefID(frame.RefID)
step := time.Duration(frame.Fields[0].Config.Interval) * time.Millisecond step := time.Duration(frame.Fields[0].Config.Interval) * time.Millisecond
s.exemplarSampler.SetStep(step) sampler.SetStep(step)
seriesLabels := getSeriesLabels(frame) seriesLabels := getSeriesLabels(frame)
labelTracker.Add(seriesLabels) labelTracker.Add(seriesLabels)
@ -83,7 +83,7 @@ func (s *QueryData) processExemplars(q *models.Query, dr *backend.DataResponse)
Timestamp: row[0].(time.Time), Timestamp: row[0].(time.Time),
SeriesLabels: seriesLabels, SeriesLabels: seriesLabels,
} }
s.exemplarSampler.Add(ex) sampler.Add(ex)
} }
} }