SSE: Mode to drop NaN/Inf/Null in Reduction operations (#43583)

Co-authored-by: Yuriy Tseretyan <yuriy.tseretyan@grafana.com>
Co-authored-by: gillesdemey <gilles.de.mey@gmail.com>
This commit is contained in:
Kyle Brandt
2022-02-02 08:50:44 -05:00
committed by GitHub
parent 0cb3037b55
commit 040ce40113
14 changed files with 627 additions and 100 deletions

View File

@@ -133,18 +133,14 @@ func (e *State) unarySeries(s Series, op string) (Series, error) {
for i := 0; i < s.Len(); i++ {
t, f := s.GetPoint(i)
if f == nil {
if err := newSeries.SetPoint(i, t, nil); err != nil {
return newSeries, err
}
newSeries.SetPoint(i, t, nil)
continue
}
newF, err := unaryOp(op, *f)
if err != nil {
return newSeries, err
}
if err := newSeries.SetPoint(i, t, &newF); err != nil {
return newSeries, err
}
newSeries.SetPoint(i, t, &newF)
}
return newSeries, nil
}
@@ -437,9 +433,7 @@ func (e *State) biSeriesNumber(labels data.Labels, op string, s Series, scalarVa
nF := math.NaN()
t, f := s.GetPoint(i)
if f == nil || scalarVal == nil {
if err := newSeries.SetPoint(i, t, nil); err != nil {
return newSeries, err
}
newSeries.SetPoint(i, t, nil)
continue
}
if seriesFirst {
@@ -450,9 +444,7 @@ func (e *State) biSeriesNumber(labels data.Labels, op string, s Series, scalarVa
if err != nil {
return newSeries, err
}
if err := newSeries.SetPoint(i, t, &nF); err != nil {
return newSeries, err
}
newSeries.SetPoint(i, t, &nF)
}
return newSeries, nil
}
@@ -475,18 +467,14 @@ func (e *State) biSeriesSeries(labels data.Labels, op string, aSeries, bSeries S
continue
}
if aF == nil || bF == nil {
if err := newSeries.AppendPoint(aIdx, aTime, nil); err != nil {
return newSeries, err
}
newSeries.AppendPoint(aTime, nil)
continue
}
nF, err := binaryOp(op, *aF, *bF)
if err != nil {
return newSeries, err
}
if err := newSeries.AppendPoint(aIdx, aTime, &nF); err != nil {
return newSeries, err
}
newSeries.AppendPoint(aTime, &nF)
}
return newSeries, nil
}

View File

@@ -116,28 +116,24 @@ func TestNumberExpr(t *testing.T) {
results: Results{[]Value{makeNumber("", nil, float64Pointer(-2.0))}},
},
{
name: "binary: Scalar Op Number (Number will nil val) - currently Panics",
name: "binary: Scalar Op Number (Number will nil val) returns nil",
expr: "1 + $A",
newErrIs: assert.NoError,
execErrIs: assert.NoError,
resultIs: assert.Equal,
vars: Vars{"A": Results{[]Value{makeNumber("", nil, nil)}}},
willPanic: true,
results: Results{[]Value{makeNumber("", nil, nil)}},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
testBlock := func() {
e, err := New(tt.expr)
tt.newErrIs(t, err)
if e != nil {
res, err := e.Execute("", tt.vars)
tt.execErrIs(t, err)
tt.resultIs(t, tt.results, res)
}
}
if tt.willPanic {
assert.Panics(t, testBlock)
} else {
assert.NotPanics(t, testBlock)
e, err := New(tt.expr)
tt.newErrIs(t, err)
if e != nil {
res, err := e.Execute("", tt.vars)
tt.execErrIs(t, err)
tt.resultIs(t, tt.results, res)
}
})
}

View File

