sql: use resample from grafana-plugin-sdk-go (#85599)

* copied files

* add copy of Pointer

* fix the API

* forward resample to the namespaced code

* moved the aligning-code to the parent

* call namespaced resample directly

* lint fix

* lint fix

* switch to plugin-sdk-go resample

* adjusted import path
This commit is contained in:
Gábor Farkas
2024-04-12 15:22:28 +02:00
committed by GitHub
parent b295f09af6
commit ee1ece1cf8
3 changed files with 8 additions and 451 deletions

View File

@@ -1,138 +0,0 @@
package sqleng
import (
"fmt"
"time"
"github.com/grafana/grafana-plugin-sdk-go/data"
)
// getRowFillValues populates a slice of values corresponding to the provided data.Frame fields.
// Uses data.FillMissing settings to fill in values that are missing. Values are normally missing
// due to that the selected query interval doesn't match the intervals of the data returned from
// the query and therefore needs to be resampled.
func getRowFillValues(f *data.Frame, tsSchema data.TimeSeriesSchema, currentTime time.Time,
fillMissing *data.FillMissing, intermediateRows []int, lastSeenRowIdx int) []interface{} {
vals := make([]interface{}, 0, len(f.Fields))
for i, field := range f.Fields {
// if the current field is the time index of the series
// set the new value to be added to the new timestamp
if i == tsSchema.TimeIndex {
switch f.Fields[tsSchema.TimeIndex].Type() {
case data.FieldTypeTime:
vals = append(vals, currentTime)
default:
vals = append(vals, &currentTime)
}
continue
}
isValueField := false
for _, idx := range tsSchema.ValueIndices {
if i == idx {
isValueField = true
break
}
}
// if the current field is value Field
// set the new value to the last seen field value (if such exists)
// otherwise set the appropriate value according to the fillMissing mode
// if the current field is string field)
// set the new value to be added to the last seen value (if such exists)
// if the Frame is wide then there should not be any string fields
var newVal interface{}
if isValueField {
if len(intermediateRows) > 0 {
// instead of setting the last seen
// we could set avg, sum, min or max
// of the intermediate values for each field
newVal = f.At(i, intermediateRows[len(intermediateRows)-1])
} else {
val, err := data.GetMissing(fillMissing, field, lastSeenRowIdx)
if err == nil {
newVal = val
}
}
} else if lastSeenRowIdx >= 0 {
newVal = f.At(i, lastSeenRowIdx)
}
vals = append(vals, newVal)
}
return vals
}
// resample resample provided time-series data.Frame.
// This is needed in the case of the selected query interval doesn't
// match the intervals of the time-series field in the data.Frame and
// therefore needs to be resampled.
func resample(f *data.Frame, qm dataQueryModel) (*data.Frame, error) {
tsSchema := f.TimeSeriesSchema()
if tsSchema.Type == data.TimeSeriesTypeNot {
return f, fmt.Errorf("can not fill missing, not timeseries frame")
}
if qm.Interval == 0 {
return f, nil
}
newFields := make([]*data.Field, 0, len(f.Fields))
for _, field := range f.Fields {
newField := data.NewFieldFromFieldType(field.Type(), 0)
newField.Name = field.Name
newField.Labels = field.Labels
newFields = append(newFields, newField)
}
resampledFrame := data.NewFrame(f.Name, newFields...)
resampledFrame.Meta = f.Meta
resampledRowidx := 0
lastSeenRowIdx := -1
timeField := f.Fields[tsSchema.TimeIndex]
startUnixTime := qm.TimeRange.From.Unix() / int64(qm.Interval.Seconds()) * int64(qm.Interval.Seconds())
startTime := time.Unix(startUnixTime, 0)
for currentTime := startTime; !currentTime.After(qm.TimeRange.To); currentTime = currentTime.Add(qm.Interval) {
initialRowIdx := 0
if lastSeenRowIdx > 0 {
initialRowIdx = lastSeenRowIdx + 1
}
intermediateRows := make([]int, 0)
for {
rowLen, err := f.RowLen()
if err != nil {
return f, err
}
if initialRowIdx == rowLen {
break
}
t, ok := timeField.ConcreteAt(initialRowIdx)
if !ok {
return f, fmt.Errorf("time point is nil")
}
// take the last element of the period current - interval <-> current, use it as value for current data point value
previousTime := currentTime.Add(-qm.Interval)
if t.(time.Time).After(previousTime) {
if !t.(time.Time).After(currentTime) {
intermediateRows = append(intermediateRows, initialRowIdx)
} else {
break
}
}
lastSeenRowIdx = initialRowIdx
initialRowIdx++
}
// no intermediate points; set values following fill missing mode
fieldVals := getRowFillValues(f, tsSchema, currentTime, qm.FillMissing, intermediateRows, lastSeenRowIdx)
resampledFrame.InsertRow(resampledRowidx, fieldVals...)
resampledRowidx++
}
return resampledFrame, nil
}

View File

@@ -1,312 +0,0 @@
package sqleng
import (
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/tsdb/sqleng/util"
"github.com/stretchr/testify/require"
)
func TestResampleWide(t *testing.T) {
tests := []struct {
name string
input *data.Frame
fillMissing *data.FillMissing
timeRange backend.TimeRange
interval time.Duration
output *data.Frame
}{
{
name: "interval 1s; fill null",
fillMissing: &data.FillMissing{Mode: data.FillModeNull},
timeRange: backend.TimeRange{
From: time.Date(2020, 1, 2, 3, 4, 18, 0, time.UTC),
To: time.Date(2020, 1, 2, 3, 4, 26, 0, time.UTC),
},
interval: time.Second,
input: data.NewFrame("wide_test",
data.NewField("Time", nil, []time.Time{
time.Date(2020, 1, 2, 3, 4, 19, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 20, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 24, 0, time.UTC),
}),
data.NewField("Values Ints", nil, []*int64{
util.Pointer(int64(10)),
util.Pointer(int64(12)),
util.Pointer(int64(15)),
}),
data.NewField(`Values Floats`, data.Labels{"Animal Factor": "sloth"}, []*float64{
util.Pointer(10.5),
util.Pointer(12.5),
util.Pointer(15.0),
})),
output: data.NewFrame("wide_test",
data.NewField("Time", nil, []time.Time{
time.Date(2020, 1, 2, 3, 4, 18, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 19, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 20, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 21, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 22, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 23, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 24, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 25, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 26, 0, time.UTC),
}),
data.NewField("Values Ints", nil, []*int64{
nil,
util.Pointer(int64(10)),
util.Pointer(int64(12)),
nil,
nil,
nil,
util.Pointer(int64(15)),
nil,
nil,
}),
data.NewField(`Values Floats`, data.Labels{"Animal Factor": "sloth"}, []*float64{
nil,
util.Pointer(10.5),
util.Pointer(12.5),
nil,
nil,
nil,
util.Pointer(15.0),
nil,
nil,
})),
},
{
name: "interval 1s; fill value",
fillMissing: &data.FillMissing{Mode: data.FillModeValue, Value: -1},
timeRange: backend.TimeRange{
From: time.Date(2020, 1, 2, 3, 4, 18, 0, time.UTC),
To: time.Date(2020, 1, 2, 3, 4, 26, 0, time.UTC),
},
interval: time.Second,
input: data.NewFrame("wide_test",
data.NewField("Time", nil, []time.Time{
time.Date(2020, 1, 2, 3, 4, 19, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 20, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 24, 0, time.UTC),
}),
data.NewField("Values Ints", nil, []*int64{
util.Pointer(int64(10)),
util.Pointer(int64(12)),
util.Pointer(int64(15)),
}),
data.NewField(`Values Floats`, data.Labels{"Animal Factor": "sloth"}, []*float64{
util.Pointer(10.5),
util.Pointer(12.5),
util.Pointer(15.0),
})),
output: data.NewFrame("wide_test",
data.NewField("Time", nil, []time.Time{
time.Date(2020, 1, 2, 3, 4, 18, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 19, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 20, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 21, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 22, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 23, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 24, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 25, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 26, 0, time.UTC),
}),
data.NewField("Values Ints", nil, []*int64{
util.Pointer(int64(-1)),
util.Pointer(int64(10)),
util.Pointer(int64(12)),
util.Pointer(int64(-1)),
util.Pointer(int64(-1)),
util.Pointer(int64(-1)),
util.Pointer(int64(15)),
util.Pointer(int64(-1)),
util.Pointer(int64(-1)),
}),
data.NewField(`Values Floats`, data.Labels{"Animal Factor": "sloth"}, []*float64{
util.Pointer(-1.0),
util.Pointer(10.5),
util.Pointer(12.5),
util.Pointer(-1.0),
util.Pointer(-1.0),
util.Pointer(-1.0),
util.Pointer(15.0),
util.Pointer(-1.0),
util.Pointer(-1.0),
})),
},
{
name: "interval 1s; fill previous",
fillMissing: &data.FillMissing{Mode: data.FillModePrevious},
timeRange: backend.TimeRange{
From: time.Date(2020, 1, 2, 3, 4, 18, 0, time.UTC),
To: time.Date(2020, 1, 2, 3, 4, 26, 0, time.UTC),
},
interval: time.Second,
input: data.NewFrame("wide_test",
data.NewField("Time", nil, []time.Time{
time.Date(2020, 1, 2, 3, 4, 19, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 20, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 24, 0, time.UTC),
}),
data.NewField("Values Ints", nil, []*int64{
util.Pointer(int64(10)),
util.Pointer(int64(12)),
util.Pointer(int64(15)),
}),
data.NewField(`Values Floats`, data.Labels{"Animal Factor": "sloth"}, []*float64{
util.Pointer(10.5),
util.Pointer(12.5),
util.Pointer(15.0),
})),
output: data.NewFrame("wide_test",
data.NewField("Time", nil, []time.Time{
time.Date(2020, 1, 2, 3, 4, 18, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 19, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 20, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 21, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 22, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 23, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 24, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 25, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 26, 0, time.UTC),
}),
data.NewField("Values Ints", nil, []*int64{
nil,
util.Pointer(int64(10)),
util.Pointer(int64(12)),
util.Pointer(int64(12)),
util.Pointer(int64(12)),
util.Pointer(int64(12)),
util.Pointer(int64(15)),
util.Pointer(int64(15)),
util.Pointer(int64(15)),
}),
data.NewField(`Values Floats`, data.Labels{"Animal Factor": "sloth"}, []*float64{
nil,
util.Pointer(10.5),
util.Pointer(12.5),
util.Pointer(12.5),
util.Pointer(12.5),
util.Pointer(12.5),
util.Pointer(15.0),
util.Pointer(15.0),
util.Pointer(15.0),
})),
},
{
name: "interval 2s; fill null",
fillMissing: &data.FillMissing{Mode: data.FillModeNull},
timeRange: backend.TimeRange{
From: time.Date(2020, 1, 2, 3, 4, 18, 0, time.UTC),
To: time.Date(2020, 1, 2, 3, 4, 26, 0, time.UTC),
},
interval: 2 * time.Second,
input: data.NewFrame("wide_test",
data.NewField("Time", nil, []time.Time{
time.Date(2020, 1, 2, 3, 4, 18, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 19, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 24, 0, time.UTC),
}),
data.NewField("Values Ints", nil, []*int64{
util.Pointer(int64(10)),
util.Pointer(int64(12)),
util.Pointer(int64(15)),
}),
data.NewField(`Values Floats`, data.Labels{"Animal Factor": "sloth"}, []*float64{
util.Pointer(10.5),
util.Pointer(12.5),
util.Pointer(15.0),
})),
output: data.NewFrame("wide_test",
data.NewField("Time", nil, []time.Time{
time.Date(2020, 1, 2, 3, 4, 18, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 20, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 22, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 24, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 26, 0, time.UTC),
}),
data.NewField("Values Ints", nil, []*int64{
util.Pointer(int64(10)),
util.Pointer(int64(12)),
nil,
util.Pointer(int64(15)),
nil,
}),
data.NewField(`Values Floats`, data.Labels{"Animal Factor": "sloth"}, []*float64{
util.Pointer(10.5),
util.Pointer(12.5),
nil,
util.Pointer(15.0),
nil,
})),
},
{
name: "interval 1s; fill null; rows outside timerange window",
fillMissing: &data.FillMissing{Mode: data.FillModeNull},
timeRange: backend.TimeRange{
From: time.Date(2020, 1, 2, 3, 4, 20, 0, time.UTC),
To: time.Date(2020, 1, 2, 3, 4, 24, 0, time.UTC),
},
interval: time.Second,
input: data.NewFrame("wide_test",
data.NewField("Time", nil, []time.Time{
time.Date(2020, 1, 2, 3, 4, 19, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 20, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 24, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 27, 0, time.UTC),
}),
data.NewField("Values Ints", nil, []*int64{
util.Pointer(int64(10)),
util.Pointer(int64(12)),
util.Pointer(int64(15)),
util.Pointer(int64(18)),
}),
data.NewField(`Values Floats`, data.Labels{"Animal Factor": "sloth"}, []*float64{
util.Pointer(10.5),
util.Pointer(12.5),
util.Pointer(15.0),
util.Pointer(17.5),
})),
output: data.NewFrame("wide_test",
data.NewField("Time", nil, []time.Time{
time.Date(2020, 1, 2, 3, 4, 20, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 21, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 22, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 23, 0, time.UTC),
time.Date(2020, 1, 2, 3, 4, 24, 0, time.UTC),
}),
data.NewField("Values Ints", nil, []*int64{
util.Pointer(int64(12)),
nil,
nil,
nil,
util.Pointer(int64(15)),
}),
data.NewField(`Values Floats`, data.Labels{"Animal Factor": "sloth"}, []*float64{
util.Pointer(12.5),
nil,
nil,
nil,
util.Pointer(15.0),
})),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
frame, err := resample(tt.input, dataQueryModel{
FillMissing: tt.fillMissing,
TimeRange: tt.timeRange,
Interval: tt.interval,
})
require.NoError(t, err)
if diff := cmp.Diff(tt.output, frame, data.FrameTestCompareOptions()...); diff != "" {
t.Errorf("Result mismatch (-want +got):\n%s", diff)
}
})
}
}

View File

@@ -347,8 +347,15 @@ func (e *DataSourceHandler) executeQuery(query backend.DataQuery, wg *sync.WaitG
} }
} }
if qm.FillMissing != nil { if qm.FillMissing != nil {
// we align the start-time
startUnixTime := qm.TimeRange.From.Unix() / int64(qm.Interval.Seconds()) * int64(qm.Interval.Seconds())
alignedTimeRange := backend.TimeRange{
From: time.Unix(startUnixTime, 0),
To: qm.TimeRange.To,
}
var err error var err error
frame, err = resample(frame, *qm) frame, err = sqlutil.ResampleWideFrame(frame, qm.FillMissing, alignedTimeRange, qm.Interval)
if err != nil { if err != nil {
logger.Error("Failed to resample dataframe", "err", err) logger.Error("Failed to resample dataframe", "err", err)
frame.AppendNotices(data.Notice{Text: "Failed to resample dataframe", Severity: data.NoticeSeverityWarning}) frame.AppendNotices(data.Notice{Text: "Failed to resample dataframe", Severity: data.NoticeSeverityWarning})