Refactor Prometheus response handling and reduce allocations (#40765)

Signed-off-by: Igor Suleymanov <igor.suleymanov@grafana.com>
This commit is contained in:
Igor Suleymanov 2021-10-22 13:34:21 +03:00 committed by GitHub
parent a091d35f11
commit 3f5dcbfdb5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -161,9 +161,9 @@ func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest)
if err != nil {
plog.Error("Range query", query.Expr, "failed with", err)
result.Responses[query.RefId] = backend.DataResponse{Error: err}
continue
} else {
response[RangeQueryType] = rangeResponse
}
response[RangeQueryType] = rangeResponse
}
if query.InstantQuery {
@ -171,9 +171,9 @@ func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest)
if err != nil {
plog.Error("Instant query", query.Expr, "failed with", err)
result.Responses[query.RefId] = backend.DataResponse{Error: err}
continue
} else {
response[InstantQueryType] = instantResponse
}
response[InstantQueryType] = instantResponse
}
if query.ExemplarQuery {
@ -334,37 +334,32 @@ func (s *Service) parseQuery(queryContext *backend.QueryDataRequest, dsInfo *Dat
}
func parseResponse(value map[PrometheusQueryType]interface{}, query *PrometheusQuery) (data.Frames, error) {
frames := data.Frames{}
var (
frames = data.Frames{}
nextFrames = data.Frames{}
)
for _, value := range value {
matrix, ok := value.(model.Matrix)
if ok {
matrixFrames := matrixToDataFrames(matrix, query)
frames = append(frames, matrixFrames...)
// Zero out the slice to prevent data corruption.
nextFrames = nextFrames[:0]
switch v := value.(type) {
case model.Matrix:
nextFrames = matrixToDataFrames(v, query, nextFrames)
case model.Vector:
nextFrames = vectorToDataFrames(v, query, nextFrames)
case *model.Scalar:
nextFrames = scalarToDataFrames(v, query, nextFrames)
case []apiv1.ExemplarQueryResult:
nextFrames = exemplarToDataFrames(v, query, nextFrames)
default:
plog.Error("Query", query.Expr, "returned unexpected result type", v)
continue
}
vector, ok := value.(model.Vector)
if ok {
vectorFrames := vectorToDataFrames(vector, query)
frames = append(frames, vectorFrames...)
continue
}
scalar, ok := value.(*model.Scalar)
if ok {
scalarFrames := scalarToDataFrames(scalar, query)
frames = append(frames, scalarFrames...)
continue
}
exemplar, ok := value.([]apiv1.ExemplarQueryResult)
if ok {
exemplarFrames := exemplarToDataFrames(exemplar, query)
frames = append(frames, exemplarFrames...)
continue
}
frames = append(frames, nextFrames...)
}
return frames, nil
}
@ -398,9 +393,7 @@ func calculateRateInterval(interval time.Duration, scrapeInterval string, interv
return rateInterval
}
func matrixToDataFrames(matrix model.Matrix, query *PrometheusQuery) data.Frames {
frames := data.Frames{}
func matrixToDataFrames(matrix model.Matrix, query *PrometheusQuery, frames data.Frames) data.Frames {
for _, v := range matrix {
tags := make(map[string]string, len(v.Metric))
for k, v := range v.Metric {
@ -424,61 +417,58 @@ func matrixToDataFrames(matrix model.Matrix, query *PrometheusQuery) data.Frames
valueField.Config = &data.FieldConfig{DisplayNameFromDS: name}
valueField.Labels = tags
frame := data.NewFrame(name, timeField, valueField)
frame.Meta = &data.FrameMeta{
Custom: map[string]string{
"resultType": "matrix",
},
}
frames = append(frames, frame)
frames = append(frames, newDataFrame(name, "matrix", timeField, valueField))
}
return frames
}
func scalarToDataFrames(scalar *model.Scalar, query *PrometheusQuery) data.Frames {
func scalarToDataFrames(scalar *model.Scalar, query *PrometheusQuery, frames data.Frames) data.Frames {
timeVector := []time.Time{time.Unix(scalar.Timestamp.Unix(), 0).UTC()}
values := []float64{float64(scalar.Value)}
name := fmt.Sprintf("%g", values[0])
frame := data.NewFrame(name,
data.NewField("Time", nil, timeVector),
data.NewField("Value", nil, values).SetConfig(&data.FieldConfig{DisplayNameFromDS: name}))
frame.Meta = &data.FrameMeta{
Custom: map[string]string{
"resultType": "scalar",
},
}
frames := data.Frames{frame}
return frames
return append(
frames,
newDataFrame(
name,
"scalar",
data.NewField("Time", nil, timeVector),
data.NewField("Value", nil, values).SetConfig(&data.FieldConfig{DisplayNameFromDS: name}),
),
)
}
func vectorToDataFrames(vector model.Vector, query *PrometheusQuery) data.Frames {
frames := data.Frames{}
func vectorToDataFrames(vector model.Vector, query *PrometheusQuery, frames data.Frames) data.Frames {
for _, v := range vector {
name := formatLegend(v.Metric, query)
tags := make(map[string]string, len(v.Metric))
timeVector := []time.Time{time.Unix(v.Timestamp.Unix(), 0).UTC()}
values := []float64{float64(v.Value)}
for k, v := range v.Metric {
tags[string(k)] = string(v)
}
frame := data.NewFrame(name,
data.NewField("Time", nil, timeVector),
data.NewField("Value", tags, values).SetConfig(&data.FieldConfig{DisplayNameFromDS: name}))
frame.Meta = &data.FrameMeta{
Custom: map[string]string{
"resultType": "vector",
},
}
frames = append(frames, frame)
frames = append(
frames,
newDataFrame(
name,
"vector",
data.NewField("Time", nil, timeVector),
data.NewField("Value", tags, values).SetConfig(&data.FieldConfig{DisplayNameFromDS: name}),
),
)
}
return frames
}
func exemplarToDataFrames(response []apiv1.ExemplarQueryResult, query *PrometheusQuery) data.Frames {
frames := data.Frames{}
events := make([]ExemplarEvent, 0)
func exemplarToDataFrames(response []apiv1.ExemplarQueryResult, query *PrometheusQuery, frames data.Frames) data.Frames {
// TODO: this preallocation is very naive.
// We should figure out a better approximation here.
events := make([]ExemplarEvent, 0, len(response)*2)
for _, exemplarData := range response {
for _, exemplar := range exemplarData.Exemplars {
event := ExemplarEvent{}
@ -486,9 +476,11 @@ func exemplarToDataFrames(response []apiv1.ExemplarQueryResult, query *Prometheu
event.Time = exemplarTime
event.Value = float64(exemplar.Value)
event.Labels = make(map[string]string)
for label, value := range exemplar.Labels {
event.Labels[string(label)] = string(value)
}
for seriesLabel, seriesValue := range exemplarData.SeriesLabels {
event.Labels[string(seriesLabel)] = string(seriesValue)
}
@ -497,11 +489,11 @@ func exemplarToDataFrames(response []apiv1.ExemplarQueryResult, query *Prometheu
}
}
//Sampling of exemplars
// Sampling of exemplars
bucketedExemplars := make(map[string][]ExemplarEvent)
values := make([]float64, 0)
values := make([]float64, 0, len(events))
//Create bucketed exemplars based on aligned timestamp
// Create bucketed exemplars based on aligned timestamp
for _, event := range events {
alignedTs := fmt.Sprintf("%.0f", math.Floor(float64(event.Time.Unix())/query.Step.Seconds())*query.Step.Seconds())
_, ok := bucketedExemplars[alignedTs]
@ -513,18 +505,18 @@ func exemplarToDataFrames(response []apiv1.ExemplarQueryResult, query *Prometheu
values = append(values, event.Value)
}
//Calculate standard deviation
// Calculate standard deviation
standardDeviation := deviation(values)
//Create slice with all of the bucketed exemplars
// Create slice with all of the bucketed exemplars
sampledBuckets := make([]string, len(bucketedExemplars))
for bucketTimes := range bucketedExemplars {
sampledBuckets = append(sampledBuckets, bucketTimes)
}
sort.Strings(sampledBuckets)
//Sample exemplars based ona value, so we are not showing too many of them
sampleExemplars := make([]ExemplarEvent, 0)
// Sample exemplars based ona value, so we are not showing too many of them
sampleExemplars := make([]ExemplarEvent, 0, len(sampledBuckets))
for _, bucket := range sampledBuckets {
exemplarsInBucket := bucketedExemplars[bucket]
if len(exemplarsInBucket) == 1 {
@ -561,13 +553,15 @@ func exemplarToDataFrames(response []apiv1.ExemplarQueryResult, query *Prometheu
}
// Create DF from sampled exemplars
timeVector := make([]time.Time, 0, len(sampleExemplars))
valuesVector := make([]float64, 0, len(sampleExemplars))
timeField := data.NewFieldFromFieldType(data.FieldTypeTime, len(sampleExemplars))
timeField.Name = "Time"
valueField := data.NewFieldFromFieldType(data.FieldTypeFloat64, len(sampleExemplars))
valueField.Name = "Value"
labelsVector := make(map[string][]string, len(sampleExemplars))
for _, exemplar := range sampleExemplars {
timeVector = append(timeVector, exemplar.Time)
valuesVector = append(valuesVector, exemplar.Value)
for i, exemplar := range sampleExemplars {
timeField.Set(i, exemplar.Time)
valueField.Set(i, exemplar.Value)
for label, value := range exemplar.Labels {
if labelsVector[label] == nil {
@ -578,22 +572,13 @@ func exemplarToDataFrames(response []apiv1.ExemplarQueryResult, query *Prometheu
}
}
frame := data.NewFrame("exemplar",
data.NewField("Time", nil, timeVector),
data.NewField("Value", nil, valuesVector))
dataFields := make([]*data.Field, 0, len(labelsVector)+2)
dataFields = append(dataFields, timeField, valueField)
for label, vector := range labelsVector {
frame.Fields = append(frame.Fields, data.NewField(label, nil, vector))
dataFields = append(dataFields, data.NewField(label, nil, vector))
}
frame.Meta = &data.FrameMeta{
Custom: map[string]PrometheusQueryType{
"resultType": "exemplar",
},
}
frames = append(frames, frame)
return frames
return append(frames, newDataFrame("exemplar", "exemplar", dataFields...))
}
func deviation(values []float64) float64 {
@ -608,3 +593,14 @@ func deviation(values []float64) float64 {
}
return math.Sqrt(sd / (valuesLen - 1))
}
func newDataFrame(name string, typ string, fields ...*data.Field) *data.Frame {
frame := data.NewFrame(name, fields...)
frame.Meta = &data.FrameMeta{
Custom: map[string]string{
"resultType": typ,
},
}
return frame
}