SSE/Alerting: Support prom instant vector responses (#44865)

* SSE/Alerting: (Draft) Support prom instant vector responses
fixes #35663
* reduce\classic expressions to handle mathexp.Number
* use Notice for warning

Co-authored-by: Yuriy Tseretyan <yuriy.tseretyan@grafana.com>
This commit is contained in:
Kyle Brandt 2022-05-23 10:08:14 -04:00 committed by GitHub
parent 0215195e6d
commit 01ef899753
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 245 additions and 19 deletions

View File

@ -7,6 +7,7 @@ import (
"strconv" "strconv"
"github.com/grafana/grafana-plugin-sdk-go/data" "github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/expr/mathexp" "github.com/grafana/grafana/pkg/expr/mathexp"
) )
@ -99,13 +100,21 @@ func (ccc *ConditionsCmd) Execute(ctx context.Context, vars mathexp.Vars) (mathe
nilReducedCount := 0 nilReducedCount := 0
firingCount := 0 firingCount := 0
for _, val := range querySeriesSet.Values { for _, val := range querySeriesSet.Values {
series, ok := val.(mathexp.Series) var reducedNum mathexp.Number
if !ok { var name string
switch v := val.(type) {
case mathexp.Series:
reducedNum = c.Reducer.Reduce(v)
name = v.GetName()
case mathexp.Number:
reducedNum = v
if len(v.Frame.Fields) > 0 {
name = v.Frame.Fields[0].Name
}
default:
return newRes, fmt.Errorf("can only reduce type series, got type %v", val.Type()) return newRes, fmt.Errorf("can only reduce type series, got type %v", val.Type())
} }
reducedNum := c.Reducer.Reduce(series)
// TODO handle error / no data signals // TODO handle error / no data signals
thisCondNoDataFound := reducedNum.GetFloat64Value() == nil thisCondNoDataFound := reducedNum.GetFloat64Value() == nil
@ -118,7 +127,7 @@ func (ccc *ConditionsCmd) Execute(ctx context.Context, vars mathexp.Vars) (mathe
if evalRes { if evalRes {
match := EvalMatch{ match := EvalMatch{
Value: reducedNum.GetFloat64Value(), Value: reducedNum.GetFloat64Value(),
Metric: series.GetName(), Metric: name,
} }
if reducedNum.GetLabels() != nil { if reducedNum.GetLabels() != nil {
match.Labels = reducedNum.GetLabels().Copy() match.Labels = reducedNum.GetLabels().Copy()

View File

@ -6,9 +6,10 @@ import (
"testing" "testing"
"github.com/grafana/grafana-plugin-sdk-go/data" "github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/expr/mathexp"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
ptr "github.com/xorcare/pointer" ptr "github.com/xorcare/pointer"
"github.com/grafana/grafana/pkg/expr/mathexp"
) )
func TestUnmarshalConditionCMD(t *testing.T) { func TestUnmarshalConditionCMD(t *testing.T) {
@ -346,6 +347,37 @@ func TestConditionsCmdExecute(t *testing.T) {
return v return v
}, },
}, },
{
name: "should accept numbers",
vars: mathexp.Vars{
"A": mathexp.Results{
Values: []mathexp.Value{
valBasedNumber(ptr.Float64(5)),
valBasedNumber(ptr.Float64(10)),
valBasedNumber(ptr.Float64(15)),
},
},
},
conditionsCmd: &ConditionsCmd{
Conditions: []condition{
{
QueryRefID: "A",
Reducer: classicReducer("avg"),
Operator: "and",
Evaluator: &thresholdEvaluator{"gt", 1},
},
},
},
resultNumber: func() mathexp.Number {
v := valBasedNumber(ptr.Float64(1))
v.SetMeta([]EvalMatch{
{Value: ptr.Float64(5)},
{Value: ptr.Float64(10)},
{Value: ptr.Float64(15)},
})
return v
},
},
} }
for _, tt := range tests { for _, tt := range tests {

View File

@ -7,6 +7,7 @@ import (
"time" "time"
"github.com/grafana/grafana-plugin-sdk-go/backend/gtime" "github.com/grafana/grafana-plugin-sdk-go/backend/gtime"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/expr/mathexp" "github.com/grafana/grafana/pkg/expr/mathexp"
) )
@ -152,18 +153,27 @@ func (gr *ReduceCommand) NeedsVars() []string {
// Execute runs the command and returns the results or an error if the command // Execute runs the command and returns the results or an error if the command
// failed to execute. // failed to execute.
func (gr *ReduceCommand) Execute(ctx context.Context, vars mathexp.Vars) (mathexp.Results, error) { func (gr *ReduceCommand) Execute(_ context.Context, vars mathexp.Vars) (mathexp.Results, error) {
newRes := mathexp.Results{} newRes := mathexp.Results{}
for _, val := range vars[gr.VarToReduce].Values { for _, val := range vars[gr.VarToReduce].Values {
series, ok := val.(mathexp.Series) switch v := val.(type) {
if !ok { case mathexp.Series:
num, err := v.Reduce(gr.refID, gr.Reducer, gr.seriesMapper)
if err != nil {
return newRes, err
}
newRes.Values = append(newRes.Values, num)
case mathexp.Number: // if incoming vars is just a number, any reduce op is just a noop, add it as it is
copyV := mathexp.NewNumber(gr.refID, v.GetLabels())
copyV.SetValue(v.GetFloat64Value())
copyV.AddNotice(data.Notice{
Severity: data.NoticeSeverityWarning,
Text: fmt.Sprintf("Reduce operation is not needed. Input query or expression %s is already reduced data.", gr.VarToReduce),
})
newRes.Values = append(newRes.Values, copyV)
default:
return newRes, fmt.Errorf("can only reduce type series, got type %v", val.Type()) return newRes, fmt.Errorf("can only reduce type series, got type %v", val.Type())
} }
num, err := series.Reduce(gr.refID, gr.Reducer, gr.seriesMapper)
if err != nil {
return newRes, err
}
newRes.Values = append(newRes.Values, num)
} }
return newRes, nil return newRes, nil
} }

View File

@ -1,13 +1,18 @@
package expr package expr
import ( import (
"context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"math/rand"
"testing" "testing"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
ptr "github.com/xorcare/pointer"
"github.com/grafana/grafana/pkg/expr/mathexp" "github.com/grafana/grafana/pkg/expr/mathexp"
"github.com/grafana/grafana/pkg/util"
) )
func Test_UnmarshalReduceCommand_Settings(t *testing.T) { func Test_UnmarshalReduceCommand_Settings(t *testing.T) {
@ -89,3 +94,51 @@ func Test_UnmarshalReduceCommand_Settings(t *testing.T) {
}) })
} }
} }
func TestReduceExecute(t *testing.T) {
varToReduce := util.GenerateShortUID()
cmd, err := NewReduceCommand(util.GenerateShortUID(), randomReduceFunc(), varToReduce, nil)
require.NoError(t, err)
t.Run("should noop if Number", func(t *testing.T) {
var numbers mathexp.Values = []mathexp.Value{
mathexp.GenerateNumber(ptr.Float64(rand.Float64())),
mathexp.GenerateNumber(ptr.Float64(rand.Float64())),
mathexp.GenerateNumber(ptr.Float64(rand.Float64())),
}
vars := map[string]mathexp.Results{
varToReduce: {
Values: numbers,
},
}
execute, err := cmd.Execute(context.Background(), vars)
require.NoError(t, err)
require.Len(t, execute.Values, len(numbers))
for i, value := range execute.Values {
expected := numbers[i]
require.Equal(t, expected.Type(), value.Type())
require.Equal(t, expected.GetLabels(), value.GetLabels())
expectedValue := expected.Value().(*mathexp.Number).GetFloat64Value()
actualValue := value.Value().(*mathexp.Number).GetFloat64Value()
require.Equal(t, expectedValue, actualValue)
}
t.Run("should add warn notices to every frame", func(t *testing.T) {
frames := execute.Values.AsDataFrames("test")
for _, frame := range frames {
require.Len(t, frame.Meta.Notices, 1)
notice := frame.Meta.Notices[0]
require.Equal(t, data.NoticeSeverityWarning, notice.Severity)
}
})
})
}
func randomReduceFunc() string {
res := mathexp.GetSupportedReduceFuncs()
return res[rand.Intn(len(res)-1)]
}

View File

@ -100,6 +100,11 @@ func GetReduceFunc(rFunc string) (ReducerFunc, error) {
} }
} }
// GetSupportedReduceFuncs returns collection of supported function names
func GetSupportedReduceFuncs() []string {
return []string{"sum", "mean", "min", "max", "count", "last"}
}
// Reduce turns the Series into a Number based on the given reduction function // Reduce turns the Series into a Number based on the given reduction function
// if ReduceMapper is defined it applies it to the provided series and performs reduction of the resulting series. // if ReduceMapper is defined it applies it to the provided series and performs reduction of the resulting series.
// Otherwise, the reduction operation is done against the original series. // Otherwise, the reduction operation is done against the original series.

