SSE: Add utility methods for HysteresisCommand (#79157)

* add GetCommandsFromPipeline
* refactor method GetCommandType to func GetExpressionCommandType
* add function to create fingerprint frames
* add function to determine whether raw query represents a hysteresis command and a function to patch it with loaded metrics
This commit is contained in:
Yuri Tseretyan 2023-12-11 15:40:31 -05:00 committed by GitHub
parent f69516bf47
commit bf8be46e6f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 284 additions and 6 deletions

View File

@ -292,3 +292,23 @@ func buildGraphEdges(dp *simple.DirectedGraph, registry map[string]Node) error {
}
return nil
}
// GetCommandsFromPipeline traverses the pipeline and extracts all CMDNode commands that match the type
func GetCommandsFromPipeline[T Command](pipeline DataPipeline) []T {
var results []T
for _, p := range pipeline {
if p.NodeType() != TypeCMDNode {
continue
}
switch cmd := p.(type) {
case *CMDNode:
switch r := cmd.Command.(type) {
case T:
results = append(results, r)
}
default:
continue
}
}
return results
}

View File

@ -246,6 +246,41 @@ func TestServicebuildPipeLine(t *testing.T) {
}
}
func TestGetCommandsFromPipeline(t *testing.T) {
pipeline := DataPipeline{
&MLNode{},
&DSNode{},
&CMDNode{
baseNode: baseNode{},
CMDType: 0,
Command: &ReduceCommand{},
},
&CMDNode{
baseNode: baseNode{},
CMDType: 0,
Command: &ReduceCommand{},
},
&CMDNode{
baseNode: baseNode{},
CMDType: 0,
Command: &HysteresisCommand{},
},
}
t.Run("should find command that exists", func(t *testing.T) {
cmds := GetCommandsFromPipeline[*HysteresisCommand](pipeline)
require.Len(t, cmds, 1)
require.Equal(t, pipeline[4].(*CMDNode).Command, cmds[0])
})
t.Run("should find all commands that exist", func(t *testing.T) {
cmds := GetCommandsFromPipeline[*ReduceCommand](pipeline)
require.Len(t, cmds, 2)
})
t.Run("should not find all command that does not exist", func(t *testing.T) {
cmds := GetCommandsFromPipeline[*MathCommand](pipeline)
require.Len(t, cmds, 0)
})
}
func getRefIDOrder(nodes []Node) []string {
ids := make([]string, 0, len(nodes))
for _, n := range nodes {

View File

@ -120,3 +120,17 @@ func FingerprintsFromFrame(frame *data.Frame) (Fingerprints, error) {
}
return result, nil
}
// FingerprintsToFrame converts Fingerprints to data.Frame.
func FingerprintsToFrame(fingerprints Fingerprints) *data.Frame {
fp := make([]uint64, 0, len(fingerprints))
for fingerprint := range fingerprints {
fp = append(fp, uint64(fingerprint))
}
frame := data.NewFrame("", data.NewField("fingerprints", nil, fp))
frame.SetMeta(&data.FrameMeta{
Type: "fingerprints",
TypeVersion: data.FrameTypeVersion{1, 0},
})
return frame
}

View File

@ -186,3 +186,36 @@ func TestLoadedDimensionsFromFrame(t *testing.T) {
})
}
}
func TestFingerprintsToFrame(t *testing.T) {
testCases := []struct {
name string
input Fingerprints
expected Fingerprints
expectedError bool
}{
{
name: "when empty map",
input: Fingerprints{},
expected: Fingerprints{},
},
{
name: "when nil",
input: nil,
expected: Fingerprints{},
},
{
name: "when has values",
input: Fingerprints{1: {}, 2: {}, 3: {}, 4: {}, 5: {}},
expected: Fingerprints{1: {}, 2: {}, 3: {}, 4: {}, 5: {}},
},
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
frame := FingerprintsToFrame(testCase.input)
actual, err := FingerprintsFromFrame(frame)
require.NoError(t, err)
require.EqualValues(t, testCase.expected, actual)
})
}
}

View File