@@ -16,10 +16,7 @@ type tp struct {
func makeSeries(name string, labels data.Labels, points ...tp) Series {
newSeries := NewSeries(name, labels, len(points))
for idx, p := range points {
err := newSeries.SetPoint(idx, p.t, p.f)
if err != nil {
panic(err)
}
newSeries.SetPoint(idx, p.t, p.f)
}
return newSeries
}

View File

@@ -226,9 +226,7 @@ func perFloat(e *State, val Value, floatF func(x float64) float64) (Value, error
if f != nil {
nF = floatF(*f)
}
if err := newSeries.SetPoint(i, t, &nF); err != nil {
return newSeries, err
}
newSeries.SetPoint(i, t, &nF)
}
newVal = newSeries
default:
@@ -257,9 +255,7 @@ func perNullableFloat(e *State, val Value, floatF func(x *float64) *float64) (Va
newSeries := NewSeries(e.RefID, resSeries.GetLabels(), resSeries.Len())
for i := 0; i < resSeries.Len(); i++ {
t, f := resSeries.GetPoint(i)
if err := newSeries.SetPoint(i, t, floatF(f)); err != nil {
return newSeries, err
}
newSeries.SetPoint(i, t, floatF(f))
}
newVal = newSeries
default:

View File

@@ -3,10 +3,13 @@ package mathexp
import (
"fmt"
"math"
"strings"
"github.com/grafana/grafana-plugin-sdk-go/data"
)
type ReducerFunc = func(fv *Float64Field) *float64
func Sum(fv *Float64Field) *float64 {
var sum float64
for i := 0; i < fv.Len(); i++ {
@@ -75,38 +78,114 @@ func Last(fv *Float64Field) *float64 {
f = math.NaN()
return &f
}
v := fv.GetValue(fv.Len() - 1)
f = *v
return &f
return fv.GetValue(fv.Len() - 1)
}
func GetReduceFunc(rFunc string) (ReducerFunc, error) {
switch strings.ToLower(rFunc) {
case "sum":
return Sum, nil
case "mean":
return Avg, nil
case "min":
return Min, nil
case "max":
return Max, nil
case "count":
return Count, nil
case "last":
return Last, nil
default:
return nil, fmt.Errorf("reduction %v not implemented", rFunc)
}
}
// Reduce turns the Series into a Number based on the given reduction function
func (s Series) Reduce(refID, rFunc string) (Number, error) {
// if ReduceMapper is defined it applies it to the provided series and performs reduction of the resulting series.
// Otherwise, the reduction operation is done against the original series.
func (s Series) Reduce(refID, rFunc string, mapper ReduceMapper) (Number, error) {
var l data.Labels
if s.GetLabels() != nil {
l = s.GetLabels().Copy()
}
number := NewNumber(refID, l)
var f *float64
fVec := s.Frame.Fields[seriesTypeValIdx]
series := s
if mapper != nil {
series = mapSeries(s, mapper)
}
fVec := series.Frame.Fields[seriesTypeValIdx]
floatField := Float64Field(*fVec)
switch rFunc {
case "sum":
f = Sum(&floatField)
case "mean":
f = Avg(&floatField)
case "min":
f = Min(&floatField)
case "max":
f = Max(&floatField)
case "count":
f = Count(&floatField)
case "last":
f = Last(&floatField)
default:
return number, fmt.Errorf("reduction %v not implemented", rFunc)
reduceFunc, err := GetReduceFunc(rFunc)
if err != nil {
return number, err
}
f = reduceFunc(&floatField)
if f != nil && mapper != nil {
f = mapper.MapOutput(f)
}
number.SetValue(f)
return number, nil
}
type ReduceMapper interface {
MapInput(s *float64) *float64
MapOutput(v *float64) *float64
}
// mapSeries creates a series where all points are mapped using the provided map function ReduceMapper.MapInput
func mapSeries(s Series, mapper ReduceMapper) Series {
newSeries := NewSeries(s.Frame.RefID, s.GetLabels(), 0)
for i := 0; i < s.Len(); i++ {
f := s.GetValue(i)
f = mapper.MapInput(f)
if f == nil {
continue
}
newFloat := *f
newSeries.AppendPoint(s.GetTime(i), &newFloat)
}
return newSeries
}
type DropNonNumber struct {
}
// MapInput returns nil if the input parameter is nil or point to either a NaN or a Inf
func (d DropNonNumber) MapInput(s *float64) *float64 {
if s == nil || math.IsNaN(*s) || math.IsInf(*s, 0) {
return nil
}
return s
}
// MapOutput returns nil if the input parameter is nil or point to either a NaN or a Inf
func (d DropNonNumber) MapOutput(s *float64) *float64 {
if s != nil && math.IsNaN(*s) {
return nil
}
return s
}
type ReplaceNonNumberWithValue struct {
Value float64
}
// MapInput returns a pointer to ReplaceNonNumberWithValue.Value if input parameter is nil or points to either a NaN or an Inf.
// Otherwise, returns the input pointer as is.
func (r ReplaceNonNumberWithValue) MapInput(v *float64) *float64 {
if v == nil || math.IsNaN(*v) || math.IsInf(*v, 0) {
return &r.Value
} else {
return v
}
}
// MapOutput returns a pointer to ReplaceNonNumberWithValue.Value if input parameter is nil or points to either a NaN or an Inf.
// Otherwise, returns the input pointer as is.
func (r ReplaceNonNumberWithValue) MapOutput(s *float64) *float64 {
if s != nil && math.IsNaN(*s) {
return &r.Value
}
return s
}

View File

@@ -2,6 +2,7 @@ package mathexp
import (
"math"
"math/rand"
"testing"
"time"
@@ -227,6 +228,19 @@ func TestSeriesReduce(t *testing.T) {
},
},
},
{
name: "last null series",
red: "last",
varToReduce: "A",
vars: seriesWithNil,
errIs: require.NoError,
resultsIs: require.Equal,
results: Results{
[]Value{
makeNumber("", nil, nil),
},
},
},
}
for _, tt := range tests {
@@ -234,7 +248,7 @@ func TestSeriesReduce(t *testing.T) {
results := Results{}
seriesSet := tt.vars[tt.varToReduce]
for _, series := range seriesSet.Values {
ns, err := series.Value().(*Series).Reduce("", tt.red)
ns, err := series.Value().(*Series).Reduce("", tt.red, nil)
tt.errIs(t, err)
if err != nil {
return
@@ -251,3 +265,252 @@ func TestSeriesReduce(t *testing.T) {
})
}
}
var seriesNonNumbers = Vars{
"A": Results{
[]Value{
makeSeries("temp", nil,
tp{time.Unix(5, 0), NaN},
tp{time.Unix(10, 0), float64Pointer(math.Inf(-1))},
tp{time.Unix(15, 0), float64Pointer(math.Inf(1))},
tp{time.Unix(15, 0), nil}),
},
},
}
func TestSeriesReduceDropNN(t *testing.T) {
var tests = []struct {
name string
red string
vars Vars
varToReduce string
results Results
}{
{
name: "dropNN: sum series",
red: "sum",
varToReduce: "A",
vars: aSeries,
results: Results{
[]Value{
makeNumber("", nil, float64Pointer(3)),
},
},
},
{
name: "dropNN: sum series with a nil value",
red: "sum",
varToReduce: "A",
vars: seriesWithNil,
results: Results{
[]Value{
makeNumber("", nil, float64Pointer(2)),
},
},
},
{
name: "dropNN: sum empty series",
red: "sum",
varToReduce: "A",
vars: seriesEmpty,
results: Results{
[]Value{
makeNumber("", nil, float64Pointer(0)),
},
},
},
{
name: "dropNN: mean series with a nil value and real value",
red: "mean",
varToReduce: "A",
vars: seriesWithNil,
results: Results{
[]Value{
makeNumber("", nil, float64Pointer(2)),
},
},
},
{
name: "DropNN: mean empty series",
red: "mean",
varToReduce: "A",
vars: seriesEmpty,
results: Results{
[]Value{
makeNumber("", nil, nil),
},
},
},
{
name: "DropNN: mean series that becomes empty after filtering non-number",
red: "mean",
varToReduce: "A",
vars: seriesNonNumbers,
results: Results{
[]Value{
makeNumber("", nil, nil),
},
},
},
{
name: "DropNN: count empty series",
red: "count",
varToReduce: "A",
vars: seriesEmpty,
results: Results{
[]Value{
makeNumber("", nil, float64Pointer(0)),
},
},
},
{
name: "DropNN: count series with nil and value should only count real numbers",
red: "count",
varToReduce: "A",
vars: seriesWithNil,
results: Results{
[]Value{
makeNumber("", nil, float64Pointer(1)),
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
results := Results{}
seriesSet := tt.vars[tt.varToReduce]
for _, series := range seriesSet.Values {
ns, err := series.Value().(*Series).Reduce("", tt.red, DropNonNumber{})
require.NoError(t, err)
results.Values = append(results.Values, ns)
}
opt := cmp.Comparer(func(x, y float64) bool {
return (math.IsNaN(x) && math.IsNaN(y)) || x == y
})
options := append([]cmp.Option{opt}, data.FrameTestCompareOptions()...)
if diff := cmp.Diff(tt.results, results, options...); diff != "" {
t.Errorf("Result mismatch (-want +got):\n%s", diff)
}
})
}
}
func TestSeriesReduceReplaceNN(t *testing.T) {
replaceWith := rand.Float64()
var tests = []struct {
name string
red string
vars Vars
varToReduce string
results Results
}{
{
name: "replaceNN: sum series",
red: "sum",
varToReduce: "A",
vars: aSeries,
results: Results{
[]Value{
makeNumber("", nil, float64Pointer(3)),
},
},
},
{
name: "replaceNN: sum series with a nil value",
red: "sum",
varToReduce: "A",
vars: seriesWithNil,
results: Results{
[]Value{
makeNumber("", nil, float64Pointer(replaceWith+2)),
},
},
},
{
name: "replaceNN: sum empty series",
red: "sum",
varToReduce: "A",
vars: seriesEmpty,
results: Results{
[]Value{
makeNumber("", nil, float64Pointer(0)),
},
},
},
{
name: "replaceNN: mean series with a nil value and real value",
red: "mean",
varToReduce: "A",
vars: seriesWithNil,
results: Results{
[]Value{
makeNumber("", nil, float64Pointer((2+replaceWith)/2e0)),
},
},
},
{
name: "replaceNN: mean empty series",
red: "mean",
varToReduce: "A",
vars: seriesEmpty,
results: Results{
[]Value{
makeNumber("", nil, float64Pointer(replaceWith)),
},
},
},
{
name: "replaceNN: mean series that becomes empty after filtering non-number",
red: "mean",
varToReduce: "A",
vars: seriesNonNumbers,
results: Results{
[]Value{
makeNumber("", nil, float64Pointer(replaceWith)),
},
},
},
{
name: "replaceNN: count empty series",
red: "count",
varToReduce: "A",
vars: seriesEmpty,
results: Results{
[]Value{
makeNumber("", nil, float64Pointer(0)),
},
},
},
{
name: "replaceNN: count series with nil and value should only count real numbers",
red: "count",
varToReduce: "A",
vars: seriesWithNil,
results: Results{
[]Value{
makeNumber("", nil, float64Pointer(2)),
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
results := Results{}
seriesSet := tt.vars[tt.varToReduce]
for _, series := range seriesSet.Values {
ns, err := series.Value().(*Series).Reduce("", tt.red, ReplaceNonNumberWithValue{Value: replaceWith})
require.NoError(t, err)
results.Values = append(results.Values, ns)
}
opt := cmp.Comparer(func(x, y float64) bool {
return (math.IsNaN(x) && math.IsNaN(y)) || x == y
})
options := append([]cmp.Option{opt}, data.FrameTestCompareOptions()...)
if diff := cmp.Diff(tt.results, results, options...); diff != "" {
t.Errorf("Result mismatch (-want +got):\n%s", diff)
}
})
}
}

View File

@@ -72,9 +72,7 @@ func (s Series) Resample(refID string, interval time.Duration, downsampler strin
}
value = tmp
}
if err := resampled.SetPoint(idx, t, value); err != nil {
return resampled, err
}
resampled.SetPoint(idx, t, value)
t = t.Add(interval)
idx++
}

View File

@@ -166,17 +166,15 @@ func (s Series) GetPoint(pointIdx int) (time.Time, *float64) {
}
// SetPoint sets the time and value on the corresponding vectors at the specified index.
func (s Series) SetPoint(pointIdx int, t time.Time, f *float64) (err error) {
func (s Series) SetPoint(pointIdx int, t time.Time, f *float64) {
s.Frame.Fields[seriesTypeTimeIdx].Set(pointIdx, t)
s.Frame.Fields[seriesTypeValIdx].Set(pointIdx, f)
return
}
// AppendPoint appends a point (time/value).
func (s Series) AppendPoint(pointIdx int, t time.Time, f *float64) (err error) {
func (s Series) AppendPoint(t time.Time, f *float64) {
s.Frame.Fields[seriesTypeTimeIdx].Append(t)
s.Frame.Fields[seriesTypeValIdx].Append(f)
return
}
// Len returns the length of the series.
@@ -214,8 +212,8 @@ func (ss SortSeriesByTime) Len() int { return Series(ss).Len() }
func (ss SortSeriesByTime) Swap(i, j int) {
iTimeVal, iFVal := Series(ss).GetPoint(i)
jTimeVal, jFVal := Series(ss).GetPoint(j)
_ = Series(ss).SetPoint(j, iTimeVal, iFVal)
_ = Series(ss).SetPoint(i, jTimeVal, jFVal)
Series(ss).SetPoint(j, iTimeVal, iFVal)
Series(ss).SetPoint(i, jTimeVal, jFVal)
}
func (ss SortSeriesByTime) Less(i, j int) bool {