Prometheus: Fill missing steps with null values (#43622)

* Prometheus: Add empty points when data points missing

* Remove newline

* Add comments

* Improve/Fix test

* Remove unused variable
This commit is contained in:
Ivana Huckova 2022-01-05 11:40:32 +01:00 committed by GitHub
parent 3625c617fa
commit d7d6c10664
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 67 additions and 9 deletions

View File

@ -73,8 +73,8 @@ func (s *Service) executeTimeSeriesQuery(ctx context.Context, req *backend.Query
timeRange := apiv1.Range{
Step: query.Step,
// Align query range to step. It rounds start and end down to a multiple of step.
Start: time.Unix(int64(math.Floor((float64(query.Start.Unix()+query.UtcOffsetSec)/query.Step.Seconds()))*query.Step.Seconds()-float64(query.UtcOffsetSec)), 0),
End: time.Unix(int64(math.Floor((float64(query.End.Unix()+query.UtcOffsetSec)/query.Step.Seconds()))*query.Step.Seconds()-float64(query.UtcOffsetSec)), 0),
Start: alignTimeRange(query.Start, query.Step, query.UtcOffsetSec),
End: alignTimeRange(query.End, query.Step, query.UtcOffsetSec),
}
if query.RangeQuery {
@ -299,15 +299,30 @@ func matrixToDataFrames(matrix model.Matrix, query *PrometheusQuery, frames data
tags[string(k)] = string(v)
}
timeField := data.NewFieldFromFieldType(data.FieldTypeTime, len(v.Values))
valueField := data.NewFieldFromFieldType(data.FieldTypeNullableFloat64, len(v.Values))
baseTimestamp := alignTimeRange(query.Start, query.Step, query.UtcOffsetSec).UnixMilli()
endTimestamp := alignTimeRange(query.End, query.Step, query.UtcOffsetSec).UnixMilli()
// For each step we create 1 data point. This results in range / step + 1 data points.
datapointsCount := int((endTimestamp-baseTimestamp)/query.Step.Milliseconds()) + 1
for i, k := range v.Values {
timeField.Set(i, time.Unix(k.Timestamp.Unix(), 0).UTC())
value := float64(k.Value)
if !math.IsNaN(value) {
valueField.Set(i, &value)
timeField := data.NewFieldFromFieldType(data.FieldTypeTime, datapointsCount)
valueField := data.NewFieldFromFieldType(data.FieldTypeNullableFloat64, datapointsCount)
idx := 0
for _, pair := range v.Values {
timestamp := int64(pair.Timestamp)
value := float64(pair.Value)
for t := baseTimestamp; t < timestamp; t += query.Step.Milliseconds() {
timeField.Set(idx, time.Unix(0, t*1000000).UTC())
idx++
}
timeField.Set(idx, time.Unix(pair.Timestamp.Unix(), 0).UTC())
if !math.IsNaN(value) {
valueField.Set(idx, &value)
}
baseTimestamp = timestamp + query.Step.Milliseconds()
idx++
}
name := formatLegend(v.Metric, query)
@ -503,3 +518,7 @@ func newDataFrame(name string, typ string, fields ...*data.Field) *data.Frame {
return frame
}
func alignTimeRange(t time.Time, step time.Duration, offset int64) time.Time {
return time.Unix(int64(math.Floor((float64(t.Unix()+offset)/step.Seconds()))*step.Seconds()-float64(offset)), 0)
}

View File

@ -567,6 +567,10 @@ func TestPrometheus_parseTimeSeriesResponse(t *testing.T) {
}
query := &PrometheusQuery{
LegendFormat: "legend {{app}}",
Step: 1 * time.Second,
Start: time.Unix(1, 0).UTC(),
End: time.Unix(5, 0).UTC(),
UtcOffsetSec: 0,
}
res, err := parseTimeSeriesResponse(value, query)
require.NoError(t, err)
@ -586,6 +590,37 @@ func TestPrometheus_parseTimeSeriesResponse(t *testing.T) {
require.Equal(t, "UTC", testValue.(time.Time).Location().String())
})
t.Run("matrix response with missed data points should be parsed correctly", func(t *testing.T) {
values := []p.SamplePair{
{Value: 1, Timestamp: 1000},
{Value: 4, Timestamp: 4000},
}
value := make(map[TimeSeriesQueryType]interface{})
value[RangeQueryType] = p.Matrix{
&p.SampleStream{
Metric: p.Metric{"app": "Application", "tag2": "tag2"},
Values: values,
},
}
query := &PrometheusQuery{
LegendFormat: "",
Step: 1 * time.Second,
Start: time.Unix(1, 0).UTC(),
End: time.Unix(4, 0).UTC(),
UtcOffsetSec: 0,
}
res, err := parseTimeSeriesResponse(value, query)
require.NoError(t, err)
require.Len(t, res, 1)
require.Equal(t, res[0].Fields[0].Len(), 4)
require.Equal(t, res[0].Fields[0].At(1), time.Unix(2, 0).UTC())
require.Equal(t, res[0].Fields[0].At(2), time.Unix(3, 0).UTC())
require.Equal(t, res[0].Fields[1].Len(), 4)
require.Nil(t, res[0].Fields[1].At(1))
require.Nil(t, res[0].Fields[1].At(2))
})
t.Run("matrix response with NaN value should be changed to null", func(t *testing.T) {
value := make(map[TimeSeriesQueryType]interface{})
value[RangeQueryType] = p.Matrix{
@ -598,6 +633,10 @@ func TestPrometheus_parseTimeSeriesResponse(t *testing.T) {
}
query := &PrometheusQuery{
LegendFormat: "",
Step: 1 * time.Second,
Start: time.Unix(1, 0).UTC(),
End: time.Unix(4, 0).UTC(),
UtcOffsetSec: 0,
}
res, err := parseTimeSeriesResponse(value, query)
require.NoError(t, err)