SQL: Fix SQL dataframe resampling (fill mode + time intervals) (#36937)

* Refactor resample logic

* Adjust test to have one more timestamp out of range

* adjust test + ensure filling

* revert flag flip

* Undo logic - should be timeseries only

* change data calculation based on previous interval

* fix the logics

* fix typo

* fix resample start time, to reuse what sql api returned

* calculate the start point with from truncate by interval

Co-authored-by: Will Browne <will.browne@grafana.com>
Co-authored-by: Will Browne <wbrowne@users.noreply.github.com>
Co-authored-by: Ying WANG <ying.wang@grafana.com>
This commit is contained in:
idafurjes 2021-07-29 08:38:41 +02:00 committed by GitHub
parent 78f46e28c7
commit 180b1973e0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 67 additions and 236 deletions

View File

@ -486,6 +486,58 @@ func TestPostgres(t *testing.T) {
})
})
t.Run("Given a table with one data point", func(t *testing.T) {
type metric struct {
Time time.Time
Value int64
}
startTime := time.Now().UTC().Add(-time.Minute * 5)
series := []*metric{
{
Time: startTime,
Value: 33,
},
}
_, err = sess.InsertMulti(series)
require.NoError(t, err)
t.Run("querying with time group with default value", func(t *testing.T) {
query := plugins.DataQuery{
Queries: []plugins.DataSubQuery{
{
Model: simplejson.NewFromAny(map[string]interface{}{
"rawSql": "WITH data AS (SELECT now()-'3m'::interval AS ts, 42 AS n) SELECT $__timeGroup(ts, '1m', 0), n FROM data",
"format": "time_series",
}),
RefID: "A",
},
},
TimeRange: &plugins.DataTimeRange{
From: fmt.Sprintf("%v", startTime.Unix()*1000),
To: fmt.Sprintf("%v", startTime.Add(5*time.Minute).Unix()*1000),
},
}
resp, err := exe.DataQuery(context.Background(), nil, query)
require.NoError(t, err)
queryResult := resp.Results["A"]
require.NoError(t, queryResult.Error)
frames, _ := queryResult.Dataframes.Decoded()
require.Equal(t, 1, len(frames))
require.Equal(t, "Time", frames[0].Fields[0].Name)
require.Equal(t, "n", frames[0].Fields[1].Name)
require.Equal(t, float64(0), *frames[0].Fields[1].At(0).(*float64))
require.Equal(t, float64(0), *frames[0].Fields[1].At(1).(*float64))
require.Equal(t, float64(42), *frames[0].Fields[1].At(2).(*float64))
require.Equal(t, float64(0), *frames[0].Fields[1].At(3).(*float64))
require.Equal(t, float64(0), *frames[0].Fields[1].At(4).(*float64))
require.Equal(t, float64(0), *frames[0].Fields[1].At(5).(*float64))
})
})
t.Run("When doing a metric query using timeGroup with previous fill enabled", func(t *testing.T) {
query := plugins.DataQuery{
Queries: []plugins.DataSubQuery{

View File

@ -90,7 +90,10 @@ func resample(f *data.Frame, qm dataQueryModel) (*data.Frame, error) {
lastSeenRowIdx := -1
timeField := f.Fields[tsSchema.TimeIndex]
for currentTime := qm.TimeRange.From; !currentTime.After(qm.TimeRange.To); currentTime = currentTime.Add(qm.Interval) {
startUnixTime := qm.TimeRange.From.Unix() / int64(qm.Interval.Seconds()) * int64(qm.Interval.Seconds())
startTime := time.Unix(startUnixTime, 0)
for currentTime := startTime; !currentTime.After(qm.TimeRange.To); currentTime = currentTime.Add(qm.Interval) {
initialRowIdx := 0
if lastSeenRowIdx > 0 {
initialRowIdx = lastSeenRowIdx + 1
@ -110,17 +113,16 @@ func resample(f *data.Frame, qm dataQueryModel) (*data.Frame, error) {
return f, fmt.Errorf("time point is nil")
}
if t.(time.Time).After(currentTime) {
nextTime := currentTime.Add(qm.Interval)
if t.(time.Time).Before(nextTime) {
// take the last element of the period current - interval <-> current, use it as value for current data point value
previousTime := currentTime.Add(-qm.Interval)
if t.(time.Time).After(previousTime) {
if !t.(time.Time).After(currentTime) {
intermediateRows = append(intermediateRows, initialRowIdx)
lastSeenRowIdx = initialRowIdx
initialRowIdx++
} else {
break
}
break
}
intermediateRows = append(intermediateRows, initialRowIdx)
lastSeenRowIdx = initialRowIdx
initialRowIdx++
}

View File

@ -230,16 +230,16 @@ func TestResampleWide(t *testing.T) {
time.Date(2020, 1, 2, 3, 4, 26, 0, time.UTC),
}),
data.NewField("Values Ints", nil, []*int64{
pointer.Int64(10),
pointer.Int64(12),
nil,
nil,
pointer.Int64(15),
nil,
}),
data.NewField(`Values Floats`, data.Labels{"Animal Factor": "sloth"}, []*float64{
pointer.Float64(10.5),
pointer.Float64(12.5),
nil,
nil,
pointer.Float64(15.0),
nil,
})),
@ -257,16 +257,19 @@ func TestResampleWide(t *testing.T) {
time.Date(2020, 1, 2, 3, 4, 19, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 20, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 24, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 27, 0, time.UTC),
}),
data.NewField("Values Ints", nil, []*int64{
pointer.Int64(10),
pointer.Int64(12),
pointer.Int64(15),
pointer.Int64(18),
}),
data.NewField(`Values Floats`, data.Labels{"Animal Factor": "sloth"}, []*float64{
pointer.Float64(10.5),
pointer.Float64(12.5),
pointer.Float64(15.0),
pointer.Float64(17.5),
})),
output: data.NewFrame("wide_test",
data.NewField("Time", nil, []time.Time{

View File

@ -335,10 +335,6 @@ func (e *dataPlugin) executeQuery(query plugins.DataSubQuery, wg *sync.WaitGroup
e.log.Error("Failed to resample dataframe", "err", err)
frame.AppendNotices(data.Notice{Text: "Failed to resample dataframe", Severity: data.NoticeSeverityWarning})
}
if err := trim(frame, *qm); err != nil {
e.log.Error("Failed to trim dataframe", "err", err)
frame.AppendNotices(data.Notice{Text: "Failed to trim dataframe", Severity: data.NoticeSeverityWarning})
}
}
}

View File

@ -1,51 +0,0 @@
package sqleng
import (
"fmt"
"time"
"github.com/grafana/grafana-plugin-sdk-go/data"
)
// trim trims rows that are outside the qm.TimeRange.
func trim(f *data.Frame, qm dataQueryModel) error {
tsSchema := f.TimeSeriesSchema()
if tsSchema.Type == data.TimeSeriesTypeNot {
return fmt.Errorf("can not trim non-timeseries frame")
}
timeField := f.Fields[tsSchema.TimeIndex]
if timeField.Len() == 0 {
return nil
}
// Trim rows after end
for i := timeField.Len() - 1; i >= 0; i-- {
t, ok := timeField.ConcreteAt(i)
if !ok {
return fmt.Errorf("time point is nil")
}
if !t.(time.Time).After(qm.TimeRange.To) {
break
}
f.DeleteRow(i)
}
// Trim rows before start
for timeField.Len() > 0 {
t, ok := timeField.ConcreteAt(0)
if !ok {
return fmt.Errorf("time point is nil")
}
if !t.(time.Time).Before(qm.TimeRange.From) {
break
}
f.DeleteRow(0)
}
return nil
}

View File

@ -1,171 +0,0 @@
package sqleng
import (
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/stretchr/testify/require"
"github.com/xorcare/pointer"
)
func TestTrimWide(t *testing.T) {
tests := []struct {
name string
input *data.Frame
timeRange backend.TimeRange
output *data.Frame
}{
{
name: "needs trimming",
timeRange: backend.TimeRange{
From: time.Date(2020, 1, 2, 3, 4, 20, 0, time.UTC),
To: time.Date(2020, 1, 2, 3, 4, 24, 0, time.UTC),
},
input: data.NewFrame("wide_test",
data.NewField("Time", nil, []time.Time{
time.Date(2020, 1, 2, 3, 4, 18, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 19, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 20, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 21, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 22, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 23, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 24, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 25, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 26, 0, time.UTC),
}),
data.NewField("Values Ints", nil, []*int64{
nil,
pointer.Int64(10),
pointer.Int64(12),
nil,
nil,
nil,
pointer.Int64(15),
nil,
nil,
}),
data.NewField(`Values Floats`, data.Labels{"Animal Factor": "sloth"}, []*float64{
nil,
pointer.Float64(10.5),
pointer.Float64(12.5),
nil,
nil,
nil,
pointer.Float64(15.0),
nil,
nil,
})),
output: data.NewFrame("wide_test",
data.NewField("Time", nil, []time.Time{
time.Date(2020, 1, 2, 3, 4, 20, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 21, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 22, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 23, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 24, 0, time.UTC),
}),
data.NewField("Values Ints", nil, []*int64{
pointer.Int64(12),
nil,
nil,
nil,
pointer.Int64(15),
}),
data.NewField(`Values Floats`, data.Labels{"Animal Factor": "sloth"}, []*float64{
pointer.Float64(12.5),
nil,
nil,
nil,
pointer.Float64(15.0),
})),
},
{
name: "does not need trimming",
timeRange: backend.TimeRange{
From: time.Date(2020, 1, 2, 3, 4, 15, 0, time.UTC),
To: time.Date(2020, 1, 2, 3, 4, 30, 0, time.UTC),
},
input: data.NewFrame("wide_test",
data.NewField("Time", nil, []time.Time{
time.Date(2020, 1, 2, 3, 4, 18, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 19, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 20, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 21, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 22, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 23, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 24, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 25, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 26, 0, time.UTC),
}),
data.NewField("Values Ints", nil, []*int64{
nil,
pointer.Int64(10),
pointer.Int64(12),
nil,
nil,
nil,
pointer.Int64(15),
nil,
nil,
}),
data.NewField(`Values Floats`, data.Labels{"Animal Factor": "sloth"}, []*float64{
nil,
pointer.Float64(10.5),
pointer.Float64(12.5),
nil,
nil,
nil,
pointer.Float64(15.0),
nil,
nil,
})),
output: data.NewFrame("wide_test",
data.NewField("Time", nil, []time.Time{
time.Date(2020, 1, 2, 3, 4, 18, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 19, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 20, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 21, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 22, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 23, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 24, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 25, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 26, 0, time.UTC),
}),
data.NewField("Values Ints", nil, []*int64{
nil,
pointer.Int64(10),
pointer.Int64(12),
nil,
nil,
nil,
pointer.Int64(15),
nil,
nil,
}),
data.NewField(`Values Floats`, data.Labels{"Animal Factor": "sloth"}, []*float64{
nil,
pointer.Float64(10.5),
pointer.Float64(12.5),
nil,
nil,
nil,
pointer.Float64(15.0),
nil,
nil,
})),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := trim(tt.input, dataQueryModel{
TimeRange: tt.timeRange,
})
require.NoError(t, err)
if diff := cmp.Diff(tt.output, tt.input, data.FrameTestCompareOptions()...); diff != "" {
t.Errorf("Result mismatch (-want +got):\n%s", diff)
}
})
}
}