diff --git a/pkg/tsdb/prometheus/prometheus.go b/pkg/tsdb/prometheus/prometheus.go index 6043861773e..5366b1a2339 100644 --- a/pkg/tsdb/prometheus/prometheus.go +++ b/pkg/tsdb/prometheus/prometheus.go @@ -161,9 +161,9 @@ func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest) if err != nil { plog.Error("Range query", query.Expr, "failed with", err) result.Responses[query.RefId] = backend.DataResponse{Error: err} - continue + } else { + response[RangeQueryType] = rangeResponse } - response[RangeQueryType] = rangeResponse } if query.InstantQuery { @@ -171,9 +171,9 @@ func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest) if err != nil { plog.Error("Instant query", query.Expr, "failed with", err) result.Responses[query.RefId] = backend.DataResponse{Error: err} - continue + } else { + response[InstantQueryType] = instantResponse } - response[InstantQueryType] = instantResponse } if query.ExemplarQuery { @@ -334,37 +334,32 @@ func (s *Service) parseQuery(queryContext *backend.QueryDataRequest, dsInfo *Dat } func parseResponse(value map[PrometheusQueryType]interface{}, query *PrometheusQuery) (data.Frames, error) { - frames := data.Frames{} + var ( + frames = data.Frames{} + nextFrames = data.Frames{} + ) for _, value := range value { - matrix, ok := value.(model.Matrix) - if ok { - matrixFrames := matrixToDataFrames(matrix, query) - frames = append(frames, matrixFrames...) + // Zero out the slice to prevent data corruption. + nextFrames = nextFrames[:0] + + switch v := value.(type) { + case model.Matrix: + nextFrames = matrixToDataFrames(v, query, nextFrames) + case model.Vector: + nextFrames = vectorToDataFrames(v, query, nextFrames) + case *model.Scalar: + nextFrames = scalarToDataFrames(v, query, nextFrames) + case []apiv1.ExemplarQueryResult: + nextFrames = exemplarToDataFrames(v, query, nextFrames) + default: + plog.Error("Query", query.Expr, "returned unexpected result type", v) continue } - vector, ok := value.(model.Vector) - if ok { - vectorFrames := vectorToDataFrames(vector, query) - frames = append(frames, vectorFrames...) - continue - } - - scalar, ok := value.(*model.Scalar) - if ok { - scalarFrames := scalarToDataFrames(scalar, query) - frames = append(frames, scalarFrames...) - continue - } - - exemplar, ok := value.([]apiv1.ExemplarQueryResult) - if ok { - exemplarFrames := exemplarToDataFrames(exemplar, query) - frames = append(frames, exemplarFrames...) - continue - } + frames = append(frames, nextFrames...) } + return frames, nil } @@ -398,9 +393,7 @@ func calculateRateInterval(interval time.Duration, scrapeInterval string, interv return rateInterval } -func matrixToDataFrames(matrix model.Matrix, query *PrometheusQuery) data.Frames { - frames := data.Frames{} - +func matrixToDataFrames(matrix model.Matrix, query *PrometheusQuery, frames data.Frames) data.Frames { for _, v := range matrix { tags := make(map[string]string, len(v.Metric)) for k, v := range v.Metric { @@ -424,61 +417,58 @@ func matrixToDataFrames(matrix model.Matrix, query *PrometheusQuery) data.Frames valueField.Config = &data.FieldConfig{DisplayNameFromDS: name} valueField.Labels = tags - frame := data.NewFrame(name, timeField, valueField) - frame.Meta = &data.FrameMeta{ - Custom: map[string]string{ - "resultType": "matrix", - }, - } - frames = append(frames, frame) + frames = append(frames, newDataFrame(name, "matrix", timeField, valueField)) } + return frames } -func scalarToDataFrames(scalar *model.Scalar, query *PrometheusQuery) data.Frames { +func scalarToDataFrames(scalar *model.Scalar, query *PrometheusQuery, frames data.Frames) data.Frames { timeVector := []time.Time{time.Unix(scalar.Timestamp.Unix(), 0).UTC()} values := []float64{float64(scalar.Value)} name := fmt.Sprintf("%g", values[0]) - frame := data.NewFrame(name, - data.NewField("Time", nil, timeVector), - data.NewField("Value", nil, values).SetConfig(&data.FieldConfig{DisplayNameFromDS: name})) - frame.Meta = &data.FrameMeta{ - Custom: map[string]string{ - "resultType": "scalar", - }, - } - frames := data.Frames{frame} - return frames + return append( + frames, + newDataFrame( + name, + "scalar", + data.NewField("Time", nil, timeVector), + data.NewField("Value", nil, values).SetConfig(&data.FieldConfig{DisplayNameFromDS: name}), + ), + ) } -func vectorToDataFrames(vector model.Vector, query *PrometheusQuery) data.Frames { - frames := data.Frames{} +func vectorToDataFrames(vector model.Vector, query *PrometheusQuery, frames data.Frames) data.Frames { for _, v := range vector { name := formatLegend(v.Metric, query) tags := make(map[string]string, len(v.Metric)) timeVector := []time.Time{time.Unix(v.Timestamp.Unix(), 0).UTC()} values := []float64{float64(v.Value)} + for k, v := range v.Metric { tags[string(k)] = string(v) } - frame := data.NewFrame(name, - data.NewField("Time", nil, timeVector), - data.NewField("Value", tags, values).SetConfig(&data.FieldConfig{DisplayNameFromDS: name})) - frame.Meta = &data.FrameMeta{ - Custom: map[string]string{ - "resultType": "vector", - }, - } - frames = append(frames, frame) + + frames = append( + frames, + newDataFrame( + name, + "vector", + data.NewField("Time", nil, timeVector), + data.NewField("Value", tags, values).SetConfig(&data.FieldConfig{DisplayNameFromDS: name}), + ), + ) } return frames } -func exemplarToDataFrames(response []apiv1.ExemplarQueryResult, query *PrometheusQuery) data.Frames { - frames := data.Frames{} - events := make([]ExemplarEvent, 0) +func exemplarToDataFrames(response []apiv1.ExemplarQueryResult, query *PrometheusQuery, frames data.Frames) data.Frames { + // TODO: this preallocation is very naive. + // We should figure out a better approximation here. + events := make([]ExemplarEvent, 0, len(response)*2) + for _, exemplarData := range response { for _, exemplar := range exemplarData.Exemplars { event := ExemplarEvent{} @@ -486,9 +476,11 @@ func exemplarToDataFrames(response []apiv1.ExemplarQueryResult, query *Prometheu event.Time = exemplarTime event.Value = float64(exemplar.Value) event.Labels = make(map[string]string) + for label, value := range exemplar.Labels { event.Labels[string(label)] = string(value) } + for seriesLabel, seriesValue := range exemplarData.SeriesLabels { event.Labels[string(seriesLabel)] = string(seriesValue) } @@ -497,11 +489,11 @@ func exemplarToDataFrames(response []apiv1.ExemplarQueryResult, query *Prometheu } } - //Sampling of exemplars + // Sampling of exemplars bucketedExemplars := make(map[string][]ExemplarEvent) - values := make([]float64, 0) + values := make([]float64, 0, len(events)) - //Create bucketed exemplars based on aligned timestamp + // Create bucketed exemplars based on aligned timestamp for _, event := range events { alignedTs := fmt.Sprintf("%.0f", math.Floor(float64(event.Time.Unix())/query.Step.Seconds())*query.Step.Seconds()) _, ok := bucketedExemplars[alignedTs] @@ -513,18 +505,18 @@ func exemplarToDataFrames(response []apiv1.ExemplarQueryResult, query *Prometheu values = append(values, event.Value) } - //Calculate standard deviation + // Calculate standard deviation standardDeviation := deviation(values) - //Create slice with all of the bucketed exemplars + // Create slice with all of the bucketed exemplars sampledBuckets := make([]string, len(bucketedExemplars)) for bucketTimes := range bucketedExemplars { sampledBuckets = append(sampledBuckets, bucketTimes) } sort.Strings(sampledBuckets) - //Sample exemplars based ona value, so we are not showing too many of them - sampleExemplars := make([]ExemplarEvent, 0) + // Sample exemplars based ona value, so we are not showing too many of them + sampleExemplars := make([]ExemplarEvent, 0, len(sampledBuckets)) for _, bucket := range sampledBuckets { exemplarsInBucket := bucketedExemplars[bucket] if len(exemplarsInBucket) == 1 { @@ -561,13 +553,15 @@ func exemplarToDataFrames(response []apiv1.ExemplarQueryResult, query *Prometheu } // Create DF from sampled exemplars - timeVector := make([]time.Time, 0, len(sampleExemplars)) - valuesVector := make([]float64, 0, len(sampleExemplars)) + timeField := data.NewFieldFromFieldType(data.FieldTypeTime, len(sampleExemplars)) + timeField.Name = "Time" + valueField := data.NewFieldFromFieldType(data.FieldTypeFloat64, len(sampleExemplars)) + valueField.Name = "Value" labelsVector := make(map[string][]string, len(sampleExemplars)) - for _, exemplar := range sampleExemplars { - timeVector = append(timeVector, exemplar.Time) - valuesVector = append(valuesVector, exemplar.Value) + for i, exemplar := range sampleExemplars { + timeField.Set(i, exemplar.Time) + valueField.Set(i, exemplar.Value) for label, value := range exemplar.Labels { if labelsVector[label] == nil { @@ -578,22 +572,13 @@ func exemplarToDataFrames(response []apiv1.ExemplarQueryResult, query *Prometheu } } - frame := data.NewFrame("exemplar", - data.NewField("Time", nil, timeVector), - data.NewField("Value", nil, valuesVector)) - + dataFields := make([]*data.Field, 0, len(labelsVector)+2) + dataFields = append(dataFields, timeField, valueField) for label, vector := range labelsVector { - frame.Fields = append(frame.Fields, data.NewField(label, nil, vector)) + dataFields = append(dataFields, data.NewField(label, nil, vector)) } - frame.Meta = &data.FrameMeta{ - Custom: map[string]PrometheusQueryType{ - "resultType": "exemplar", - }, - } - - frames = append(frames, frame) - return frames + return append(frames, newDataFrame("exemplar", "exemplar", dataFields...)) } func deviation(values []float64) float64 { @@ -608,3 +593,14 @@ func deviation(values []float64) float64 { } return math.Sqrt(sd / (valuesLen - 1)) } + +func newDataFrame(name string, typ string, fields ...*data.Field) *data.Frame { + frame := data.NewFrame(name, fields...) + frame.Meta = &data.FrameMeta{ + Custom: map[string]string{ + "resultType": typ, + }, + } + + return frame +}