diff --git a/pkg/tsdb/prometheus/time_series_query.go b/pkg/tsdb/prometheus/time_series_query.go index 64cd8d77847..733f2c72912 100644 --- a/pkg/tsdb/prometheus/time_series_query.go +++ b/pkg/tsdb/prometheus/time_series_query.go @@ -73,8 +73,8 @@ func (s *Service) executeTimeSeriesQuery(ctx context.Context, req *backend.Query timeRange := apiv1.Range{ Step: query.Step, // Align query range to step. It rounds start and end down to a multiple of step. - Start: time.Unix(int64(math.Floor((float64(query.Start.Unix()+query.UtcOffsetSec)/query.Step.Seconds()))*query.Step.Seconds()-float64(query.UtcOffsetSec)), 0), - End: time.Unix(int64(math.Floor((float64(query.End.Unix()+query.UtcOffsetSec)/query.Step.Seconds()))*query.Step.Seconds()-float64(query.UtcOffsetSec)), 0), + Start: alignTimeRange(query.Start, query.Step, query.UtcOffsetSec), + End: alignTimeRange(query.End, query.Step, query.UtcOffsetSec), } if query.RangeQuery { @@ -299,15 +299,30 @@ func matrixToDataFrames(matrix model.Matrix, query *PrometheusQuery, frames data tags[string(k)] = string(v) } - timeField := data.NewFieldFromFieldType(data.FieldTypeTime, len(v.Values)) - valueField := data.NewFieldFromFieldType(data.FieldTypeNullableFloat64, len(v.Values)) + baseTimestamp := alignTimeRange(query.Start, query.Step, query.UtcOffsetSec).UnixMilli() + endTimestamp := alignTimeRange(query.End, query.Step, query.UtcOffsetSec).UnixMilli() + // For each step we create 1 data point. This results in range / step + 1 data points. + datapointsCount := int((endTimestamp-baseTimestamp)/query.Step.Milliseconds()) + 1 - for i, k := range v.Values { - timeField.Set(i, time.Unix(k.Timestamp.Unix(), 0).UTC()) - value := float64(k.Value) - if !math.IsNaN(value) { - valueField.Set(i, &value) + timeField := data.NewFieldFromFieldType(data.FieldTypeTime, datapointsCount) + valueField := data.NewFieldFromFieldType(data.FieldTypeNullableFloat64, datapointsCount) + idx := 0 + + for _, pair := range v.Values { + timestamp := int64(pair.Timestamp) + value := float64(pair.Value) + + for t := baseTimestamp; t < timestamp; t += query.Step.Milliseconds() { + timeField.Set(idx, time.Unix(0, t*1000000).UTC()) + idx++ } + + timeField.Set(idx, time.Unix(pair.Timestamp.Unix(), 0).UTC()) + if !math.IsNaN(value) { + valueField.Set(idx, &value) + } + baseTimestamp = timestamp + query.Step.Milliseconds() + idx++ } name := formatLegend(v.Metric, query) @@ -503,3 +518,7 @@ func newDataFrame(name string, typ string, fields ...*data.Field) *data.Frame { return frame } + +func alignTimeRange(t time.Time, step time.Duration, offset int64) time.Time { + return time.Unix(int64(math.Floor((float64(t.Unix()+offset)/step.Seconds()))*step.Seconds()-float64(offset)), 0) +} diff --git a/pkg/tsdb/prometheus/time_series_query_test.go b/pkg/tsdb/prometheus/time_series_query_test.go index 36aa40e045c..b4b363acf2d 100644 --- a/pkg/tsdb/prometheus/time_series_query_test.go +++ b/pkg/tsdb/prometheus/time_series_query_test.go @@ -567,6 +567,10 @@ func TestPrometheus_parseTimeSeriesResponse(t *testing.T) { } query := &PrometheusQuery{ LegendFormat: "legend {{app}}", + Step: 1 * time.Second, + Start: time.Unix(1, 0).UTC(), + End: time.Unix(5, 0).UTC(), + UtcOffsetSec: 0, } res, err := parseTimeSeriesResponse(value, query) require.NoError(t, err) @@ -586,6 +590,37 @@ func TestPrometheus_parseTimeSeriesResponse(t *testing.T) { require.Equal(t, "UTC", testValue.(time.Time).Location().String()) }) + t.Run("matrix response with missed data points should be parsed correctly", func(t *testing.T) { + values := []p.SamplePair{ + {Value: 1, Timestamp: 1000}, + {Value: 4, Timestamp: 4000}, + } + value := make(map[TimeSeriesQueryType]interface{}) + value[RangeQueryType] = p.Matrix{ + &p.SampleStream{ + Metric: p.Metric{"app": "Application", "tag2": "tag2"}, + Values: values, + }, + } + query := &PrometheusQuery{ + LegendFormat: "", + Step: 1 * time.Second, + Start: time.Unix(1, 0).UTC(), + End: time.Unix(4, 0).UTC(), + UtcOffsetSec: 0, + } + res, err := parseTimeSeriesResponse(value, query) + + require.NoError(t, err) + require.Len(t, res, 1) + require.Equal(t, res[0].Fields[0].Len(), 4) + require.Equal(t, res[0].Fields[0].At(1), time.Unix(2, 0).UTC()) + require.Equal(t, res[0].Fields[0].At(2), time.Unix(3, 0).UTC()) + require.Equal(t, res[0].Fields[1].Len(), 4) + require.Nil(t, res[0].Fields[1].At(1)) + require.Nil(t, res[0].Fields[1].At(2)) + }) + t.Run("matrix response with NaN value should be changed to null", func(t *testing.T) { value := make(map[TimeSeriesQueryType]interface{}) value[RangeQueryType] = p.Matrix{ @@ -598,6 +633,10 @@ func TestPrometheus_parseTimeSeriesResponse(t *testing.T) { } query := &PrometheusQuery{ LegendFormat: "", + Step: 1 * time.Second, + Start: time.Unix(1, 0).UTC(), + End: time.Unix(4, 0).UTC(), + UtcOffsetSec: 0, } res, err := parseTimeSeriesResponse(value, query) require.NoError(t, err)