Prometheus: Refactor backend for scalability for other query types (#40921)

* Prometheus: Refactor backend for scalability for other query types

* Fix linting
Ivana Huckova 2021-10-26 14:47:40 +02:00 committed by GitHub
parent 22b428836e
commit 92cd44940a
6 changed files with 510 additions and 481 deletions

@@ -5,37 +5,19 @@ import (
"encoding/json"
"errors"
"fmt"
"math"
"regexp"
"sort"
"strconv"
"strings"
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/datasource"
sdkhttpclient "github.com/grafana/grafana-plugin-sdk-go/backend/httpclient"
"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/infra/httpclient"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/plugins/backendplugin"
"github.com/grafana/grafana/pkg/plugins/backendplugin/coreplugin"
"github.com/grafana/grafana/pkg/tsdb/intervalv2"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/api"
apiv1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/model"
)
// Internal interval and range variables
const (
varInterval = "$__interval"
varIntervalMs = "$__interval_ms"
varRange = "$__range"
varRangeS = "$__range_s"
varRangeMs = "$__range_ms"
varRateInterval = "$__rate_interval"
)
var (
@@ -116,87 +98,26 @@ func newInstanceSettings(httpClientProvider httpclient.Provider) datasource.Inst
}
}
//nolint: staticcheck // plugins.DataResponse deprecated
func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
if len(req.Queries) == 0 {
return &backend.QueryDataResponse{}, fmt.Errorf("query contains no queries")
}
q := req.Queries[0]
dsInfo, err := s.getDSInfo(req.PluginContext)
if err != nil {
return nil, err
}
client := dsInfo.promClient
result := backend.QueryDataResponse{
Responses: backend.Responses{},
var result *backend.QueryDataResponse
switch q.QueryType {
case "timeSeriesQuery":
fallthrough
default:
result, err = s.executeTimeSeriesQuery(ctx, req, dsInfo)
}
queries, err := s.parseQuery(req, dsInfo)
if err != nil {
return &result, err
}
for _, query := range queries {
plog.Debug("Sending query", "start", query.Start, "end", query.End, "step", query.Step, "query", query.Expr)
span, ctx := opentracing.StartSpanFromContext(ctx, "datasource.prometheus")
span.SetTag("expr", query.Expr)
span.SetTag("start_unixnano", query.Start.UnixNano())
span.SetTag("stop_unixnano", query.End.UnixNano())
defer span.Finish()
response := make(map[PrometheusQueryType]interface{})
timeRange := apiv1.Range{
Step: query.Step,
// Align query range to step. It rounds start and end down to a multiple of step.
Start: time.Unix(int64(math.Floor((float64(query.Start.Unix()+query.UtcOffsetSec)/query.Step.Seconds()))*query.Step.Seconds()-float64(query.UtcOffsetSec)), 0),
End: time.Unix(int64(math.Floor((float64(query.End.Unix()+query.UtcOffsetSec)/query.Step.Seconds()))*query.Step.Seconds()-float64(query.UtcOffsetSec)), 0),
}
if query.RangeQuery {
rangeResponse, _, err := client.QueryRange(ctx, query.Expr, timeRange)
if err != nil {
plog.Error("Range query", query.Expr, "failed with", err)
result.Responses[query.RefId] = backend.DataResponse{Error: err}
} else {
response[RangeQueryType] = rangeResponse
}
}
if query.InstantQuery {
instantResponse, _, err := client.Query(ctx, query.Expr, query.End)
if err != nil {
plog.Error("Instant query", query.Expr, "failed with", err)
result.Responses[query.RefId] = backend.DataResponse{Error: err}
} else {
response[InstantQueryType] = instantResponse
}
}
if query.ExemplarQuery {
exemplarResponse, err := client.QueryExemplars(ctx, query.Expr, timeRange.Start, timeRange.End)
if err != nil {
plog.Error("Exemplar query", query.Expr, "failed with", err)
result.Responses[query.RefId] = backend.DataResponse{Error: err}
} else {
response[ExemplarQueryType] = exemplarResponse
}
}
frames, err := parseResponse(response, query)
if err != nil {
return &result, err
}
result.Responses[query.RefId] = backend.DataResponse{
Frames: frames,
}
}
return &result, nil
return result, err
}
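// The switch above is what makes the backend extensible to other query types:
// each QueryType gets its own executor function. A hypothetical future type
// ("labelsQuery" and executeLabelsQuery below are illustrative names, not
// part of this commit) would only need one more case:
//
//	switch q.QueryType {
//	case "labelsQuery":
//		result, err = s.executeLabelsQuery(ctx, req, dsInfo)
//	case "timeSeriesQuery":
//		fallthrough
//	default:
//		result, err = s.executeTimeSeriesQuery(ctx, req, dsInfo)
//	}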
func createClient(url string, httpOpts sdkhttpclient.Options, clientProvider httpclient.Provider) (apiv1.API, error) {
@@ -232,137 +153,6 @@ func (s *Service) getDSInfo(pluginCtx backend.PluginContext) (*DatasourceInfo, e
return &instance, nil
}
func formatLegend(metric model.Metric, query *PrometheusQuery) string {
var legend string
if query.LegendFormat == "" {
legend = metric.String()
} else {
result := legendFormat.ReplaceAllFunc([]byte(query.LegendFormat), func(in []byte) []byte {
labelName := strings.Replace(string(in), "{{", "", 1)
labelName = strings.Replace(labelName, "}}", "", 1)
labelName = strings.TrimSpace(labelName)
if val, exists := metric[model.LabelName(labelName)]; exists {
return []byte(val)
}
return []byte{}
})
legend = string(result)
}
// If legend is empty brackets, use query expression
if legend == "{}" {
legend = query.Expr
}
return legend
}
func (s *Service) parseQuery(queryContext *backend.QueryDataRequest, dsInfo *DatasourceInfo) ([]*PrometheusQuery, error) {
qs := []*PrometheusQuery{}
for _, query := range queryContext.Queries {
model := &QueryModel{}
err := json.Unmarshal(query.JSON, model)
if err != nil {
return nil, err
}
// Final interval value
var interval time.Duration
// Calculate interval
queryInterval := model.Interval
// If the step is set to one of the interval variables, clear it so the calculated interval is used instead
if queryInterval == varInterval || queryInterval == varIntervalMs || queryInterval == varRateInterval {
queryInterval = ""
}
minInterval, err := intervalv2.GetIntervalFrom(dsInfo.TimeInterval, queryInterval, model.IntervalMS, 15*time.Second)
if err != nil {
return nil, err
}
calculatedInterval := s.intervalCalculator.Calculate(query.TimeRange, minInterval, query.MaxDataPoints)
safeInterval := s.intervalCalculator.CalculateSafeInterval(query.TimeRange, int64(safeRes))
adjustedInterval := safeInterval.Value
if calculatedInterval.Value > safeInterval.Value {
adjustedInterval = calculatedInterval.Value
}
if queryInterval == varRateInterval {
// Rate interval is final and is not affected by resolution
interval = calculateRateInterval(adjustedInterval, dsInfo.TimeInterval, s.intervalCalculator)
} else {
intervalFactor := model.IntervalFactor
if intervalFactor == 0 {
intervalFactor = 1
}
interval = time.Duration(int64(adjustedInterval) * intervalFactor)
}
intervalMs := int64(interval / time.Millisecond)
rangeS := query.TimeRange.To.Unix() - query.TimeRange.From.Unix()
// Interpolate variables in expr
expr := model.Expr
expr = strings.ReplaceAll(expr, varIntervalMs, strconv.FormatInt(intervalMs, 10))
expr = strings.ReplaceAll(expr, varInterval, intervalv2.FormatDuration(interval))
expr = strings.ReplaceAll(expr, varRangeMs, strconv.FormatInt(rangeS*1000, 10))
expr = strings.ReplaceAll(expr, varRangeS, strconv.FormatInt(rangeS, 10))
expr = strings.ReplaceAll(expr, varRange, strconv.FormatInt(rangeS, 10)+"s")
expr = strings.ReplaceAll(expr, varRateInterval, intervalv2.FormatDuration(calculateRateInterval(interval, dsInfo.TimeInterval, s.intervalCalculator)))
rangeQuery := model.RangeQuery
if !model.InstantQuery && !model.RangeQuery {
// Older dashboards did not set the range query parameter; a query that is neither instant nor range is therefore run as a range query
rangeQuery = true
}
qs = append(qs, &PrometheusQuery{
Expr: expr,
Step: interval,
LegendFormat: model.LegendFormat,
Start: query.TimeRange.From,
End: query.TimeRange.To,
RefId: query.RefID,
InstantQuery: model.InstantQuery,
RangeQuery: rangeQuery,
ExemplarQuery: model.ExemplarQuery,
UtcOffsetSec: model.UtcOffsetSec,
})
}
return qs, nil
}
func parseResponse(value map[PrometheusQueryType]interface{}, query *PrometheusQuery) (data.Frames, error) {
var (
frames = data.Frames{}
nextFrames = data.Frames{}
)
for _, value := range value {
// Reset the slice to zero length, reusing its capacity, so frames from the previous iteration are not appended again.
nextFrames = nextFrames[:0]
switch v := value.(type) {
case model.Matrix:
nextFrames = matrixToDataFrames(v, query, nextFrames)
case model.Vector:
nextFrames = vectorToDataFrames(v, query, nextFrames)
case *model.Scalar:
nextFrames = scalarToDataFrames(v, query, nextFrames)
case []apiv1.ExemplarQueryResult:
nextFrames = exemplarToDataFrames(v, query, nextFrames)
default:
plog.Error("Query", query.Expr, "returned unexpected result type", v)
continue
}
frames = append(frames, nextFrames...)
}
return frames, nil
}
// IsAPIError returns whether err is or wraps a Prometheus error.
func IsAPIError(err error) bool {
// Check if the right error type is in err's chain.
@@ -377,230 +167,3 @@ func ConvertAPIError(err error) error {
}
return err
}
func calculateRateInterval(interval time.Duration, scrapeInterval string, intervalCalculator intervalv2.Calculator) time.Duration {
scrape := scrapeInterval
if scrape == "" {
scrape = "15s"
}
scrapeIntervalDuration, err := intervalv2.ParseIntervalStringToTimeDuration(scrape)
if err != nil {
return time.Duration(0)
}
rateInterval := time.Duration(int(math.Max(float64(interval+scrapeIntervalDuration), float64(4)*float64(scrapeIntervalDuration))))
return rateInterval
}
func matrixToDataFrames(matrix model.Matrix, query *PrometheusQuery, frames data.Frames) data.Frames {
for _, v := range matrix {
tags := make(map[string]string, len(v.Metric))
for k, v := range v.Metric {
tags[string(k)] = string(v)
}
timeField := data.NewFieldFromFieldType(data.FieldTypeTime, len(v.Values))
valueField := data.NewFieldFromFieldType(data.FieldTypeNullableFloat64, len(v.Values))
for i, k := range v.Values {
timeField.Set(i, time.Unix(k.Timestamp.Unix(), 0).UTC())
value := float64(k.Value)
if !math.IsNaN(value) {
valueField.Set(i, &value)
}
}
name := formatLegend(v.Metric, query)
timeField.Name = data.TimeSeriesTimeFieldName
valueField.Name = data.TimeSeriesValueFieldName
valueField.Config = &data.FieldConfig{DisplayNameFromDS: name}
valueField.Labels = tags
frames = append(frames, newDataFrame(name, "matrix", timeField, valueField))
}
return frames
}
func scalarToDataFrames(scalar *model.Scalar, query *PrometheusQuery, frames data.Frames) data.Frames {
timeVector := []time.Time{time.Unix(scalar.Timestamp.Unix(), 0).UTC()}
values := []float64{float64(scalar.Value)}
name := fmt.Sprintf("%g", values[0])
return append(
frames,
newDataFrame(
name,
"scalar",
data.NewField("Time", nil, timeVector),
data.NewField("Value", nil, values).SetConfig(&data.FieldConfig{DisplayNameFromDS: name}),
),
)
}
func vectorToDataFrames(vector model.Vector, query *PrometheusQuery, frames data.Frames) data.Frames {
for _, v := range vector {
name := formatLegend(v.Metric, query)
tags := make(map[string]string, len(v.Metric))
timeVector := []time.Time{time.Unix(v.Timestamp.Unix(), 0).UTC()}
values := []float64{float64(v.Value)}
for k, v := range v.Metric {
tags[string(k)] = string(v)
}
frames = append(
frames,
newDataFrame(
name,
"vector",
data.NewField("Time", nil, timeVector),
data.NewField("Value", tags, values).SetConfig(&data.FieldConfig{DisplayNameFromDS: name}),
),
)
}
return frames
}
func exemplarToDataFrames(response []apiv1.ExemplarQueryResult, query *PrometheusQuery, frames data.Frames) data.Frames {
// TODO: this preallocation is very naive.
// We should figure out a better approximation here.
events := make([]ExemplarEvent, 0, len(response)*2)
for _, exemplarData := range response {
for _, exemplar := range exemplarData.Exemplars {
event := ExemplarEvent{}
exemplarTime := time.Unix(exemplar.Timestamp.Unix(), 0).UTC()
event.Time = exemplarTime
event.Value = float64(exemplar.Value)
event.Labels = make(map[string]string)
for label, value := range exemplar.Labels {
event.Labels[string(label)] = string(value)
}
for seriesLabel, seriesValue := range exemplarData.SeriesLabels {
event.Labels[string(seriesLabel)] = string(seriesValue)
}
events = append(events, event)
}
}
// Sampling of exemplars
bucketedExemplars := make(map[string][]ExemplarEvent)
values := make([]float64, 0, len(events))
// Create bucketed exemplars based on aligned timestamp
for _, event := range events {
alignedTs := fmt.Sprintf("%.0f", math.Floor(float64(event.Time.Unix())/query.Step.Seconds())*query.Step.Seconds())
_, ok := bucketedExemplars[alignedTs]
if !ok {
bucketedExemplars[alignedTs] = make([]ExemplarEvent, 0)
}
bucketedExemplars[alignedTs] = append(bucketedExemplars[alignedTs], event)
values = append(values, event.Value)
}
// Calculate standard deviation
standardDeviation := deviation(values)
// Create slice with all of the bucketed exemplars
sampledBuckets := make([]string, len(bucketedExemplars))
for bucketTimes := range bucketedExemplars {
sampledBuckets = append(sampledBuckets, bucketTimes)
}
sort.Strings(sampledBuckets)
// Sample exemplars based on value, so we are not showing too many of them
sampleExemplars := make([]ExemplarEvent, 0, len(sampledBuckets))
for _, bucket := range sampledBuckets {
exemplarsInBucket := bucketedExemplars[bucket]
if len(exemplarsInBucket) == 1 {
sampleExemplars = append(sampleExemplars, exemplarsInBucket[0])
} else {
bucketValues := make([]float64, len(exemplarsInBucket))
for _, exemplar := range exemplarsInBucket {
bucketValues = append(bucketValues, exemplar.Value)
}
sort.Slice(bucketValues, func(i, j int) bool {
return bucketValues[i] > bucketValues[j]
})
sampledBucketValues := make([]float64, 0)
for _, value := range bucketValues {
if len(sampledBucketValues) == 0 {
sampledBucketValues = append(sampledBucketValues, value)
} else {
// Then take values only when they are at least 2 standard deviations away from the previously taken value
prev := sampledBucketValues[len(sampledBucketValues)-1]
if standardDeviation != 0 && prev-value >= float64(2)*standardDeviation {
sampledBucketValues = append(sampledBucketValues, value)
}
}
}
for _, valueBucket := range sampledBucketValues {
for _, exemplar := range exemplarsInBucket {
if exemplar.Value == valueBucket {
sampleExemplars = append(sampleExemplars, exemplar)
}
}
}
}
}
// Create DF from sampled exemplars
timeField := data.NewFieldFromFieldType(data.FieldTypeTime, len(sampleExemplars))
timeField.Name = "Time"
valueField := data.NewFieldFromFieldType(data.FieldTypeFloat64, len(sampleExemplars))
valueField.Name = "Value"
labelsVector := make(map[string][]string, len(sampleExemplars))
for i, exemplar := range sampleExemplars {
timeField.Set(i, exemplar.Time)
valueField.Set(i, exemplar.Value)
for label, value := range exemplar.Labels {
if labelsVector[label] == nil {
labelsVector[label] = make([]string, 0)
}
labelsVector[label] = append(labelsVector[label], value)
}
}
dataFields := make([]*data.Field, 0, len(labelsVector)+2)
dataFields = append(dataFields, timeField, valueField)
for label, vector := range labelsVector {
dataFields = append(dataFields, data.NewField(label, nil, vector))
}
return append(frames, newDataFrame("exemplar", "exemplar", dataFields...))
}
func deviation(values []float64) float64 {
var sum, mean, sd float64
valuesLen := float64(len(values))
for _, value := range values {
sum += value
}
mean = sum / valuesLen
for j := 0; j < len(values); j++ {
sd += math.Pow(values[j]-mean, 2)
}
return math.Sqrt(sd / (valuesLen - 1))
}
func newDataFrame(name string, typ string, fields ...*data.Field) *data.Frame {
frame := data.NewFrame(name, fields...)
frame.Meta = &data.FrameMeta{
Custom: map[string]string{
"resultType": typ,
},
}
return frame
}

@@ -0,0 +1,468 @@
package prometheus
import (
"context"
"encoding/json"
"fmt"
"math"
"sort"
"strconv"
"strings"
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/tsdb/intervalv2"
"github.com/opentracing/opentracing-go"
apiv1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/model"
)
// Internal interval and range variables
const (
varInterval = "$__interval"
varIntervalMs = "$__interval_ms"
varRange = "$__range"
varRangeS = "$__range_s"
varRangeMs = "$__range_ms"
varRateInterval = "$__rate_interval"
)
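// TimeSeriesQueryType keys the response of each executed sub-query (range,
// instant, exemplar); it takes over the role of the PrometheusQueryType that
// this commit removes from types.go (see the deletion further below).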
type TimeSeriesQueryType string
const (
RangeQueryType TimeSeriesQueryType = "range"
InstantQueryType TimeSeriesQueryType = "instant"
ExemplarQueryType TimeSeriesQueryType = "exemplar"
)
func (s *Service) executeTimeSeriesQuery(ctx context.Context, req *backend.QueryDataRequest, dsInfo *DatasourceInfo) (*backend.QueryDataResponse, error) {
client := dsInfo.promClient
result := backend.QueryDataResponse{
Responses: backend.Responses{},
}
queries, err := s.parseTimeSeriesQuery(req, dsInfo)
if err != nil {
return &result, err
}
for _, query := range queries {
plog.Debug("Sending query", "start", query.Start, "end", query.End, "step", query.Step, "query", query.Expr)
span, ctx := opentracing.StartSpanFromContext(ctx, "datasource.prometheus")
span.SetTag("expr", query.Expr)
span.SetTag("start_unixnano", query.Start.UnixNano())
span.SetTag("stop_unixnano", query.End.UnixNano())
defer span.Finish()
response := make(map[TimeSeriesQueryType]interface{})
timeRange := apiv1.Range{
Step: query.Step,
// Align query range to step. It rounds start and end down to a multiple of step.
Start: time.Unix(int64(math.Floor((float64(query.Start.Unix()+query.UtcOffsetSec)/query.Step.Seconds()))*query.Step.Seconds()-float64(query.UtcOffsetSec)), 0),
End: time.Unix(int64(math.Floor((float64(query.End.Unix()+query.UtcOffsetSec)/query.Step.Seconds()))*query.Step.Seconds()-float64(query.UtcOffsetSec)), 0),
}
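// As a worked example of the alignment above (illustrative numbers): with
// Step = 30s, UtcOffsetSec = 0 and Start at unix time 36007 (10:00:07 UTC),
// floor(36007/30)*30 = 36000, so the aligned start is 10:00:00 UTC; End is
// rounded down to a multiple of the step in the same way.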
if query.RangeQuery {
rangeResponse, _, err := client.QueryRange(ctx, query.Expr, timeRange)
if err != nil {
plog.Error("Range query", query.Expr, "failed with", err)
result.Responses[query.RefId] = backend.DataResponse{Error: err}
} else {
response[RangeQueryType] = rangeResponse
}
}
if query.InstantQuery {
instantResponse, _, err := client.Query(ctx, query.Expr, query.End)
if err != nil {
plog.Error("Instant query", query.Expr, "failed with", err)
result.Responses[query.RefId] = backend.DataResponse{Error: err}
} else {
response[InstantQueryType] = instantResponse
}
}
if query.ExemplarQuery {
exemplarResponse, err := client.QueryExemplars(ctx, query.Expr, timeRange.Start, timeRange.End)
if err != nil {
plog.Error("Exemplar query", query.Expr, "failed with", err)
result.Responses[query.RefId] = backend.DataResponse{Error: err}
} else {
response[ExemplarQueryType] = exemplarResponse
}
}
frames, err := parseTimeSeriesResponse(response, query)
if err != nil {
return &result, err
}
result.Responses[query.RefId] = backend.DataResponse{
Frames: frames,
}
}
return &result, nil
}
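// formatLegend renders the query's legend template against a series' labels.
// For example, with LegendFormat `legend {{app}}` and a series labeled
// {app="backend"} it returns "legend backend"; a placeholder whose label is
// missing from the series renders as an empty string.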
func formatLegend(metric model.Metric, query *PrometheusQuery) string {
var legend string
if query.LegendFormat == "" {
legend = metric.String()
} else {
result := legendFormat.ReplaceAllFunc([]byte(query.LegendFormat), func(in []byte) []byte {
labelName := strings.Replace(string(in), "{{", "", 1)
labelName = strings.Replace(labelName, "}}", "", 1)
labelName = strings.TrimSpace(labelName)
if val, exists := metric[model.LabelName(labelName)]; exists {
return []byte(val)
}
return []byte{}
})
legend = string(result)
}
// If legend is empty brackets, use query expression
if legend == "{}" {
legend = query.Expr
}
return legend
}
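// parseTimeSeriesQuery unmarshals each query, computes its step and
// interpolates the interval/range variables in the expression. To illustrate
// with values from the tests further below: with a calculated interval of 2m,
// `[$__interval]` becomes `[2m]`, `$__interval_ms` becomes `120000`, and over
// a 48h range `$__range` expands to `172800s`.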
func (s *Service) parseTimeSeriesQuery(queryContext *backend.QueryDataRequest, dsInfo *DatasourceInfo) ([]*PrometheusQuery, error) {
qs := []*PrometheusQuery{}
for _, query := range queryContext.Queries {
model := &QueryModel{}
err := json.Unmarshal(query.JSON, model)
if err != nil {
return nil, err
}
// Final interval value
var interval time.Duration
// Calculate interval
queryInterval := model.Interval
// If the step is set to one of the interval variables, clear it so the calculated interval is used instead
if queryInterval == varInterval || queryInterval == varIntervalMs || queryInterval == varRateInterval {
queryInterval = ""
}
minInterval, err := intervalv2.GetIntervalFrom(dsInfo.TimeInterval, queryInterval, model.IntervalMS, 15*time.Second)
if err != nil {
return nil, err
}
calculatedInterval := s.intervalCalculator.Calculate(query.TimeRange, minInterval, query.MaxDataPoints)
safeInterval := s.intervalCalculator.CalculateSafeInterval(query.TimeRange, int64(safeRes))
adjustedInterval := safeInterval.Value
if calculatedInterval.Value > safeInterval.Value {
adjustedInterval = calculatedInterval.Value
}
if queryInterval == varRateInterval {
// Rate interval is final and is not affected by resolution
interval = calculateRateInterval(adjustedInterval, dsInfo.TimeInterval, s.intervalCalculator)
} else {
intervalFactor := model.IntervalFactor
if intervalFactor == 0 {
intervalFactor = 1
}
interval = time.Duration(int64(adjustedInterval) * intervalFactor)
}
intervalMs := int64(interval / time.Millisecond)
rangeS := query.TimeRange.To.Unix() - query.TimeRange.From.Unix()
// Interpolate variables in expr
expr := model.Expr
expr = strings.ReplaceAll(expr, varIntervalMs, strconv.FormatInt(intervalMs, 10))
expr = strings.ReplaceAll(expr, varInterval, intervalv2.FormatDuration(interval))
expr = strings.ReplaceAll(expr, varRangeMs, strconv.FormatInt(rangeS*1000, 10))
expr = strings.ReplaceAll(expr, varRangeS, strconv.FormatInt(rangeS, 10))
expr = strings.ReplaceAll(expr, varRange, strconv.FormatInt(rangeS, 10)+"s")
expr = strings.ReplaceAll(expr, varRateInterval, intervalv2.FormatDuration(calculateRateInterval(interval, dsInfo.TimeInterval, s.intervalCalculator)))
rangeQuery := model.RangeQuery
if !model.InstantQuery && !model.RangeQuery {
// Older dashboards did not set the range query parameter; a query that is neither instant nor range is therefore run as a range query
rangeQuery = true
}
qs = append(qs, &PrometheusQuery{
Expr: expr,
Step: interval,
LegendFormat: model.LegendFormat,
Start: query.TimeRange.From,
End: query.TimeRange.To,
RefId: query.RefID,
InstantQuery: model.InstantQuery,
RangeQuery: rangeQuery,
ExemplarQuery: model.ExemplarQuery,
UtcOffsetSec: model.UtcOffsetSec,
})
}
return qs, nil
}
func parseTimeSeriesResponse(value map[TimeSeriesQueryType]interface{}, query *PrometheusQuery) (data.Frames, error) {
var (
frames = data.Frames{}
nextFrames = data.Frames{}
)
for _, value := range value {
// Reset the slice to zero length, reusing its capacity, so frames from the previous iteration are not appended again.
nextFrames = nextFrames[:0]
switch v := value.(type) {
case model.Matrix:
nextFrames = matrixToDataFrames(v, query, nextFrames)
case model.Vector:
nextFrames = vectorToDataFrames(v, query, nextFrames)
case *model.Scalar:
nextFrames = scalarToDataFrames(v, query, nextFrames)
case []apiv1.ExemplarQueryResult:
nextFrames = exemplarToDataFrames(v, query, nextFrames)
default:
plog.Error("Query", query.Expr, "returned unexpected result type", v)
continue
}
frames = append(frames, nextFrames...)
}
return frames, nil
}
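// calculateRateInterval returns max(interval + scrapeInterval, 4 * scrapeInterval).
// For example (illustrative numbers), interval = 30s with the default 15s
// scrape interval gives max(45s, 60s) = 60s, so $__rate_interval resolves to 1m.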
func calculateRateInterval(interval time.Duration, scrapeInterval string, intervalCalculator intervalv2.Calculator) time.Duration {
scrape := scrapeInterval
if scrape == "" {
scrape = "15s"
}
scrapeIntervalDuration, err := intervalv2.ParseIntervalStringToTimeDuration(scrape)
if err != nil {
return time.Duration(0)
}
rateInterval := time.Duration(int(math.Max(float64(interval+scrapeIntervalDuration), float64(4)*float64(scrapeIntervalDuration))))
return rateInterval
}
func matrixToDataFrames(matrix model.Matrix, query *PrometheusQuery, frames data.Frames) data.Frames {
for _, v := range matrix {
tags := make(map[string]string, len(v.Metric))
for k, v := range v.Metric {
tags[string(k)] = string(v)
}
timeField := data.NewFieldFromFieldType(data.FieldTypeTime, len(v.Values))
valueField := data.NewFieldFromFieldType(data.FieldTypeNullableFloat64, len(v.Values))
for i, k := range v.Values {
timeField.Set(i, time.Unix(k.Timestamp.Unix(), 0).UTC())
value := float64(k.Value)
if !math.IsNaN(value) {
valueField.Set(i, &value)
}
}
name := formatLegend(v.Metric, query)
timeField.Name = data.TimeSeriesTimeFieldName
valueField.Name = data.TimeSeriesValueFieldName
valueField.Config = &data.FieldConfig{DisplayNameFromDS: name}
valueField.Labels = tags
frames = append(frames, newDataFrame(name, "matrix", timeField, valueField))
}
return frames
}
func scalarToDataFrames(scalar *model.Scalar, query *PrometheusQuery, frames data.Frames) data.Frames {
timeVector := []time.Time{time.Unix(scalar.Timestamp.Unix(), 0).UTC()}
values := []float64{float64(scalar.Value)}
name := fmt.Sprintf("%g", values[0])
return append(
frames,
newDataFrame(
name,
"scalar",
data.NewField("Time", nil, timeVector),
data.NewField("Value", nil, values).SetConfig(&data.FieldConfig{DisplayNameFromDS: name}),
),
)
}
func vectorToDataFrames(vector model.Vector, query *PrometheusQuery, frames data.Frames) data.Frames {
for _, v := range vector {
name := formatLegend(v.Metric, query)
tags := make(map[string]string, len(v.Metric))
timeVector := []time.Time{time.Unix(v.Timestamp.Unix(), 0).UTC()}
values := []float64{float64(v.Value)}
for k, v := range v.Metric {
tags[string(k)] = string(v)
}
frames = append(
frames,
newDataFrame(
name,
"vector",
data.NewField("Time", nil, timeVector),
data.NewField("Value", tags, values).SetConfig(&data.FieldConfig{DisplayNameFromDS: name}),
),
)
}
return frames
}
func exemplarToDataFrames(response []apiv1.ExemplarQueryResult, query *PrometheusQuery, frames data.Frames) data.Frames {
// TODO: this preallocation is very naive.
// We should figure out a better approximation here.
events := make([]ExemplarEvent, 0, len(response)*2)
for _, exemplarData := range response {
for _, exemplar := range exemplarData.Exemplars {
event := ExemplarEvent{}
exemplarTime := time.Unix(exemplar.Timestamp.Unix(), 0).UTC()
event.Time = exemplarTime
event.Value = float64(exemplar.Value)
event.Labels = make(map[string]string)
for label, value := range exemplar.Labels {
event.Labels[string(label)] = string(value)
}
for seriesLabel, seriesValue := range exemplarData.SeriesLabels {
event.Labels[string(seriesLabel)] = string(seriesValue)
}
events = append(events, event)
}
}
// Sampling of exemplars
bucketedExemplars := make(map[string][]ExemplarEvent)
values := make([]float64, 0, len(events))
// Create bucketed exemplars based on aligned timestamp
for _, event := range events {
alignedTs := fmt.Sprintf("%.0f", math.Floor(float64(event.Time.Unix())/query.Step.Seconds())*query.Step.Seconds())
_, ok := bucketedExemplars[alignedTs]
if !ok {
bucketedExemplars[alignedTs] = make([]ExemplarEvent, 0)
}
bucketedExemplars[alignedTs] = append(bucketedExemplars[alignedTs], event)
values = append(values, event.Value)
}
// Calculate standard deviation
standardDeviation := deviation(values)
// Create slice with all of the bucketed exemplars
sampledBuckets := make([]string, len(bucketedExemplars))
for bucketTimes := range bucketedExemplars {
sampledBuckets = append(sampledBuckets, bucketTimes)
}
sort.Strings(sampledBuckets)
// Sample exemplars based on value, so we are not showing too many of them
sampleExemplars := make([]ExemplarEvent, 0, len(sampledBuckets))
for _, bucket := range sampledBuckets {
exemplarsInBucket := bucketedExemplars[bucket]
if len(exemplarsInBucket) == 1 {
sampleExemplars = append(sampleExemplars, exemplarsInBucket[0])
} else {
bucketValues := make([]float64, len(exemplarsInBucket))
for _, exemplar := range exemplarsInBucket {
bucketValues = append(bucketValues, exemplar.Value)
}
sort.Slice(bucketValues, func(i, j int) bool {
return bucketValues[i] > bucketValues[j]
})
sampledBucketValues := make([]float64, 0)
for _, value := range bucketValues {
if len(sampledBucketValues) == 0 {
sampledBucketValues = append(sampledBucketValues, value)
} else {
// Then take values only when they are at least 2 standard deviations away from the previously taken value
prev := sampledBucketValues[len(sampledBucketValues)-1]
if standardDeviation != 0 && prev-value >= float64(2)*standardDeviation {
sampledBucketValues = append(sampledBucketValues, value)
}
}
}
for _, valueBucket := range sampledBucketValues {
for _, exemplar := range exemplarsInBucket {
if exemplar.Value == valueBucket {
sampleExemplars = append(sampleExemplars, exemplar)
}
}
}
}
}
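// To illustrate the thinning above (made-up numbers): with a standard
// deviation of 1 and a bucket sorted descending to [10, 9.5, 7.8, 5], the
// maximum (10) is always kept, 9.5 is dropped (10-9.5 < 2), and 7.8 and 5
// are both kept, each being at least 2 standard deviations below the
// previously taken value.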
// Create DF from sampled exemplars
timeField := data.NewFieldFromFieldType(data.FieldTypeTime, len(sampleExemplars))
timeField.Name = "Time"
valueField := data.NewFieldFromFieldType(data.FieldTypeFloat64, len(sampleExemplars))
valueField.Name = "Value"
labelsVector := make(map[string][]string, len(sampleExemplars))
for i, exemplar := range sampleExemplars {
timeField.Set(i, exemplar.Time)
valueField.Set(i, exemplar.Value)
for label, value := range exemplar.Labels {
if labelsVector[label] == nil {
labelsVector[label] = make([]string, 0)
}
labelsVector[label] = append(labelsVector[label], value)
}
}
dataFields := make([]*data.Field, 0, len(labelsVector)+2)
dataFields = append(dataFields, timeField, valueField)
for label, vector := range labelsVector {
dataFields = append(dataFields, data.NewField(label, nil, vector))
}
return append(frames, newDataFrame("exemplar", "exemplar", dataFields...))
}
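// deviation returns the sample standard deviation of values (note the n-1
// divisor, i.e. Bessel's correction). For example, for [1, 3]: mean = 2, the
// squared deviations sum to 2, and sqrt(2/1) ≈ 1.41.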
func deviation(values []float64) float64 {
var sum, mean, sd float64
valuesLen := float64(len(values))
for _, value := range values {
sum += value
}
mean = sum / valuesLen
for j := 0; j < len(values); j++ {
sd += math.Pow(values[j]-mean, 2)
}
return math.Sqrt(sd / (valuesLen - 1))
}
func newDataFrame(name string, typ string, fields ...*data.Field) *data.Frame {
frame := data.NewFrame(name, fields...)
frame.Meta = &data.FrameMeta{
Custom: map[string]string{
"resultType": typ,
},
}
return frame
}

@@ -14,7 +14,7 @@ import (
var now = time.Now()
func TestPrometheus_formatLegend(t *testing.T) {
func TestPrometheus_timeSeriesQuery_formatLegend(t *testing.T) {
t.Run("converting metric name", func(t *testing.T) {
metric := map[p.LabelName]p.LabelValue{
p.LabelName("app"): p.LabelValue("backend"),
@@ -54,7 +54,7 @@ func TestPrometheus_formatLegend(t *testing.T) {
})
}
func TestPrometheus_parseQuery(t *testing.T) {
func TestPrometheus_timeSeriesQuery_parseTimeSeriesQuery(t *testing.T) {
service := Service{
intervalCalculator: intervalv2.NewCalculator(),
}
@@ -72,7 +72,7 @@ func TestPrometheus_parseQuery(t *testing.T) {
}`, timeRange)
dsInfo := &DatasourceInfo{}
models, err := service.parseQuery(query, dsInfo)
models, err := service.parseTimeSeriesQuery(query, dsInfo)
require.NoError(t, err)
require.Equal(t, time.Second*30, models[0].Step)
})
@@ -91,7 +91,7 @@ func TestPrometheus_parseQuery(t *testing.T) {
}`, timeRange)
dsInfo := &DatasourceInfo{}
models, err := service.parseQuery(query, dsInfo)
models, err := service.parseTimeSeriesQuery(query, dsInfo)
require.NoError(t, err)
require.Equal(t, time.Second*15, models[0].Step)
})
@@ -110,7 +110,7 @@ func TestPrometheus_parseQuery(t *testing.T) {
}`, timeRange)
dsInfo := &DatasourceInfo{}
models, err := service.parseQuery(query, dsInfo)
models, err := service.parseTimeSeriesQuery(query, dsInfo)
require.NoError(t, err)
require.Equal(t, time.Minute*20, models[0].Step)
})
@@ -129,7 +129,7 @@ func TestPrometheus_parseQuery(t *testing.T) {
}`, timeRange)
dsInfo := &DatasourceInfo{}
models, err := service.parseQuery(query, dsInfo)
models, err := service.parseTimeSeriesQuery(query, dsInfo)
require.NoError(t, err)
require.Equal(t, time.Minute*2, models[0].Step)
})
@@ -150,7 +150,7 @@ func TestPrometheus_parseQuery(t *testing.T) {
dsInfo := &DatasourceInfo{
TimeInterval: "240s",
}
models, err := service.parseQuery(query, dsInfo)
models, err := service.parseTimeSeriesQuery(query, dsInfo)
require.NoError(t, err)
require.Equal(t, time.Minute*4, models[0].Step)
})
@@ -169,7 +169,7 @@ func TestPrometheus_parseQuery(t *testing.T) {
}`, timeRange)
dsInfo := &DatasourceInfo{}
models, err := service.parseQuery(query, dsInfo)
models, err := service.parseTimeSeriesQuery(query, dsInfo)
require.NoError(t, err)
require.Equal(t, "rate(ALERTS{job=\"test\" [2m]})", models[0].Expr)
})
@@ -188,7 +188,7 @@ func TestPrometheus_parseQuery(t *testing.T) {
}`, timeRange)
dsInfo := &DatasourceInfo{}
models, err := service.parseQuery(query, dsInfo)
models, err := service.parseTimeSeriesQuery(query, dsInfo)
require.NoError(t, err)
require.Equal(t, "rate(ALERTS{job=\"test\" [120000]})", models[0].Expr)
})
@@ -207,7 +207,7 @@ func TestPrometheus_parseQuery(t *testing.T) {
}`, timeRange)
dsInfo := &DatasourceInfo{}
models, err := service.parseQuery(query, dsInfo)
models, err := service.parseTimeSeriesQuery(query, dsInfo)
require.NoError(t, err)
require.Equal(t, "rate(ALERTS{job=\"test\" [120000]}) + rate(ALERTS{job=\"test\" [2m]})", models[0].Expr)
})
@@ -226,7 +226,7 @@ func TestPrometheus_parseQuery(t *testing.T) {
}`, timeRange)
dsInfo := &DatasourceInfo{}
models, err := service.parseQuery(query, dsInfo)
models, err := service.parseTimeSeriesQuery(query, dsInfo)
require.NoError(t, err)
require.Equal(t, "rate(ALERTS{job=\"test\" [172800s]})", models[0].Expr)
})
@@ -245,7 +245,7 @@ func TestPrometheus_parseQuery(t *testing.T) {
}`, timeRange)
dsInfo := &DatasourceInfo{}
models, err := service.parseQuery(query, dsInfo)
models, err := service.parseTimeSeriesQuery(query, dsInfo)
require.NoError(t, err)
require.Equal(t, "rate(ALERTS{job=\"test\" [172800]})", models[0].Expr)
})
@@ -264,7 +264,7 @@ func TestPrometheus_parseQuery(t *testing.T) {
}`, timeRange)
dsInfo := &DatasourceInfo{}
models, err := service.parseQuery(query, dsInfo)
models, err := service.parseTimeSeriesQuery(query, dsInfo)
require.NoError(t, err)
require.Equal(t, "rate(ALERTS{job=\"test\" [172800000]})", models[0].Expr)
})
@@ -283,7 +283,7 @@ func TestPrometheus_parseQuery(t *testing.T) {
}`, timeRange)
dsInfo := &DatasourceInfo{}
models, err := service.parseQuery(query, dsInfo)
models, err := service.parseTimeSeriesQuery(query, dsInfo)
require.NoError(t, err)
require.Equal(t, "rate(ALERTS{job=\"test\" [1m]})", models[0].Expr)
})
@@ -303,7 +303,7 @@ func TestPrometheus_parseQuery(t *testing.T) {
}`, timeRange)
dsInfo := &DatasourceInfo{}
models, err := service.parseQuery(query, dsInfo)
models, err := service.parseTimeSeriesQuery(query, dsInfo)
require.NoError(t, err)
require.Equal(t, true, models[0].RangeQuery)
})
@@ -324,7 +324,7 @@ func TestPrometheus_parseQuery(t *testing.T) {
}`, timeRange)
dsInfo := &DatasourceInfo{}
models, err := service.parseQuery(query, dsInfo)
models, err := service.parseTimeSeriesQuery(query, dsInfo)
require.NoError(t, err)
require.Equal(t, true, models[0].RangeQuery)
require.Equal(t, true, models[0].InstantQuery)
@@ -344,15 +344,15 @@ func TestPrometheus_parseQuery(t *testing.T) {
}`, timeRange)
dsInfo := &DatasourceInfo{}
models, err := service.parseQuery(query, dsInfo)
models, err := service.parseTimeSeriesQuery(query, dsInfo)
require.NoError(t, err)
require.Equal(t, true, models[0].RangeQuery)
})
}
func TestPrometheus_parseResponse(t *testing.T) {
func TestPrometheus_parseTimeSeriesResponse(t *testing.T) {
t.Run("exemplars response should be sampled and parsed normally", func(t *testing.T) {
value := make(map[PrometheusQueryType]interface{})
value := make(map[TimeSeriesQueryType]interface{})
exemplars := []apiv1.ExemplarQueryResult{
{
SeriesLabels: p.LabelSet{
@@ -389,7 +389,7 @@ func TestPrometheus_parseResponse(t *testing.T) {
query := &PrometheusQuery{
LegendFormat: "legend {{app}}",
}
res, err := parseResponse(value, query)
res, err := parseTimeSeriesResponse(value, query)
require.NoError(t, err)
// Test fields
@@ -413,7 +413,7 @@ func TestPrometheus_parseResponse(t *testing.T) {
{Value: 4, Timestamp: 4000},
{Value: 5, Timestamp: 5000},
}
value := make(map[PrometheusQueryType]interface{})
value := make(map[TimeSeriesQueryType]interface{})
value[RangeQueryType] = p.Matrix{
&p.SampleStream{
Metric: p.Metric{"app": "Application", "tag2": "tag2"},
@@ -423,7 +423,7 @@ func TestPrometheus_parseResponse(t *testing.T) {
query := &PrometheusQuery{
LegendFormat: "legend {{app}}",
}
res, err := parseResponse(value, query)
res, err := parseTimeSeriesResponse(value, query)
require.NoError(t, err)
require.Len(t, res, 1)
@@ -442,7 +442,7 @@ func TestPrometheus_parseResponse(t *testing.T) {
})
t.Run("matrix response with NaN value should be changed to null", func(t *testing.T) {
value := make(map[PrometheusQueryType]interface{})
value := make(map[TimeSeriesQueryType]interface{})
value[RangeQueryType] = p.Matrix{
&p.SampleStream{
Metric: p.Metric{"app": "Application"},
@@ -454,7 +454,7 @@ func TestPrometheus_parseResponse(t *testing.T) {
query := &PrometheusQuery{
LegendFormat: "",
}
res, err := parseResponse(value, query)
res, err := parseTimeSeriesResponse(value, query)
require.NoError(t, err)
var nilPointer *float64
@@ -463,7 +463,7 @@ func TestPrometheus_parseResponse(t *testing.T) {
})
t.Run("vector response should be parsed normally", func(t *testing.T) {
value := make(map[PrometheusQueryType]interface{})
value := make(map[TimeSeriesQueryType]interface{})
value[RangeQueryType] = p.Vector{
&p.Sample{
Metric: p.Metric{"app": "Application", "tag2": "tag2"},
@@ -474,7 +474,7 @@ func TestPrometheus_parseResponse(t *testing.T) {
query := &PrometheusQuery{
LegendFormat: "legend {{app}}",
}
res, err := parseResponse(value, query)
res, err := parseTimeSeriesResponse(value, query)
require.NoError(t, err)
require.Len(t, res, 1)
@@ -494,14 +494,14 @@ func TestPrometheus_parseResponse(t *testing.T) {
})
t.Run("scalar response should be parsed normally", func(t *testing.T) {
value := make(map[PrometheusQueryType]interface{})
value := make(map[TimeSeriesQueryType]interface{})
value[RangeQueryType] = &p.Scalar{
Value: 1,
Timestamp: 1000,
}
query := &PrometheusQuery{}
res, err := parseResponse(value, query)
res, err := parseTimeSeriesResponse(value, query)
require.NoError(t, err)
require.Len(t, res, 1)

@@ -45,11 +45,3 @@ type QueryModel struct {
IntervalFactor int64 `json:"intervalFactor"`
UtcOffsetSec int64 `json:"utcOffsetSec"`
}
type PrometheusQueryType string
const (
RangeQueryType PrometheusQueryType = "range"
InstantQueryType PrometheusQueryType = "instant"
ExemplarQueryType PrometheusQueryType = "exemplar"
)

@@ -36,6 +36,7 @@ import {
PromOptions,
PromQuery,
PromQueryRequest,
PromQueryType,
PromScalarData,
PromVectorData,
} from './types';
@@ -309,6 +310,7 @@ export class PrometheusDatasource extends DataSourceWithBackend<PromQuery, PromO
processTargetV2(target: PromQuery, request: DataQueryRequest<PromQuery>) {
const processedTarget = {
...target,
queryType: PromQueryType.timeSeriesQuery,
exemplar: this.shouldRunExemplarQuery(target),
requestId: request.panelId + target.refId,
// We need to pass utcOffsetSec to backend to calculate aligned range

@@ -29,6 +29,10 @@ export interface PromOptions extends DataSourceJsonData {
exemplarTraceIdDestinations?: ExemplarTraceIdDestination[];
}
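// PromQueryType is sent to the backend as `queryType` on every query (set in
// processTargetV2 in datasource.ts above) and is what the backend's QueryData
// switch dispatches on; timeSeriesQuery is currently the only member.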
export enum PromQueryType {
timeSeriesQuery = 'timeSeriesQuery',
}
export type ExemplarTraceIdDestination = {
name: string;
url?: string;