Prometheus: Fix some of the alerting queries that use reduce/math operation (#44380)

* Prometheus: Dont fill response with nulls for alerting queries

* Refactor based on reviews
This commit is contained in:
Ivana Huckova 2022-01-31 18:14:30 +01:00 committed by GitHub
parent 5e2280ceee
commit 780591cc12
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 85 additions and 14 deletions

View File

@ -132,5 +132,5 @@ func runQuery(response []byte, query PrometheusQuery) (*backend.QueryDataRespons
}
s := Service{tracer: tracer}
return s.runQueries(context.Background(), api, []*PrometheusQuery{&query})
return s.runQueries(context.Background(), api, []*PrometheusQuery{&query}, true)
}

View File

@ -28,7 +28,7 @@ func BenchmarkJson(b *testing.B) {
b.ResetTimer()
for n := 0; n < b.N; n++ {
_, _ = s.runQueries(context.Background(), api, []*PrometheusQuery{&query})
_, _ = s.runQueries(context.Background(), api, []*PrometheusQuery{&query}, true)
}
}

View File

@ -47,7 +47,7 @@ const (
ExemplarQueryType TimeSeriesQueryType = "exemplar"
)
func (s *Service) runQueries(ctx context.Context, client apiv1.API, queries []*PrometheusQuery) (*backend.QueryDataResponse, error) {
func (s *Service) runQueries(ctx context.Context, client apiv1.API, queries []*PrometheusQuery, fillNulls bool) (*backend.QueryDataResponse, error) {
result := backend.QueryDataResponse{
Responses: backend.Responses{},
}
@ -101,7 +101,7 @@ func (s *Service) runQueries(ctx context.Context, client apiv1.API, queries []*P
}
}
frames, err := parseTimeSeriesResponse(response, query)
frames, err := parseTimeSeriesResponse(response, query, fillNulls)
if err != nil {
return &result, err
}
@ -128,7 +128,12 @@ func (s *Service) executeTimeSeriesQuery(ctx context.Context, req *backend.Query
return &result, err
}
return s.runQueries(ctx, client, queries)
fillNulls := true
if req.Headers["FromAlert"] == "true" {
fillNulls = false
}
return s.runQueries(ctx, client, queries, fillNulls)
}
func formatLegend(metric model.Metric, query *PrometheusQuery) string {
@ -202,7 +207,7 @@ func (s *Service) parseTimeSeriesQuery(queryContext *backend.QueryDataRequest, d
return qs, nil
}
func parseTimeSeriesResponse(value map[TimeSeriesQueryType]interface{}, query *PrometheusQuery) (data.Frames, error) {
func parseTimeSeriesResponse(value map[TimeSeriesQueryType]interface{}, query *PrometheusQuery, fillNulls bool) (data.Frames, error) {
var (
frames = data.Frames{}
nextFrames = data.Frames{}
@ -214,7 +219,11 @@ func parseTimeSeriesResponse(value map[TimeSeriesQueryType]interface{}, query *P
switch v := value.(type) {
case model.Matrix:
nextFrames = matrixToDataFrames(v, query, nextFrames)
if fillNulls {
nextFrames = matrixToDataFramesWithNullFill(v, query, nextFrames)
} else {
nextFrames = matrixToDataFrames(v, query, nextFrames)
}
case model.Vector:
nextFrames = vectorToDataFrames(v, query, nextFrames)
case *model.Scalar:
@ -308,7 +317,7 @@ func interpolateVariables(model *QueryModel, interval time.Duration, timeRange t
return expr
}
func matrixToDataFrames(matrix model.Matrix, query *PrometheusQuery, frames data.Frames) data.Frames {
func matrixToDataFramesWithNullFill(matrix model.Matrix, query *PrometheusQuery, frames data.Frames) data.Frames {
for _, v := range matrix {
tags := make(map[string]string, len(v.Metric))
for k, v := range v.Metric {
@ -358,6 +367,35 @@ func matrixToDataFrames(matrix model.Matrix, query *PrometheusQuery, frames data
return frames
}
func matrixToDataFrames(matrix model.Matrix, query *PrometheusQuery, frames data.Frames) data.Frames {
for _, v := range matrix {
tags := make(map[string]string, len(v.Metric))
for k, v := range v.Metric {
tags[string(k)] = string(v)
}
timeField := data.NewFieldFromFieldType(data.FieldTypeTime, len(v.Values))
valueField := data.NewFieldFromFieldType(data.FieldTypeNullableFloat64, len(v.Values))
for i, k := range v.Values {
timeField.Set(i, time.Unix(k.Timestamp.Unix(), 0).UTC())
value := float64(k.Value)
if !math.IsNaN(value) {
valueField.Set(i, &value)
}
}
name := formatLegend(v.Metric, query)
timeField.Name = data.TimeSeriesTimeFieldName
valueField.Name = data.TimeSeriesValueFieldName
valueField.Config = &data.FieldConfig{DisplayNameFromDS: name}
valueField.Labels = tags
frames = append(frames, newDataFrame(name, "matrix", timeField, valueField))
}
return frames
}
func scalarToDataFrames(scalar *model.Scalar, query *PrometheusQuery, frames data.Frames) data.Frames {
timeVector := []time.Time{time.Unix(scalar.Timestamp.Unix(), 0).UTC()}
values := []float64{float64(scalar.Value)}

View File

@ -556,7 +556,7 @@ func TestPrometheus_parseTimeSeriesResponse(t *testing.T) {
query := &PrometheusQuery{
LegendFormat: "legend {{app}}",
}
res, err := parseTimeSeriesResponse(value, query)
res, err := parseTimeSeriesResponse(value, query, true)
require.NoError(t, err)
// Test fields
@ -594,7 +594,7 @@ func TestPrometheus_parseTimeSeriesResponse(t *testing.T) {
End: time.Unix(5, 0).UTC(),
UtcOffsetSec: 0,
}
res, err := parseTimeSeriesResponse(value, query)
res, err := parseTimeSeriesResponse(value, query, true)
require.NoError(t, err)
require.Len(t, res, 1)
@ -631,7 +631,7 @@ func TestPrometheus_parseTimeSeriesResponse(t *testing.T) {
End: time.Unix(4, 0).UTC(),
UtcOffsetSec: 0,
}
res, err := parseTimeSeriesResponse(value, query)
res, err := parseTimeSeriesResponse(value, query, true)
require.NoError(t, err)
require.Len(t, res, 1)
@ -643,6 +643,39 @@ func TestPrometheus_parseTimeSeriesResponse(t *testing.T) {
require.Nil(t, res[0].Fields[1].At(2))
})
t.Run("matrix response with from alerting missed data points should be parsed correctly", func(t *testing.T) {
values := []p.SamplePair{
{Value: 1, Timestamp: 1000},
{Value: 4, Timestamp: 4000},
}
value := make(map[TimeSeriesQueryType]interface{})
value[RangeQueryType] = p.Matrix{
&p.SampleStream{
Metric: p.Metric{"app": "Application", "tag2": "tag2"},
Values: values,
},
}
query := &PrometheusQuery{
LegendFormat: "",
Step: 1 * time.Second,
Start: time.Unix(1, 0).UTC(),
End: time.Unix(4, 0).UTC(),
UtcOffsetSec: 0,
}
res, err := parseTimeSeriesResponse(value, query, false)
require.NoError(t, err)
require.Len(t, res, 1)
require.Equal(t, res[0].Name, "{app=\"Application\", tag2=\"tag2\"}")
require.Len(t, res[0].Fields, 2)
require.Len(t, res[0].Fields[0].Labels, 0)
require.Equal(t, res[0].Fields[0].Name, "Time")
require.Len(t, res[0].Fields[1].Labels, 2)
require.Equal(t, res[0].Fields[1].Labels.String(), "app=Application, tag2=tag2")
require.Equal(t, res[0].Fields[1].Name, "Value")
require.Equal(t, res[0].Fields[1].Config.DisplayNameFromDS, "{app=\"Application\", tag2=\"tag2\"}")
})
t.Run("matrix response with NaN value should be changed to null", func(t *testing.T) {
value := make(map[TimeSeriesQueryType]interface{})
value[RangeQueryType] = p.Matrix{
@ -660,7 +693,7 @@ func TestPrometheus_parseTimeSeriesResponse(t *testing.T) {
End: time.Unix(4, 0).UTC(),
UtcOffsetSec: 0,
}
res, err := parseTimeSeriesResponse(value, query)
res, err := parseTimeSeriesResponse(value, query, true)
require.NoError(t, err)
var nilPointer *float64
@ -680,7 +713,7 @@ func TestPrometheus_parseTimeSeriesResponse(t *testing.T) {
query := &PrometheusQuery{
LegendFormat: "legend {{app}}",
}
res, err := parseTimeSeriesResponse(value, query)
res, err := parseTimeSeriesResponse(value, query, true)
require.NoError(t, err)
require.Len(t, res, 1)
@ -707,7 +740,7 @@ func TestPrometheus_parseTimeSeriesResponse(t *testing.T) {
}
query := &PrometheusQuery{}
res, err := parseTimeSeriesResponse(value, query)
res, err := parseTimeSeriesResponse(value, query, true)
require.NoError(t, err)
require.Len(t, res, 1)