Prometheus: Refactor exemplar sampler (#60278)

This commit is contained in:
Todd Treece 2022-12-30 13:04:35 -05:00 committed by GitHub
parent ce8512ace7
commit 73d5aa4878
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 4880 additions and 241 deletions

View File

@ -1,6 +1,10 @@
package models
import "github.com/grafana/grafana-plugin-sdk-go/data"
import (
"time"
"github.com/grafana/grafana-plugin-sdk-go/data"
)
type ResultType string
@ -40,3 +44,10 @@ func ResultTypeFromFrame(frame *data.Frame) ResultType {
func (r ResultType) String() string {
return string(r)
}
type Exemplar struct {
SeriesLabels map[string]string
Labels map[string]string
Value float64
Timestamp time.Time
}

View File

@ -0,0 +1,80 @@
package exemplar
import (
"time"
"github.com/grafana/grafana-plugin-sdk-go/data"
)
var _ data.Framer = (*Framer)(nil)
type Framer struct {
frames data.Frames
sampler Sampler
labelTracker LabelTracker
meta *data.FrameMeta
refID string
}
func NewFramer(sampler Sampler, labelTracker LabelTracker) *Framer {
return &Framer{
frames: data.Frames{},
sampler: sampler,
labelTracker: labelTracker,
}
}
func (f *Framer) SetMeta(meta *data.FrameMeta) {
f.meta = meta
}
func (f *Framer) SetRefID(refID string) {
f.refID = refID
}
func (f *Framer) AddFrame(frame *data.Frame) {
f.frames = append(f.frames, frame)
}
func (f *Framer) Frames() (data.Frames, error) {
exemplars := f.sampler.Sample()
f.sampler.Reset()
if len(exemplars) == 0 {
return f.frames, nil
}
// the new exemplar frame will be a single frame in long format
// with a timestamp, metric value, and one or more label fields
exemplarFrame := data.NewFrame("exemplar")
exemplarFrame.RefID = f.refID
exemplarFrame.Meta = f.meta
// init the fields for the new exemplar frame
timeField := data.NewField(data.TimeSeriesTimeFieldName, nil, make([]time.Time, 0, len(exemplars)))
valueField := data.NewField(data.TimeSeriesValueFieldName, nil, make([]float64, 0, len(exemplars)))
exemplarFrame.Fields = append(exemplarFrame.Fields, timeField, valueField)
labelNames := f.labelTracker.GetNames()
for _, labelName := range labelNames {
exemplarFrame.Fields = append(exemplarFrame.Fields, data.NewField(labelName, nil, make([]string, 0, len(exemplars))))
}
// add the sampled exemplars to the new exemplar frame
for _, b := range exemplars {
timeField.Append(b.Timestamp)
valueField.Append(b.Value)
for i, labelName := range labelNames {
labelValue, ok := b.Labels[labelName]
if !ok {
// if the label is not present in the exemplar labels, then use the series label
labelValue = b.SeriesLabels[labelName]
}
colIdx := i + 2 // +2 to skip time and value fields
exemplarFrame.Fields[colIdx].Append(labelValue)
}
}
f.frames = append(f.frames, exemplarFrame)
return f.frames, nil
}

View File

@ -0,0 +1,42 @@
package exemplar
import "sort"
var _ LabelTracker = (*labelTracker)(nil)
type LabelTracker interface {
Add(labels map[string]string)
GetNames() []string
}
type labelTracker struct {
labelSet map[string]struct{}
}
func NewLabelTracker() LabelTracker {
return &labelTracker{
labelSet: map[string]struct{}{},
}
}
// Add saves label names that haven't been seen before
// so that they can be used to build the label fields in the exemplar frame
func (l *labelTracker) Add(labels map[string]string) {
for k := range labels {
if _, ok := l.labelSet[k]; !ok {
l.labelSet[k] = struct{}{}
}
}
}
// GetNames returns sorted unique label names
func (l *labelTracker) GetNames() []string {
labelNames := make([]string, 0, len(l.labelSet))
for k := range l.labelSet {
labelNames = append(labelNames, k)
}
sort.SliceStable(labelNames, func(i, j int) bool {
return labelNames[i] < labelNames[j]
})
return labelNames
}

View File

@ -0,0 +1,46 @@
package exemplar
import (
"sort"
"time"
"github.com/grafana/grafana/pkg/tsdb/prometheus/models"
)
type Sampler interface {
Add(models.Exemplar)
SetStep(time.Duration)
Sample() []models.Exemplar
Reset()
}
var _ Sampler = (*NoOpSampler)(nil)
type NoOpSampler struct {
exemplars []models.Exemplar
}
func NewNoOpSampler() Sampler {
return &NoOpSampler{
exemplars: []models.Exemplar{},
}
}
func (e *NoOpSampler) Add(ex models.Exemplar) {
e.exemplars = append(e.exemplars, ex)
}
func (e *NoOpSampler) SetStep(time.Duration) {
// noop
}
func (e *NoOpSampler) Sample() []models.Exemplar {
sort.SliceStable(e.exemplars, func(i, j int) bool {
return e.exemplars[i].Timestamp.Before(e.exemplars[j].Timestamp)
})
return e.exemplars
}
func (e *NoOpSampler) Reset() {
e.exemplars = []models.Exemplar{}
}

View File

@ -0,0 +1,93 @@
package exemplar
import (
"math"
"sort"
"time"
"github.com/grafana/grafana/pkg/tsdb/prometheus/models"
)
type StandardDeviationSampler struct {
step time.Duration
buckets map[time.Time][]models.Exemplar
count int
mean float64
m2 float64
}
func NewStandardDeviationSampler() Sampler {
return &StandardDeviationSampler{
buckets: map[time.Time][]models.Exemplar{},
}
}
func (e *StandardDeviationSampler) SetStep(step time.Duration) {
e.step = step
}
func (e *StandardDeviationSampler) Add(ex models.Exemplar) {
bucketTs := models.AlignTimeRange(ex.Timestamp, e.step, 0)
e.updateAggregations(ex.Value)
if _, exists := e.buckets[bucketTs]; !exists {
e.buckets[bucketTs] = []models.Exemplar{ex}
return
}
e.buckets[bucketTs] = append(e.buckets[bucketTs], ex)
}
// updateAggregations uses Welford's online algorithm for calculating the mean and variance
// https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_online_algorithm
func (e *StandardDeviationSampler) updateAggregations(val float64) {
e.count++
delta := val - e.mean
e.mean += delta / float64(e.count)
delta2 := val - e.mean
e.m2 += delta * delta2
}
// standardDeviation calculates the amount of varation in the data
// https://en.wikipedia.org/wiki/Standard_deviation
func (e *StandardDeviationSampler) standardDeviation() float64 {
if e.count < 2 {
return 0
}
return math.Sqrt(e.m2 / float64(e.count-1))
}
func (e *StandardDeviationSampler) Sample() []models.Exemplar {
exemplars := make([]models.Exemplar, 0, len(e.buckets))
for _, b := range e.buckets {
// sort by value in descending order
sort.SliceStable(b, func(i, j int) bool {
return b[i].Value > b[j].Value
})
sampled := []models.Exemplar{}
for _, ex := range b {
if len(sampled) == 0 {
sampled = append(sampled, ex)
continue
}
// only sample values at least 2 standard deviation distance to previously taken value
prev := sampled[len(sampled)-1]
if e.standardDeviation() != 0.0 && prev.Value-ex.Value > e.standardDeviation()*2.0 {
sampled = append(sampled, ex)
}
}
exemplars = append(exemplars, sampled...)
}
sort.SliceStable(exemplars, func(i, j int) bool {
return exemplars[i].Timestamp.Before(exemplars[j].Timestamp)
})
return exemplars
}
func (e *StandardDeviationSampler) Reset() {
e.step = 0
e.buckets = map[time.Time][]models.Exemplar{}
e.count = 0
e.mean = 0
e.m2 = 0
}

View File

@ -0,0 +1,27 @@
package exemplar_test
import (
"testing"
"time"
"github.com/grafana/grafana-plugin-sdk-go/experimental"
"github.com/grafana/grafana/pkg/tsdb/prometheus/models"
"github.com/grafana/grafana/pkg/tsdb/prometheus/querydata/exemplar"
)
func TestStdDevSampler(t *testing.T) {
sampler := exemplar.NewStandardDeviationSampler().(*exemplar.StandardDeviationSampler)
t.Run("standard deviation sampler", func(t *testing.T) {
tr := models.TimeRange{
Start: time.Unix(0, 0),
End: time.Unix(100000, 0),
}
ex := generateTestExemplars(tr)
sampler.SetStep(600 * time.Second)
for i := 0; i < len(ex); i++ {
sampler.Add(ex[i])
}
framer := exemplar.NewFramer(sampler, exemplar.NewLabelTracker())
experimental.CheckGoldenJSONFramer(t, "testdata", "stddev_sampler", framer, update)
})
}

View File

@ -0,0 +1,44 @@
package exemplar_test
import (
"testing"
"time"
"github.com/grafana/grafana-plugin-sdk-go/experimental"
"github.com/grafana/grafana/pkg/tsdb/prometheus/models"
"github.com/grafana/grafana/pkg/tsdb/prometheus/querydata/exemplar"
)
const update = true
func TestNoOpSampler(t *testing.T) {
sampler := exemplar.NewNoOpSampler().(*exemplar.NoOpSampler)
t.Run("no-op sampler", func(t *testing.T) {
tr := models.TimeRange{
Start: time.Unix(0, 0),
End: time.Unix(2000, 0),
}
ex := generateTestExemplars(tr)
for i := 0; i < len(ex); i++ {
sampler.Add(ex[i])
}
framer := exemplar.NewFramer(sampler, exemplar.NewLabelTracker())
experimental.CheckGoldenJSONFramer(t, "testdata", "noop_sampler", framer, update)
})
}
func generateTestExemplars(tr models.TimeRange) []models.Exemplar {
exemplars := []models.Exemplar{}
next := tr.Start.UTC()
for {
if next.Equal(tr.End) || next.After(tr.End) {
break
}
exemplars = append(exemplars, models.Exemplar{
Timestamp: next,
Value: float64(next.Unix()),
})
next = next.Add(time.Second).UTC()
}
return exemplars
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,392 @@
// 🌟 This was machine generated. Do not edit. 🌟
//
// Frame[0]
// Name: exemplar
// Dimensions: 2 Fields by 167 Rows
// +-------------------------------+-----------------+
// | Name: Time | Name: Value |
// | Labels: | Labels: |
// | Type: []time.Time | Type: []float64 |
// +-------------------------------+-----------------+
// | 1970-01-01 00:09:59 +0000 UTC | 599 |
// | 1970-01-01 00:19:59 +0000 UTC | 1199 |
// | 1970-01-01 00:29:59 +0000 UTC | 1799 |
// | 1970-01-01 00:39:59 +0000 UTC | 2399 |
// | 1970-01-01 00:49:59 +0000 UTC | 2999 |
// | 1970-01-01 00:59:59 +0000 UTC | 3599 |
// | 1970-01-01 01:09:59 +0000 UTC | 4199 |
// | 1970-01-01 01:19:59 +0000 UTC | 4799 |
// | 1970-01-01 01:29:59 +0000 UTC | 5399 |
// | ... | ... |
// +-------------------------------+-----------------+
//
//
// 🌟 This was machine generated. Do not edit. 🌟
{
"status": 200,
"frames": [
{
"schema": {
"name": "exemplar",
"fields": [
{
"name": "Time",
"type": "time",
"typeInfo": {
"frame": "time.Time"
}
},
{
"name": "Value",
"type": "number",
"typeInfo": {
"frame": "float64"
}
}
]
},
"data": {
"values": [
[
599000,
1199000,
1799000,
2399000,
2999000,
3599000,
4199000,
4799000,
5399000,
5999000,
6599000,
7199000,
7799000,
8399000,
8999000,
9599000,
10199000,
10799000,
11399000,
11999000,
12599000,
13199000,
13799000,
14399000,
14999000,
15599000,
16199000,
16799000,
17399000,
17999000,
18599000,
19199000,
19799000,
20399000,
20999000,
21599000,
22199000,
22799000,
23399000,
23999000,
24599000,
25199000,
25799000,
26399000,
26999000,
27599000,
28199000,
28799000,
29399000,
29999000,
30599000,
31199000,
31799000,
32399000,
32999000,
33599000,
34199000,
34799000,
35399000,
35999000,
36599000,
37199000,
37799000,
38399000,
38999000,
39599000,
40199000,
40799000,
41399000,
41999000,
42599000,
43199000,
43799000,
44399000,
44999000,
45599000,
46199000,
46799000,
47399000,
47999000,
48599000,
49199000,
49799000,
50399000,
50999000,
51599000,
52199000,
52799000,
53399000,
53999000,
54599000,
55199000,
55799000,
56399000,
56999000,
57599000,
58199000,
58799000,
59399000,
59999000,
60599000,
61199000,
61799000,
62399000,
62999000,
63599000,
64199000,
64799000,
65399000,
65999000,
66599000,
67199000,
67799000,
68399000,
68999000,
69599000,
70199000,
70799000,
71399000,
71999000,
72599000,
73199000,
73799000,
74399000,
74999000,
75599000,
76199000,
76799000,
77399000,
77999000,
78599000,
79199000,
79799000,
80399000,
80999000,
81599000,
82199000,
82799000,
83399000,
83999000,
84599000,
85199000,
85799000,
86399000,
86999000,
87599000,
88199000,
88799000,
89399000,
89999000,
90599000,
91199000,
91799000,
92399000,
92999000,
93599000,
94199000,
94799000,
95399000,
95999000,
96599000,
97199000,
97799000,
98399000,
98999000,
99599000,
99999000
],
[
599,
1199,
1799,
2399,
2999,
3599,
4199,
4799,
5399,
5999,
6599,
7199,
7799,
8399,
8999,
9599,
10199,
10799,
11399,
11999,
12599,
13199,
13799,
14399,
14999,
15599,
16199,
16799,
17399,
17999,
18599,
19199,
19799,
20399,
20999,
21599,
22199,
22799,
23399,
23999,
24599,
25199,
25799,
26399,
26999,
27599,
28199,
28799,
29399,
29999,
30599,
31199,
31799,
32399,
32999,
33599,
34199,
34799,
35399,
35999,
36599,
37199,
37799,
38399,
38999,
39599,
40199,
40799,
41399,
41999,
42599,
43199,
43799,
44399,
44999,
45599,
46199,
46799,
47399,
47999,
48599,
49199,
49799,
50399,
50999,
51599,
52199,
52799,
53399,
53999,
54599,
55199,
55799,
56399,
56999,
57599,
58199,
58799,
59399,
59999,
60599,
61199,
61799,
62399,
62999,
63599,
64199,
64799,
65399,
65999,
66599,
67199,
67799,
68399,
68999,
69599,
70199,
70799,
71399,
71999,
72599,
73199,
73799,
74399,
74999,
75599,
76199,
76799,
77399,
77999,
78599,
79199,
79799,
80399,
80999,
81599,
82199,
82799,
83399,
83999,
84599,
85199,
85799,
86399,
86999,
87599,
88199,
88799,
89399,
89999,
90599,
91199,
91799,
92399,
92999,
93599,
94199,
94799,
95399,
95999,
96599,
97199,
97799,
98399,
98999,
99599,
99999
]
]
}
}
]
}

View File

@ -1,138 +0,0 @@
package querydata
import (
"math"
"sort"
"time"
"github.com/grafana/grafana/pkg/tsdb/prometheus/models"
)
type exemplar struct {
seriesLabels map[string]string
labels map[string]string
val float64
ts time.Time
}
type exemplarSampler struct {
buckets map[time.Time][]exemplar
labelSet map[string]struct{}
disableSampling bool
count int
mean float64
m2 float64
}
func newExemplarSampler(disableSampling bool) *exemplarSampler {
return &exemplarSampler{
buckets: map[time.Time][]exemplar{},
labelSet: map[string]struct{}{},
disableSampling: disableSampling,
}
}
func (e *exemplarSampler) update(step time.Duration, ts time.Time, val float64, seriesLabels, labels map[string]string) {
bucketTs := models.AlignTimeRange(ts, step, 0)
e.trackNewLabels(seriesLabels, labels)
e.updateAggregations(val)
ex := exemplar{
val: val,
ts: ts,
labels: labels,
seriesLabels: seriesLabels,
}
if _, exists := e.buckets[bucketTs]; !exists {
e.buckets[bucketTs] = []exemplar{ex}
return
}
e.buckets[bucketTs] = append(e.buckets[bucketTs], ex)
}
// updateAggregations uses Welford's online algorithm for calculating the mean and variance
// https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_online_algorithm
func (e *exemplarSampler) updateAggregations(val float64) {
e.count++
delta := val - e.mean
e.mean += delta / float64(e.count)
delta2 := val - e.mean
e.m2 += delta * delta2
}
// standardDeviation calculates the amount of varation in the data
// https://en.wikipedia.org/wiki/Standard_deviation
func (e *exemplarSampler) standardDeviation() float64 {
if e.count < 2 {
return 0
}
return math.Sqrt(e.m2 / float64(e.count-1))
}
// trackNewLabels saves label names that haven't been seen before
// so that they can be used to build the label fields in the exemplar frame
func (e *exemplarSampler) trackNewLabels(seriesLabels, labels map[string]string) {
for k := range labels {
if _, ok := e.labelSet[k]; !ok {
e.labelSet[k] = struct{}{}
}
}
for k := range seriesLabels {
if _, ok := e.labelSet[k]; !ok {
e.labelSet[k] = struct{}{}
}
}
}
// getLabelNames returns sorted unique label names
func (e *exemplarSampler) getLabelNames() []string {
labelNames := make([]string, 0, len(e.labelSet))
for k := range e.labelSet {
labelNames = append(labelNames, k)
}
sort.SliceStable(labelNames, func(i, j int) bool {
return labelNames[i] < labelNames[j]
})
return labelNames
}
// getSampledExemplars returns the exemplars sorted by timestamp
func (e *exemplarSampler) getSampledExemplars() []exemplar {
var exemplars []exemplar
if e.disableSampling {
for _, bucket := range e.buckets {
exemplars = append(exemplars, bucket...)
}
return exemplars
}
exemplars = make([]exemplar, 0, len(e.buckets))
for _, b := range e.buckets {
// sort by value in descending order
sort.SliceStable(b, func(i, j int) bool {
return b[i].val > b[j].val
})
sampled := []exemplar{}
for _, ex := range b {
if len(sampled) == 0 {
sampled = append(sampled, ex)
continue
}
// only sample values at least 2 standard deviation distance to previously taken value
prev := sampled[len(sampled)-1]
if e.standardDeviation() != 0.0 && prev.val-ex.val > e.standardDeviation()*2.0 {
sampled = append(sampled, ex)
}
}
exemplars = append(exemplars, sampled...)
}
sort.SliceStable(exemplars, func(i, j int) bool {
return exemplars[i].ts.Before(exemplars[j].ts)
})
return exemplars
}

View File

@ -14,10 +14,10 @@ import (
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/services/featuremgmt"
ngalertmodels "github.com/grafana/grafana/pkg/services/ngalert/models"
"github.com/grafana/grafana/pkg/tsdb/intervalv2"
"github.com/grafana/grafana/pkg/tsdb/prometheus/client"
"github.com/grafana/grafana/pkg/tsdb/prometheus/models"
"github.com/grafana/grafana/pkg/tsdb/prometheus/querydata/exemplar"
"github.com/grafana/grafana/pkg/tsdb/prometheus/utils"
"github.com/grafana/grafana/pkg/util/maputil"
)
@ -35,15 +35,15 @@ type ExemplarEvent struct {
// QueryData handles querying but different from buffered package uses a custom client instead of default Go Prom
// client.
type QueryData struct {
intervalCalculator intervalv2.Calculator
tracer tracing.Tracer
client *client.Client
log log.Logger
ID int64
URL string
TimeInterval string
enableWideSeries bool
disablePrometheusExemplarSampling bool
intervalCalculator intervalv2.Calculator
tracer tracing.Tracer
client *client.Client
log log.Logger
ID int64
URL string
TimeInterval string
enableWideSeries bool
exemplarSampler exemplar.Sampler
}
func New(
@ -66,21 +66,28 @@ func New(
promClient := client.NewClient(httpClient, httpMethod, settings.URL)
// standard deviation sampler is the default for backwards compatibility
exemplarSampler := exemplar.NewStandardDeviationSampler()
if features.IsEnabled(featuremgmt.FlagDisablePrometheusExemplarSampling) {
exemplarSampler = exemplar.NewNoOpSampler()
}
return &QueryData{
intervalCalculator: intervalv2.NewCalculator(),
tracer: tracer,
log: plog,
client: promClient,
TimeInterval: timeInterval,
ID: settings.ID,
URL: settings.URL,
enableWideSeries: features.IsEnabled(featuremgmt.FlagPrometheusWideSeries),
disablePrometheusExemplarSampling: features.IsEnabled(featuremgmt.FlagDisablePrometheusExemplarSampling),
intervalCalculator: intervalv2.NewCalculator(),
tracer: tracer,
log: plog,
client: promClient,
TimeInterval: timeInterval,
ID: settings.ID,
URL: settings.URL,
enableWideSeries: features.IsEnabled(featuremgmt.FlagPrometheusWideSeries),
exemplarSampler: exemplarSampler,
}, nil
}
func (s *QueryData) Execute(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
fromAlert := req.Headers[ngalertmodels.FromAlertHeaderName] == "true"
fromAlert := req.Headers["FromAlert"] == "true"
result := backend.QueryDataResponse{
Responses: backend.Responses{},
}
@ -90,7 +97,7 @@ func (s *QueryData) Execute(ctx context.Context, req *backend.QueryDataRequest)
if err != nil {
return &result, err
}
r, err := s.fetch(ctx, s.client, query)
r, err := s.fetch(ctx, s.client, query, req.Headers)
if err != nil {
return &result, err
}
@ -104,7 +111,7 @@ func (s *QueryData) Execute(ctx context.Context, req *backend.QueryDataRequest)
return &result, nil
}
func (s *QueryData) fetch(ctx context.Context, client *client.Client, q *models.Query) (*backend.DataResponse, error) {
func (s *QueryData) fetch(ctx context.Context, client *client.Client, q *models.Query, headers map[string]string) (*backend.DataResponse, error) {
traceCtx, end := s.trace(ctx, q)
defer end()
@ -117,7 +124,7 @@ func (s *QueryData) fetch(ctx context.Context, client *client.Client, q *models.
}
if q.InstantQuery {
res, err := s.instantQuery(traceCtx, client, q)
res, err := s.instantQuery(traceCtx, client, q, headers)
if err != nil {
return nil, err
}
@ -126,7 +133,7 @@ func (s *QueryData) fetch(ctx context.Context, client *client.Client, q *models.
}
if q.RangeQuery {
res, err := s.rangeQuery(traceCtx, client, q)
res, err := s.rangeQuery(traceCtx, client, q, headers)
if err != nil {
return nil, err
}
@ -141,7 +148,7 @@ func (s *QueryData) fetch(ctx context.Context, client *client.Client, q *models.
}
if q.ExemplarQuery {
res, err := s.exemplarQuery(traceCtx, client, q)
res, err := s.exemplarQuery(traceCtx, client, q, headers)
if err != nil {
// If exemplar query returns error, we want to only log it and
// continue with other results processing
@ -155,7 +162,7 @@ func (s *QueryData) fetch(ctx context.Context, client *client.Client, q *models.
return response, nil
}
func (s *QueryData) rangeQuery(ctx context.Context, c *client.Client, q *models.Query) (*backend.DataResponse, error) {
func (s *QueryData) rangeQuery(ctx context.Context, c *client.Client, q *models.Query, headers map[string]string) (*backend.DataResponse, error) {
res, err := c.QueryRange(ctx, q)
if err != nil {
return nil, err
@ -163,7 +170,7 @@ func (s *QueryData) rangeQuery(ctx context.Context, c *client.Client, q *models.
return s.parseResponse(ctx, q, res)
}
func (s *QueryData) instantQuery(ctx context.Context, c *client.Client, q *models.Query) (*backend.DataResponse, error) {
func (s *QueryData) instantQuery(ctx context.Context, c *client.Client, q *models.Query, headers map[string]string) (*backend.DataResponse, error) {
res, err := c.QueryInstant(ctx, q)
if err != nil {
return nil, err
@ -171,7 +178,7 @@ func (s *QueryData) instantQuery(ctx context.Context, c *client.Client, q *model
return s.parseResponse(ctx, q, res)
}
func (s *QueryData) exemplarQuery(ctx context.Context, c *client.Client, q *models.Query) (*backend.DataResponse, error) {
func (s *QueryData) exemplarQuery(ctx context.Context, c *client.Client, q *models.Query, headers map[string]string) (*backend.DataResponse, error) {
res, err := c.QueryExemplars(ctx, q)
if err != nil {
return nil, err

View File

@ -13,6 +13,7 @@ import (
jsoniter "github.com/json-iterator/go"
"github.com/grafana/grafana/pkg/tsdb/prometheus/models"
"github.com/grafana/grafana/pkg/tsdb/prometheus/querydata/exemplar"
"github.com/grafana/grafana/pkg/util/converter"
)
@ -41,10 +42,59 @@ func (s *QueryData) parseResponse(ctx context.Context, q *models.Query, res *htt
}
}
r = processExemplars(q, r, s.disablePrometheusExemplarSampling)
r = s.processExemplars(q, r)
return r, nil
}
func (s *QueryData) processExemplars(q *models.Query, dr *backend.DataResponse) *backend.DataResponse {
s.exemplarSampler.Reset()
labelTracker := exemplar.NewLabelTracker()
// we are moving from a multi-frame response returned
// by the converter to a single exemplar frame,
// so we need to build a new frame array with the
// old exemplar frames filtered out
framer := exemplar.NewFramer(s.exemplarSampler, labelTracker)
for _, frame := range dr.Frames {
// we don't need to process non-exemplar frames
// so they can be added to the response
if !isExemplarFrame(frame) {
framer.AddFrame(frame)
continue
}
// copy the current exemplar frame metadata
framer.SetMeta(frame.Meta)
framer.SetRefID(frame.RefID)
step := time.Duration(frame.Fields[0].Config.Interval) * time.Millisecond
s.exemplarSampler.SetStep(step)
seriesLabels := getSeriesLabels(frame)
labelTracker.Add(seriesLabels)
for rowIdx := 0; rowIdx < frame.Fields[0].Len(); rowIdx++ {
row := frame.RowCopy(rowIdx)
labels := getLabels(frame, row)
labelTracker.Add(labels)
ex := models.Exemplar{
Labels: labels,
Value: row[1].(float64),
Timestamp: row[0].(time.Time),
SeriesLabels: seriesLabels,
}
s.exemplarSampler.Add(ex)
}
}
frames, err := framer.Frames()
return &backend.DataResponse{
Frames: frames,
Error: err,
}
}
func addMetadataToMultiFrame(q *models.Query, frame *data.Frame) {
if frame.Meta == nil {
frame.Meta = &data.FrameMeta{}
@ -136,79 +186,6 @@ func getName(q *models.Query, field *data.Field) string {
return legend
}
func processExemplars(q *models.Query, dr *backend.DataResponse, disableSampling bool) *backend.DataResponse {
sampler := newExemplarSampler(disableSampling)
// we are moving from a multi-frame response returned
// by the converter to a single exemplar frame,
// so we need to build a new frame array with the
// old exemplar frames filtered out
frames := []*data.Frame{}
// the new exemplar frame will be a single frame in long format
// with a timestamp, metric value, and one or more label fields
exemplarFrame := data.NewFrame("exemplar")
for _, frame := range dr.Frames {
// we don't need to process non-exemplar frames
// so they can be added to the response
if !isExemplarFrame(frame) {
frames = append(frames, frame)
continue
}
// copy the frame metadata to the new exemplar frame
exemplarFrame.Meta = frame.Meta
exemplarFrame.RefID = frame.RefID
step := time.Duration(frame.Fields[0].Config.Interval) * time.Millisecond
seriesLabels := getSeriesLabels(frame)
for rowIdx := 0; rowIdx < frame.Fields[0].Len(); rowIdx++ {
row := frame.RowCopy(rowIdx)
ts := row[0].(time.Time)
val := row[1].(float64)
labels := getLabels(frame, row)
sampler.update(step, ts, val, seriesLabels, labels)
}
}
exemplars := sampler.getSampledExemplars()
if len(exemplars) == 0 {
return dr
}
// init the fields for the new exemplar frame
timeField := data.NewField(data.TimeSeriesTimeFieldName, nil, make([]time.Time, 0, len(exemplars)))
valueField := data.NewField(data.TimeSeriesValueFieldName, nil, make([]float64, 0, len(exemplars)))
exemplarFrame.Fields = append(exemplarFrame.Fields, timeField, valueField)
labelNames := sampler.getLabelNames()
for _, labelName := range labelNames {
exemplarFrame.Fields = append(exemplarFrame.Fields, data.NewField(labelName, nil, make([]string, 0, len(exemplars))))
}
// add the sampled exemplars to the new exemplar frame
for _, b := range exemplars {
timeField.Append(b.ts)
valueField.Append(b.val)
for i, labelName := range labelNames {
labelValue, ok := b.labels[labelName]
if !ok {
// if the label is not present in the exemplar labels, then use the series label
labelValue = b.seriesLabels[labelName]
}
colIdx := i + 2 // +2 to skip time and value fields
exemplarFrame.Fields[colIdx].Append(labelValue)
}
}
frames = append(frames, exemplarFrame)
return &backend.DataResponse{
Frames: frames,
Error: dr.Error,
}
}
func isExemplarFrame(frame *data.Frame) bool {
rt := models.ResultTypeFromFrame(frame)
return rt == models.ResultTypeExemplar