View File

@ -0,0 +1,18 @@
package mathexp
import (
"math/rand"
"github.com/grafana/grafana/pkg/util"
)
func GenerateNumber(value *float64) Number {
size := rand.Intn(5)
labels := make(map[string]string, size)
for i := 0; i < size; i++ {
labels[util.GenerateShortUID()] = util.GenerateShortUID()
}
result := NewNumber(util.GenerateShortUID(), labels)
result.SetValue(value)
return result
}

View File

@ -6,6 +6,7 @@ import (
"time" "time"
"github.com/grafana/grafana-plugin-sdk-go/data" "github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/expr/mathexp/parse" "github.com/grafana/grafana/pkg/expr/mathexp/parse"
) )
@ -154,7 +155,21 @@ func (s Series) GetMeta() interface{} {
} }
func (s Series) SetMeta(v interface{}) { func (s Series) SetMeta(v interface{}) {
s.Frame.SetMeta(&data.FrameMeta{Custom: v}) m := s.Frame.Meta
if m == nil {
m = &data.FrameMeta{}
s.Frame.SetMeta(m)
}
m.Custom = v
}
func (s Series) AddNotice(notice data.Notice) {
m := s.Frame.Meta
if m == nil {
m = &data.FrameMeta{}
s.Frame.SetMeta(m)
}
m.Notices = append(m.Notices, notice)
} }
// AsDataFrame returns the underlying *data.Frame. // AsDataFrame returns the underlying *data.Frame.