@ -3,6 +3,7 @@ package expr
import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
"time"
@ -45,10 +46,10 @@ type rawNode struct {
idx int64
}
func (rn *rawNode) GetCommandType() (c CommandType, err error) {
rawType, ok := rn.Query["type"]
func GetExpressionCommandType(rawQuery map[string]any) (c CommandType, err error) {
rawType, ok := rawQuery["type"]
if !ok {
return c, fmt.Errorf("no expression command type in query for refId %v", rn.RefID)
return c, errors.New("no expression command type in query")
}
typeString, ok := rawType.(string)
if !ok {
@ -97,7 +98,7 @@ func (gn *CMDNode) Execute(ctx context.Context, now time.Time, vars mathexp.Vars
}
func buildCMDNode(rn *rawNode, toggles featuremgmt.FeatureToggles) (*CMDNode, error) {
commandType, err := rn.GetCommandType()
commandType, err := GetExpressionCommandType(rn.Query)
if err != nil {
return nil, fmt.Errorf("invalid command type in expression '%v': %w", rn.RefID, err)
}

View File

@ -3,6 +3,7 @@ package expr
import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
"time"
@ -163,3 +164,64 @@ type ThresholdConditionJSON struct {
UnloadEvaluator *ConditionEvalJSON `json:"unloadEvaluator"`
LoadedDimensions *data.Frame `json:"loadedDimensions"`
}
// IsHysteresisExpression returns true if the raw model describes a hysteresis command:
// - field 'type' has value "threshold",
// - field 'conditions' is array of objects and has exactly one element
// - field 'conditions[0].unloadEvaluator is not nil
func IsHysteresisExpression(query map[string]any) bool {
c, err := getConditionForHysteresisCommand(query)
if err != nil {
return false
}
return c != nil
}
// SetLoadedDimensionsToHysteresisCommand mutates the input map and sets field "conditions[0].loadedMetrics" with the data frame created from the provided fingerprints.
func SetLoadedDimensionsToHysteresisCommand(query map[string]any, fingerprints Fingerprints) error {
condition, err := getConditionForHysteresisCommand(query)
if err != nil {
return err
}
if condition == nil {
return errors.New("not a hysteresis command")
}
fr := FingerprintsToFrame(fingerprints)
condition["loadedDimensions"] = fr
return nil
}
func getConditionForHysteresisCommand(query map[string]any) (map[string]any, error) {
t, err := GetExpressionCommandType(query)
if err != nil {
return nil, err
}
if t != TypeThreshold {
return nil, errors.New("not a threshold command")
}
c, ok := query["conditions"]
if !ok {
return nil, errors.New("invalid threshold command: expected field \"condition\"")
}
var condition map[string]any
switch arr := c.(type) {
case []any:
if len(arr) != 1 {
return nil, errors.New("invalid threshold command: field \"condition\" expected to have exactly 1 field")
}
switch m := arr[0].(type) {
case map[string]any:
condition = m
default:
return nil, errors.New("invalid threshold command: value of the first element of field \"condition\" expected to be an object")
}
default:
return nil, errors.New("invalid threshold command: field \"condition\" expected to be an array of objects")
}
_, ok = condition["unloadEvaluator"]
if !ok {
return nil, nil
}
return condition, nil
}

View File

@ -3,6 +3,7 @@ package expr
import (
"encoding/json"
"fmt"
"math"
"sort"
"testing"
@ -162,7 +163,7 @@ func TestUnmarshalThresholdCommand(t *testing.T) {
],
"type": "lt"
},
"loadedDimensions": {"schema":{"name":"test","meta":{"type":"fingerprints","typeVersion":[1,0]},"fields":[{"name":"fingerprints","type":"number","typeInfo":{"frame":"uint64"}}]},"data":{"values":[[1,2,3,4,5]]}}
"loadedDimensions": {"schema":{"name":"test","meta":{"type":"fingerprints","typeVersion":[1,0]},"fields":[{"name":"fingerprints","type":"number","typeInfo":{"frame":"uint64"}}]},"data":{"values":[[18446744073709551615,2,3,4,5]]}}
}
]
}`,
@ -186,7 +187,7 @@ func TestUnmarshalThresholdCommand(t *testing.T) {
return actual[i] < actual[j]
})
require.EqualValues(t, []uint64{1, 2, 3, 4, 5}, actual)
require.EqualValues(t, []uint64{2, 3, 4, 5, 18446744073709551615}, actual)
},
},
}
@ -330,3 +331,115 @@ func TestIsSupportedThresholdFunc(t *testing.T) {
})
}
}
func TestIsHysteresisExpression(t *testing.T) {
cases := []struct {
name string
input json.RawMessage
expected bool
}{
{
name: "false if it's empty",
input: json.RawMessage(`{}`),
expected: false,
},
{
name: "false if it is not threshold type",
input: json.RawMessage(`{ "type": "reduce" }`),
expected: false,
},
{
name: "false if no conditions",
input: json.RawMessage(`{ "type": "threshold" }`),
expected: false,
},
{
name: "false if many conditions",
input: json.RawMessage(`{ "type": "threshold", "conditions": [{}, {}] }`),
expected: false,
},
{
name: "false if condition is not an object",
input: json.RawMessage(`{ "type": "threshold", "conditions": ["test"] }`),
expected: false,
},
{
name: "false if condition is does not have unloadEvaluator",
input: json.RawMessage(`{ "type": "threshold", "conditions": [{}] }`),
expected: false,
},
{
name: "true type is threshold and a single condition has unloadEvaluator field",
input: json.RawMessage(`{ "type": "threshold", "conditions": [{ "unloadEvaluator" : {}}] }`),
expected: true,
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
query := map[string]any{}
require.NoError(t, json.Unmarshal(tc.input, &query))
require.Equal(t, tc.expected, IsHysteresisExpression(query))
})
}
}
func TestSetLoadedDimensionsToHysteresisCommand(t *testing.T) {
cases := []struct {
name string
input json.RawMessage
}{
{
name: "error if model is empty",
input: json.RawMessage(`{}`),
},
{
name: "error if is not a threshold type",
input: json.RawMessage(`{ "type": "reduce" }`),
},
{
name: "error if threshold but no conditions",
input: json.RawMessage(`{ "type": "threshold" }`),
},
{
name: "error if threshold and many conditions",
input: json.RawMessage(`{ "type": "threshold", "conditions": [{}, {}] }`),
},
{
name: "error if condition is not an object",
input: json.RawMessage(`{ "type": "threshold", "conditions": ["test"] }`),
},
{
name: "error if condition does not have unloadEvaluator",
input: json.RawMessage(`{ "type": "threshold", "conditions": [{ "evaluator": { "params": [5], "type": "gt"}}], "expression": "A" }`),
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
query := map[string]any{}
require.NoError(t, json.Unmarshal(tc.input, &query))
err := SetLoadedDimensionsToHysteresisCommand(query, Fingerprints{math.MaxUint64: {}, 2: {}, 3: {}})
require.Error(t, err)
})
}
t.Run("when unloadEvaluator is set, mutates query with loaded dimensions", func(t *testing.T) {
fingerprints := Fingerprints{math.MaxUint64: {}, 2: {}, 3: {}}
input := json.RawMessage(`{ "type": "threshold", "conditions": [{ "evaluator": { "params": [5], "type": "gt" }, "unloadEvaluator" : {"params": [2], "type": "lt"}}], "expression": "A" }`)
query := map[string]any{}
require.NoError(t, json.Unmarshal(input, &query))
require.NoError(t, SetLoadedDimensionsToHysteresisCommand(query, fingerprints))
raw, err := json.Marshal(query)
require.NoError(t, err)
// Assert the query is set by unmarshalling the query because it's the easiest way to assert Fingerprints
cmd, err := UnmarshalThresholdCommand(&rawNode{
RefID: "B",
QueryRaw: raw,
}, featuremgmt.WithFeatures(featuremgmt.FlagRecoveryThreshold))
require.NoError(t, err)
require.Equal(t, fingerprints, cmd.(*HysteresisCommand).LoadedDimensions)
})
}