SSE: Support time series frames with non-nullable float64 values (#32608)

* SSE: fix reduce to handle non-null
* add type for data.Field that is float64 or *float64
* resample fix for non-null input case, add couple non-null tests
This commit is contained in:
Kyle Brandt 2021-04-01 14:33:35 -04:00 committed by GitHub
parent d42a5b2561
commit 512faa7c9c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 167 additions and 115 deletions

View File

@ -86,6 +86,7 @@ func (ccc *ConditionsCmd) Execute(ctx context.Context, vars mathexp.Vars) (mathe
} }
reducedNum := c.Reducer.Reduce(series) reducedNum := c.Reducer.Reduce(series)
// TODO handle error / no data signals // TODO handle error / no data signals
thisCondNoDataFound := reducedNum.GetFloat64Value() == nil thisCondNoDataFound := reducedNum.GetFloat64Value() == nil

View File

@ -4,7 +4,6 @@ import (
"math" "math"
"sort" "sort"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/expr/mathexp" "github.com/grafana/grafana/pkg/expr/mathexp"
) )
@ -35,44 +34,42 @@ func (cr classicReducer) Reduce(series mathexp.Series) mathexp.Number {
allNull := true allNull := true
vF := series.Frame.Fields[series.ValueIdx] vF := series.Frame.Fields[series.ValueIdx]
ff := mathexp.Float64Field(*vF)
switch cr { switch cr {
case "avg": case "avg":
validPointsCount := 0 validPointsCount := 0
for i := 0; i < vF.Len(); i++ { for i := 0; i < ff.Len(); i++ {
if f, ok := vF.At(i).(*float64); ok { f := ff.GetValue(i)
if nilOrNaN(f) { if nilOrNaN(f) {
continue continue
}
value += *f
validPointsCount++
allNull = false
} }
value += *f
validPointsCount++
allNull = false
} }
if validPointsCount > 0 { if validPointsCount > 0 {
value /= float64(validPointsCount) value /= float64(validPointsCount)
} }
case "sum": case "sum":
for i := 0; i < vF.Len(); i++ { for i := 0; i < ff.Len(); i++ {
if f, ok := vF.At(i).(*float64); ok { f := ff.GetValue(i)
if nilOrNaN(f) { if nilOrNaN(f) {
continue continue
}
value += *f
allNull = false
} }
value += *f
allNull = false
} }
case "min": case "min":
value = math.MaxFloat64 value = math.MaxFloat64
for i := 0; i < vF.Len(); i++ { for i := 0; i < ff.Len(); i++ {
if f, ok := vF.At(i).(*float64); ok { f := ff.GetValue(i)
if nilOrNaN(f) { if nilOrNaN(f) {
continue continue
} }
allNull = false allNull = false
if value > *f { if value > *f {
value = *f value = *f
}
} }
} }
if allNull { if allNull {
@ -80,43 +77,40 @@ func (cr classicReducer) Reduce(series mathexp.Series) mathexp.Number {
} }
case "max": case "max":
value = -math.MaxFloat64 value = -math.MaxFloat64
for i := 0; i < vF.Len(); i++ { for i := 0; i < ff.Len(); i++ {
if f, ok := vF.At(i).(*float64); ok { f := ff.GetValue(i)
if nilOrNaN(f) { if nilOrNaN(f) {
continue continue
} }
allNull = false allNull = false
if value < *f { if value < *f {
value = *f value = *f
}
} }
} }
if allNull { if allNull {
value = 0 value = 0
} }
case "count": case "count":
value = float64(vF.Len()) value = float64(ff.Len())
allNull = false allNull = false
case "last": case "last":
for i := vF.Len() - 1; i >= 0; i-- { for i := ff.Len() - 1; i >= 0; i-- {
if f, ok := vF.At(i).(*float64); ok { f := ff.GetValue(i)
if !nilOrNaN(f) { if !nilOrNaN(f) {
value = *f value = *f
allNull = false allNull = false
break break
}
} }
} }
case "median": case "median":
var values []float64 var values []float64
for i := 0; i < vF.Len(); i++ { for i := 0; i < ff.Len(); i++ {
if f, ok := vF.At(i).(*float64); ok { f := ff.GetValue(i)
if nilOrNaN(f) { if nilOrNaN(f) {
continue continue
}
allNull = false
values = append(values, *f)
} }
allNull = false
values = append(values, *f)
} }
if len(values) >= 1 { if len(values) >= 1 {
sort.Float64s(values) sort.Float64s(values)
@ -128,21 +122,20 @@ func (cr classicReducer) Reduce(series mathexp.Series) mathexp.Number {
} }
} }
case "diff": case "diff":
allNull, value = calculateDiff(vF, allNull, value, diff) allNull, value = calculateDiff(ff, allNull, value, diff)
case "diff_abs": case "diff_abs":
allNull, value = calculateDiff(vF, allNull, value, diffAbs) allNull, value = calculateDiff(ff, allNull, value, diffAbs)
case "percent_diff": case "percent_diff":
allNull, value = calculateDiff(vF, allNull, value, percentDiff) allNull, value = calculateDiff(ff, allNull, value, percentDiff)
case "percent_diff_abs": case "percent_diff_abs":
allNull, value = calculateDiff(vF, allNull, value, percentDiffAbs) allNull, value = calculateDiff(ff, allNull, value, percentDiffAbs)
case "count_non_null": case "count_non_null":
for i := 0; i < vF.Len(); i++ { for i := 0; i < ff.Len(); i++ {
if f, ok := vF.At(i).(*float64); ok { f := ff.GetValue(i)
if nilOrNaN(f) { if nilOrNaN(f) {
continue continue
}
value++
} }
value++
} }
if value > 0 { if value > 0 {
@ -158,30 +151,28 @@ func (cr classicReducer) Reduce(series mathexp.Series) mathexp.Number {
return num return num
} }
func calculateDiff(vF *data.Field, allNull bool, value float64, fn func(float64, float64) float64) (bool, float64) { func calculateDiff(ff mathexp.Float64Field, allNull bool, value float64, fn func(float64, float64) float64) (bool, float64) {
var ( var (
first float64 first float64
i int i int
) )
// get the newest point // get the newest point
for i = vF.Len() - 1; i >= 0; i-- { for i = ff.Len() - 1; i >= 0; i-- {
if f, ok := vF.At(i).(*float64); ok { f := ff.GetValue(i)
if !nilOrNaN(f) { if !nilOrNaN(f) {
first = *f first = *f
allNull = false allNull = false
break break
}
} }
} }
if i >= 1 { if i >= 1 {
// get the oldest point // get the oldest point
for i := 0; i < vF.Len(); i++ { for i := 0; i < ff.Len(); i++ {
if f, ok := vF.At(i).(*float64); ok { f := ff.GetValue(i)
if !nilOrNaN(f) { if !nilOrNaN(f) {
value = fn(first, *f) value = fn(first, *f)
allNull = false allNull = false
break break
}
} }
} }
} }

View File

@ -7,67 +7,64 @@ import (
"github.com/grafana/grafana-plugin-sdk-go/data" "github.com/grafana/grafana-plugin-sdk-go/data"
) )
func Sum(v *data.Field) *float64 { func Sum(fv *Float64Field) *float64 {
var sum float64 var sum float64
for i := 0; i < v.Len(); i++ { for i := 0; i < fv.Len(); i++ {
if f, ok := v.At(i).(*float64); ok { f := fv.GetValue(i)
if f == nil || math.IsNaN(*f) { if f == nil || math.IsNaN(*f) {
nan := math.NaN() nan := math.NaN()
return &nan return &nan
}
sum += *f
} }
sum += *f
} }
return &sum return &sum
} }
func Avg(v *data.Field) *float64 { func Avg(fv *Float64Field) *float64 {
sum := Sum(v) sum := Sum(fv)
f := *sum / float64(v.Len()) f := *sum / float64(fv.Len())
return &f return &f
} }
func Min(fv *data.Field) *float64 { func Min(fv *Float64Field) *float64 {
var f float64 var f float64
if fv.Len() == 0 { if fv.Len() == 0 {
nan := math.NaN() nan := math.NaN()
return &nan return &nan
} }
for i := 0; i < fv.Len(); i++ { for i := 0; i < fv.Len(); i++ {
if v, ok := fv.At(i).(*float64); ok { v := fv.GetValue(i)
if v == nil || math.IsNaN(*v) { if v == nil || math.IsNaN(*v) {
nan := math.NaN() nan := math.NaN()
return &nan return &nan
} }
if i == 0 || *v < f { if i == 0 || *v < f {
f = *v f = *v
}
} }
} }
return &f return &f
} }
func Max(fv *data.Field) *float64 { func Max(fv *Float64Field) *float64 {
var f float64 var f float64
if fv.Len() == 0 { if fv.Len() == 0 {
nan := math.NaN() nan := math.NaN()
return &nan return &nan
} }
for i := 0; i < fv.Len(); i++ { for i := 0; i < fv.Len(); i++ {
if v, ok := fv.At(i).(*float64); ok { v := fv.GetValue(i)
if v == nil || math.IsNaN(*v) { if v == nil || math.IsNaN(*v) {
nan := math.NaN() nan := math.NaN()
return &nan return &nan
} }
if i == 0 || *v > f { if i == 0 || *v > f {
f = *v f = *v
}
} }
} }
return &f return &f
} }
func Count(fv *data.Field) *float64 { func Count(fv *Float64Field) *float64 {
f := float64(fv.Len()) f := float64(fv.Len())
return &f return &f
} }
@ -80,18 +77,19 @@ func (s Series) Reduce(refID, rFunc string) (Number, error) {
} }
number := NewNumber(refID, l) number := NewNumber(refID, l)
var f *float64 var f *float64
fVec := s.Frame.Fields[1] fVec := s.Frame.Fields[s.ValueIdx]
floatField := Float64Field(*fVec)
switch rFunc { switch rFunc {
case "sum": case "sum":
f = Sum(fVec) f = Sum(&floatField)
case "mean": case "mean":
f = Avg(fVec) f = Avg(&floatField)
case "min": case "min":
f = Min(fVec) f = Min(&floatField)
case "max": case "max":
f = Max(fVec) f = Max(&floatField)
case "count": case "count":
f = Count(fVec) f = Count(&floatField)
default: default:
return number, fmt.Errorf("reduction %v not implemented", rFunc) return number, fmt.Errorf("reduction %v not implemented", rFunc)
} }

View File

@ -178,6 +178,19 @@ func TestSeriesReduce(t *testing.T) {
}, },
}, },
}, },
{
name: "mean series (non-null-value)",
red: "mean",
varToReduce: "A",
vars: aSeriesNoNull,
errIs: require.NoError,
resultsIs: require.Equal,
results: Results{
[]Value{
makeNumber("", nil, float64Pointer(1.5)),
},
},
},
{ {
name: "count empty series", name: "count empty series",
red: "count", red: "count",

View File

@ -14,7 +14,7 @@ func (s Series) Resample(refID string, interval time.Duration, downsampler strin
if newSeriesLength <= 0 { if newSeriesLength <= 0 {
return s, fmt.Errorf("the series cannot be sampled further; the time range is shorter than the interval") return s, fmt.Errorf("the series cannot be sampled further; the time range is shorter than the interval")
} }
resampled := NewSeries(refID, s.GetLabels(), s.TimeIdx, s.TimeIsNullable, s.ValueIdx, s.ValueIsNullable, newSeriesLength+1) resampled := NewSeries(refID, s.GetLabels(), s.TimeIdx, true, s.ValueIdx, true, newSeriesLength+1)
bookmark := 0 bookmark := 0
var lastSeen *float64 var lastSeen *float64
idx := 0 idx := 0
@ -57,16 +57,17 @@ func (s Series) Resample(refID string, interval time.Duration, downsampler strin
} }
} else { // downsampling } else { // downsampling
fVec := data.NewField("", s.GetLabels(), vals) fVec := data.NewField("", s.GetLabels(), vals)
ff := Float64Field(*fVec)
var tmp *float64 var tmp *float64
switch downsampler { switch downsampler {
case "sum": case "sum":
tmp = Sum(fVec) tmp = Sum(&ff)
case "mean": case "mean":
tmp = Avg(fVec) tmp = Avg(&ff)
case "min": case "min":
tmp = Min(fVec) tmp = Min(&ff)
case "max": case "max":
tmp = Max(fVec) tmp = Max(&ff)
default: default:
return s, fmt.Errorf("downsampling %v not implemented", downsampler) return s, fmt.Errorf("downsampling %v not implemented", downsampler)
} }

View File

@ -77,6 +77,34 @@ func TestResampleSeries(t *testing.T) {
unixTimePointer(15, 0), float64Pointer(2), unixTimePointer(15, 0), float64Pointer(2),
}), }),
}, },
{
name: "resample series: downsampling (mean / pad) (no-nullable)",
interval: time.Second * 5,
downsampler: "mean",
upsampler: "pad",
timeRange: backend.TimeRange{
From: time.Unix(0, 0),
To: time.Unix(16, 0),
},
seriesToResample: makeNoNullSeries("", nil, noNullTP{
time.Unix(2, 0), 2,
}, noNullTP{
time.Unix(4, 0), 3,
}, noNullTP{
time.Unix(7, 0), 1,
}, noNullTP{
time.Unix(9, 0), 2,
}),
series: makeSeriesNullableTime("", nil, nullTimeTP{
unixTimePointer(0, 0), nil,
}, nullTimeTP{
unixTimePointer(5, 0), float64Pointer(2.5),
}, nullTimeTP{
unixTimePointer(10, 0), float64Pointer(1.5),
}, nullTimeTP{
unixTimePointer(15, 0), float64Pointer(2),
}),
},
{ {
name: "resample series: downsampling (max / fillna)", name: "resample series: downsampling (max / fillna)",
interval: time.Second * 5, interval: time.Second * 5,

View File

@ -123,3 +123,23 @@ func (n Number) GetMeta() interface{} {
func (n Number) SetMeta(v interface{}) { func (n Number) SetMeta(v interface{}) {
n.Frame.SetMeta(&data.FrameMeta{Custom: v}) n.Frame.SetMeta(&data.FrameMeta{Custom: v})
} }
// Float64Field is a data.Field whose values are float64 or *float64,
// with methods that always yield a *float64 so callers need not care
// whether the underlying field is nullable.
type Float64Field data.Field

// GetValue returns the value at idx as a *float64.
// For a nullable (FieldTypeNullableFloat64) field the stored pointer is
// returned directly and may be nil; for a non-nullable field a pointer to
// a copy of the value is returned. Any other field type panics on the
// type assertion — callers must ensure the field holds float64 values.
func (ff *Float64Field) GetValue(idx int) *float64 {
	field := data.Field(*ff)
	if field.Type() == data.FieldTypeNullableFloat64 {
		return field.At(idx).(*float64)
	}
	f := field.At(idx).(float64)
	return &f
}

// Len returns the length of the field.
func (ff *Float64Field) Len() int {
	df := data.Field(*ff)
	return df.Len()
}