View File

@ -2,6 +2,7 @@ package mathexp
import ( import (
"github.com/grafana/grafana-plugin-sdk-go/data" "github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/expr/mathexp/parse" "github.com/grafana/grafana/pkg/expr/mathexp/parse"
) )
@ -33,6 +34,7 @@ type Value interface {
GetMeta() interface{} GetMeta() interface{}
SetMeta(interface{}) SetMeta(interface{})
AsDataFrame() *data.Frame AsDataFrame() *data.Frame
AddNotice(notice data.Notice)
} }
// Scalar is the type that holds a single number constant. // Scalar is the type that holds a single number constant.
@ -55,7 +57,21 @@ func (s Scalar) GetMeta() interface{} {
} }
func (s Scalar) SetMeta(v interface{}) { func (s Scalar) SetMeta(v interface{}) {
s.Frame.SetMeta(&data.FrameMeta{Custom: v}) m := s.Frame.Meta
if m == nil {
m = &data.FrameMeta{}
s.Frame.SetMeta(m)
}
m.Custom = v
}
func (s Scalar) AddNotice(notice data.Notice) {
m := s.Frame.Meta
if m == nil {
m = &data.FrameMeta{}
s.Frame.SetMeta(m)
}
m.Notices = append(m.Notices, notice)
} }
// AsDataFrame returns the underlying *data.Frame. // AsDataFrame returns the underlying *data.Frame.
@ -121,7 +137,21 @@ func (n Number) GetMeta() interface{} {
} }
func (n Number) SetMeta(v interface{}) { func (n Number) SetMeta(v interface{}) {
n.Frame.SetMeta(&data.FrameMeta{Custom: v}) m := n.Frame.Meta
if m == nil {
m = &data.FrameMeta{}
n.Frame.SetMeta(m)
}
m.Custom = v
}
func (n Number) AddNotice(notice data.Notice) {
m := n.Frame.Meta
if m == nil {
m = &data.FrameMeta{}
n.Frame.SetMeta(m)
}
m.Notices = append(m.Notices, notice)
} }
// FloatField is a *float64 or a float64 data.Field with methods to always // FloatField is a *float64 or a float64 data.Field with methods to always

View File

@ -8,6 +8,7 @@ import (
"github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data" "github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/expr/classic" "github.com/grafana/grafana/pkg/expr/classic"
"github.com/grafana/grafana/pkg/expr/mathexp" "github.com/grafana/grafana/pkg/expr/mathexp"
"github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/infra/log"
@ -236,6 +237,15 @@ func (dn *DSNode) Execute(ctx context.Context, vars mathexp.Vars, s *Service) (m
return mathexp.Results{}, QueryError{RefID: refID, Err: qr.Error} return mathexp.Results{}, QueryError{RefID: refID, Err: qr.Error}
} }
dataSource := dn.datasource.Type
if isAllFrameVectors(dataSource, qr.Frames) {
vals, err = framesToNumbers(qr.Frames)
if err != nil {
return mathexp.Results{}, fmt.Errorf("failed to read frames as numbers: %w", err)
}
return mathexp.Results{Values: vals}, nil
}
if len(qr.Frames) == 1 { if len(qr.Frames) == 1 {
frame := qr.Frames[0] frame := qr.Frames[0]
if frame.TimeSeriesSchema().Type == data.TimeSeriesTypeNot && isNumberTable(frame) { if frame.TimeSeriesSchema().Type == data.TimeSeriesTypeNot && isNumberTable(frame) {
@ -254,7 +264,6 @@ func (dn *DSNode) Execute(ctx context.Context, vars mathexp.Vars, s *Service) (m
} }
} }
dataSource := dn.datasource.Type
for _, frame := range qr.Frames { for _, frame := range qr.Frames {
logger.Debug("expression datasource query (seriesSet)", "query", refID) logger.Debug("expression datasource query (seriesSet)", "query", refID)
// Check for TimeSeriesTypeNot in InfluxDB queries. A data frame of this type will cause // Check for TimeSeriesTypeNot in InfluxDB queries. A data frame of this type will cause
@ -278,6 +287,51 @@ func (dn *DSNode) Execute(ctx context.Context, vars mathexp.Vars, s *Service) (m
}, nil }, nil
} }
func isAllFrameVectors(datasourceType string, frames data.Frames) bool {
if datasourceType != "prometheus" {
return false
}
allVector := false
for i, frame := range frames {
if frame.Meta != nil && frame.Meta.Custom != nil {
if sMap, ok := frame.Meta.Custom.(map[string]string); ok {
if sMap != nil {
if sMap["resultType"] == "vector" {
if i != 0 && !allVector {
break
}
allVector = true
}
}
}
}
}
return allVector
}
func framesToNumbers(frames data.Frames) ([]mathexp.Value, error) {
vals := make([]mathexp.Value, 0, len(frames))
for _, frame := range frames {
if frame == nil {
continue
}
if len(frame.Fields) == 2 && frame.Fields[0].Len() == 1 {
// Can there be zero Len Field results that are being skipped?
valueField := frame.Fields[1]
if valueField.Type().Numeric() { // should be []float64
val, err := valueField.FloatAt(0) // FloatAt should not err if numeric
if err != nil {
return nil, fmt.Errorf("failed to read value of frame [%v] (RefID %v) of type [%v] as float: %w", frame.Name, frame.RefID, valueField.Type(), err)
}
n := mathexp.NewNumber(frame.Name, valueField.Labels)
n.SetValue(&val)
vals = append(vals, n)
}
}
}
return vals, nil
}
func isNumberTable(frame *data.Frame) bool { func isNumberTable(frame *data.Frame) bool {
if frame == nil || frame.Fields == nil { if frame == nil || frame.Fields == nil {
return false return false

View File

@ -587,7 +587,7 @@ func newDataFrame(name string, typ string, fields ...*data.Field) *data.Frame {
frame.Meta = &data.FrameMeta{ frame.Meta = &data.FrameMeta{
Type: data.FrameTypeTimeSeriesMany, Type: data.FrameTypeTimeSeriesMany,
Custom: map[string]string{ Custom: map[string]string{
"resultType": typ, "resultType": typ, // Note: SSE depends on this property and map type
}, },
} }