mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Live: remove json exact converter (#58282)
This commit is contained in:
parent
eb1cc80941
commit
a83fdc6b87
2
go.mod
2
go.mod
@ -32,7 +32,6 @@ require (
|
||||
github.com/cortexproject/cortex v1.10.1-0.20211014125347-85c378182d0d
|
||||
github.com/crewjam/saml v0.4.8
|
||||
github.com/denisenkom/go-mssqldb v0.12.0
|
||||
github.com/dop251/goja v0.0.0-20210804101310-32956a348b49
|
||||
github.com/fatih/color v1.13.0
|
||||
github.com/gchaincl/sqlhooks v1.3.0
|
||||
github.com/getsentry/sentry-go v0.13.0
|
||||
@ -75,7 +74,6 @@ require (
|
||||
github.com/mattn/go-sqlite3 v1.14.7
|
||||
github.com/matttproud/golang_protobuf_extensions v1.0.2
|
||||
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f
|
||||
github.com/ohler55/ojg v1.12.9
|
||||
github.com/opentracing/opentracing-go v1.2.0
|
||||
github.com/patrickmn/go-cache v2.1.0+incompatible
|
||||
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 // indirect
|
||||
|
@ -1,163 +0,0 @@
|
||||
package pipeline
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
"github.com/ohler55/ojg/jp"
|
||||
"github.com/ohler55/ojg/oj"
|
||||
)
|
||||
|
||||
// ExactJsonConverter can convert JSON to a single data.Frame according to
|
||||
// user-defined field configuration and value extraction rules.
|
||||
type ExactJsonConverter struct {
|
||||
config ExactJsonConverterConfig
|
||||
nowTimeFunc func() time.Time
|
||||
}
|
||||
|
||||
func NewExactJsonConverter(c ExactJsonConverterConfig) *ExactJsonConverter {
|
||||
return &ExactJsonConverter{config: c}
|
||||
}
|
||||
|
||||
const ConverterTypeJsonExact = "jsonExact"
|
||||
|
||||
func (c *ExactJsonConverter) Type() string {
|
||||
return ConverterTypeJsonExact
|
||||
}
|
||||
|
||||
func (c *ExactJsonConverter) Convert(_ context.Context, vars Vars, body []byte) ([]*ChannelFrame, error) {
|
||||
obj, err := oj.Parse(body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var fields []*data.Field
|
||||
|
||||
var initGojaOnce sync.Once
|
||||
var gojaRuntime *gojaRuntime
|
||||
|
||||
for _, f := range c.config.Fields {
|
||||
field := data.NewFieldFromFieldType(f.Type, 1)
|
||||
field.Name = f.Name
|
||||
field.Config = f.Config
|
||||
|
||||
if strings.HasPrefix(f.Value, "$") {
|
||||
// JSON path.
|
||||
fragments, err := jp.ParseString(f.Value[1:])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
values := fragments.Get(obj)
|
||||
if len(values) == 0 {
|
||||
field.Set(0, nil)
|
||||
} else if len(values) == 1 {
|
||||
val := values[0]
|
||||
switch f.Type {
|
||||
case data.FieldTypeNullableFloat64:
|
||||
if val == nil {
|
||||
field.Set(0, nil)
|
||||
} else {
|
||||
switch v := val.(type) {
|
||||
case float64:
|
||||
field.SetConcrete(0, v)
|
||||
case int64:
|
||||
field.SetConcrete(0, float64(v))
|
||||
default:
|
||||
return nil, fmt.Errorf("malformed float64 type for %s: %T", f.Name, v)
|
||||
}
|
||||
}
|
||||
case data.FieldTypeNullableString:
|
||||
v, ok := val.(string)
|
||||
if !ok {
|
||||
return nil, errors.New("malformed string type")
|
||||
}
|
||||
field.SetConcrete(0, v)
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported field type: %s (%s)", f.Type, f.Name)
|
||||
}
|
||||
} else {
|
||||
return nil, errors.New("too many values")
|
||||
}
|
||||
} else if strings.HasPrefix(f.Value, "{") {
|
||||
// Goja script.
|
||||
script := strings.Trim(f.Value, "{}")
|
||||
var err error
|
||||
initGojaOnce.Do(func() {
|
||||
gojaRuntime, err = getRuntime(body)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
switch f.Type {
|
||||
case data.FieldTypeNullableBool:
|
||||
v, err := gojaRuntime.getBool(script)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
field.SetConcrete(0, v)
|
||||
case data.FieldTypeNullableFloat64:
|
||||
v, err := gojaRuntime.getFloat64(script)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
field.SetConcrete(0, v)
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported field type: %s (%s)", f.Type, f.Name)
|
||||
}
|
||||
} else if f.Value == "#{now}" {
|
||||
// Variable.
|
||||
// TODO: make consistent with Grafana variables?
|
||||
nowTimeFunc := c.nowTimeFunc
|
||||
if nowTimeFunc == nil {
|
||||
nowTimeFunc = time.Now
|
||||
}
|
||||
field.SetConcrete(0, nowTimeFunc())
|
||||
}
|
||||
|
||||
labels := map[string]string{}
|
||||
for _, label := range f.Labels {
|
||||
if strings.HasPrefix(label.Value, "$") {
|
||||
fragments, err := jp.ParseString(label.Value[1:])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
values := fragments.Get(obj)
|
||||
if len(values) == 0 {
|
||||
labels[label.Name] = ""
|
||||
} else if len(values) == 1 {
|
||||
labels[label.Name] = fmt.Sprintf("%v", values[0])
|
||||
} else {
|
||||
return nil, errors.New("too many values for a label")
|
||||
}
|
||||
} else if strings.HasPrefix(label.Value, "{") {
|
||||
script := strings.Trim(label.Value, "{}")
|
||||
var err error
|
||||
initGojaOnce.Do(func() {
|
||||
gojaRuntime, err = getRuntime(body)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
v, err := gojaRuntime.getString(script)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
labels[label.Name] = v
|
||||
} else {
|
||||
labels[label.Name] = label.Value
|
||||
}
|
||||
}
|
||||
field.Labels = labels
|
||||
fields = append(fields, field)
|
||||
}
|
||||
|
||||
frame := data.NewFrame(vars.Path, fields...)
|
||||
return []*ChannelFrame{
|
||||
{Channel: "", Frame: frame},
|
||||
}, nil
|
||||
}
|
@ -1,96 +0,0 @@
|
||||
package pipeline
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/experimental"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func checkExactConversion(t *testing.T, file string, fields []Field) *backend.DataResponse {
|
||||
t.Helper()
|
||||
content := loadTestJson(t, file)
|
||||
|
||||
converter := NewExactJsonConverter(ExactJsonConverterConfig{
|
||||
Fields: fields,
|
||||
})
|
||||
converter.nowTimeFunc = func() time.Time {
|
||||
return time.Date(2021, 01, 01, 12, 12, 12, 0, time.UTC)
|
||||
}
|
||||
channelFrames, err := converter.Convert(context.Background(), Vars{}, content)
|
||||
require.NoError(t, err)
|
||||
|
||||
dr := &backend.DataResponse{}
|
||||
for _, cf := range channelFrames {
|
||||
require.Empty(t, cf.Channel)
|
||||
dr.Frames = append(dr.Frames, cf.Frame)
|
||||
}
|
||||
|
||||
experimental.CheckGoldenJSONResponse(t, "testdata", file+".golden", dr, *update)
|
||||
return dr
|
||||
}
|
||||
|
||||
func BenchmarkExactJsonConverter_Convert(b *testing.B) {
|
||||
content := loadTestJson(b, "json_exact")
|
||||
|
||||
converter := NewExactJsonConverter(ExactJsonConverterConfig{
|
||||
Fields: []Field{
|
||||
{
|
||||
Name: "ax",
|
||||
Value: "$.ax",
|
||||
Type: data.FieldTypeNullableFloat64,
|
||||
}, {
|
||||
Name: "array_value",
|
||||
Value: "$.string_array[0]",
|
||||
Type: data.FieldTypeNullableString,
|
||||
}, {
|
||||
Name: "map_key",
|
||||
Value: "$.map_with_floats['key1']",
|
||||
Type: data.FieldTypeNullableFloat64,
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
_, err := converter.Convert(context.Background(), Vars{}, content)
|
||||
require.NoError(b, err)
|
||||
//require.Len(b, cf, 1)
|
||||
//require.Len(b, cf[0].Frame.Fields, 3)
|
||||
}
|
||||
}
|
||||
|
||||
func TestExactJsonConverter_Convert(t *testing.T) {
|
||||
checkExactConversion(t, "json_exact", []Field{
|
||||
{
|
||||
Name: "time",
|
||||
Value: "#{now}",
|
||||
Type: data.FieldTypeTime,
|
||||
},
|
||||
{
|
||||
Name: "ax",
|
||||
Value: "$.ax",
|
||||
Type: data.FieldTypeNullableFloat64,
|
||||
},
|
||||
{
|
||||
Name: "key1",
|
||||
Value: "{x.map_with_floats.key1}",
|
||||
Type: data.FieldTypeNullableFloat64,
|
||||
Labels: []Label{
|
||||
{
|
||||
Name: "label1",
|
||||
Value: "{x.map_with_floats.key2.toString()}",
|
||||
},
|
||||
{
|
||||
Name: "label2",
|
||||
Value: "$.map_with_floats.key2",
|
||||
},
|
||||
},
|
||||
},
|
||||
})
|
||||
}
|
@ -208,130 +208,6 @@ func (f *DevRuleBuilder) BuildRules(_ context.Context, _ int64) ([]*LiveChannelR
|
||||
NewManagedStreamFrameOutput(f.ManagedStream),
|
||||
},
|
||||
},
|
||||
{
|
||||
OrgId: 1,
|
||||
Pattern: "stream/json/exact",
|
||||
Converter: NewExactJsonConverter(ExactJsonConverterConfig{
|
||||
Fields: []Field{
|
||||
{
|
||||
Name: "time",
|
||||
Type: data.FieldTypeTime,
|
||||
Value: "#{now}",
|
||||
},
|
||||
{
|
||||
Name: "value1",
|
||||
Type: data.FieldTypeNullableFloat64,
|
||||
Value: "$.value1",
|
||||
},
|
||||
{
|
||||
Name: "value2",
|
||||
Type: data.FieldTypeNullableFloat64,
|
||||
Value: "$.value2",
|
||||
},
|
||||
{
|
||||
Name: "value3",
|
||||
Type: data.FieldTypeNullableFloat64,
|
||||
Value: "$.value3",
|
||||
Labels: []Label{
|
||||
{
|
||||
Name: "host",
|
||||
Value: "$.host",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "value4",
|
||||
Type: data.FieldTypeNullableFloat64,
|
||||
Value: "$.value4",
|
||||
Config: &data.FieldConfig{
|
||||
Thresholds: &data.ThresholdsConfig{
|
||||
Mode: data.ThresholdsModeAbsolute,
|
||||
Steps: []data.Threshold{
|
||||
{
|
||||
Value: 2,
|
||||
State: "normal",
|
||||
Color: "green",
|
||||
},
|
||||
{
|
||||
Value: 6,
|
||||
State: "warning",
|
||||
Color: "orange",
|
||||
},
|
||||
{
|
||||
Value: 8,
|
||||
State: "critical",
|
||||
Color: "red",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "map.red",
|
||||
Type: data.FieldTypeNullableFloat64,
|
||||
Value: "$.map.red",
|
||||
Labels: []Label{
|
||||
{
|
||||
Name: "host",
|
||||
Value: "$.host",
|
||||
},
|
||||
{
|
||||
Name: "host2",
|
||||
Value: "$.host",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "annotation",
|
||||
Type: data.FieldTypeNullableString,
|
||||
Value: "$.annotation",
|
||||
},
|
||||
{
|
||||
Name: "running",
|
||||
Type: data.FieldTypeNullableBool,
|
||||
Value: "{x.status === 'running'}",
|
||||
},
|
||||
{
|
||||
Name: "num_map_colors",
|
||||
Type: data.FieldTypeNullableFloat64,
|
||||
Value: "{Object.keys(x.map).length}",
|
||||
},
|
||||
},
|
||||
}),
|
||||
FrameOutputters: []FrameOutputter{
|
||||
NewManagedStreamFrameOutput(f.ManagedStream),
|
||||
NewRemoteWriteFrameOutput(
|
||||
os.Getenv("GF_LIVE_REMOTE_WRITE_ENDPOINT"),
|
||||
&BasicAuth{
|
||||
User: os.Getenv("GF_LIVE_REMOTE_WRITE_USER"),
|
||||
Password: os.Getenv("GF_LIVE_REMOTE_WRITE_PASSWORD"),
|
||||
},
|
||||
0,
|
||||
),
|
||||
NewChangeLogFrameOutput(f.FrameStorage, ChangeLogOutputConfig{
|
||||
FieldName: "value3",
|
||||
Channel: "stream/json/exact/value3/changes",
|
||||
}),
|
||||
NewChangeLogFrameOutput(f.FrameStorage, ChangeLogOutputConfig{
|
||||
FieldName: "annotation",
|
||||
Channel: "stream/json/exact/annotation/changes",
|
||||
}),
|
||||
NewConditionalOutput(
|
||||
NewMultipleFrameConditionChecker(
|
||||
ConditionAll,
|
||||
NewFrameNumberCompareCondition("value1", "gte", 3.0),
|
||||
NewFrameNumberCompareCondition("value2", "gte", 3.0),
|
||||
),
|
||||
NewRedirectFrameOutput(RedirectOutputConfig{
|
||||
Channel: "stream/json/exact/condition",
|
||||
}),
|
||||
),
|
||||
NewThresholdOutput(f.FrameStorage, ThresholdOutputConfig{
|
||||
FieldName: "value4",
|
||||
Channel: "stream/json/exact/value4/state",
|
||||
}),
|
||||
},
|
||||
},
|
||||
{
|
||||
OrgId: 1,
|
||||
Pattern: "stream/json/exact/value3/changes",
|
||||
|
@ -1,97 +0,0 @@
|
||||
package pipeline
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/dop251/goja"
|
||||
"github.com/dop251/goja/parser"
|
||||
)
|
||||
|
||||
func getRuntime(payload []byte) (*gojaRuntime, error) {
|
||||
vm := goja.New()
|
||||
vm.SetMaxCallStackSize(64)
|
||||
vm.SetParserOptions(parser.WithDisableSourceMaps)
|
||||
r := &gojaRuntime{vm}
|
||||
err := r.init(payload)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return r, nil
|
||||
}
|
||||
|
||||
type gojaRuntime struct {
|
||||
vm *goja.Runtime
|
||||
}
|
||||
|
||||
// Parse JSON once.
|
||||
func (r *gojaRuntime) init(payload []byte) error {
|
||||
err := r.vm.Set("__body", string(payload))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = r.runString(`var x = JSON.parse(__body)`)
|
||||
return err
|
||||
}
|
||||
|
||||
func (r *gojaRuntime) runString(script string) (goja.Value, error) {
|
||||
doneCh := make(chan struct{})
|
||||
go func() {
|
||||
select {
|
||||
case <-doneCh:
|
||||
return
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
// Some ideas to prevent misuse of scripts:
|
||||
// * parse/validate scripts on save
|
||||
// * block scripts after several timeouts in a row
|
||||
// * block scripts on malformed returned error
|
||||
// * limit total quota of time for scripts
|
||||
// * maybe allow only one statement, reject scripts with cycles and functions.
|
||||
r.vm.Interrupt(errors.New("timeout"))
|
||||
}
|
||||
}()
|
||||
defer close(doneCh)
|
||||
return r.vm.RunString(script)
|
||||
}
|
||||
|
||||
func (r *gojaRuntime) getBool(script string) (bool, error) {
|
||||
v, err := r.runString(script)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
num, ok := v.Export().(bool)
|
||||
if !ok {
|
||||
return false, errors.New("unexpected return value")
|
||||
}
|
||||
return num, nil
|
||||
}
|
||||
|
||||
func (r *gojaRuntime) getString(script string) (string, error) {
|
||||
v, err := r.runString(script)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
exportedVal := v.Export()
|
||||
stringVal, ok := exportedVal.(string)
|
||||
if !ok {
|
||||
return "", fmt.Errorf("unexpected return value: %v (%T), script: %s", exportedVal, exportedVal, script)
|
||||
}
|
||||
return stringVal, nil
|
||||
}
|
||||
|
||||
func (r *gojaRuntime) getFloat64(script string) (float64, error) {
|
||||
v, err := r.runString(script)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
exported := v.Export()
|
||||
switch v := exported.(type) {
|
||||
case float64:
|
||||
return v, nil
|
||||
case int64:
|
||||
return float64(v), nil
|
||||
default:
|
||||
return 0, fmt.Errorf("unexpected return value: %T", exported)
|
||||
}
|
||||
}
|
@ -1,55 +0,0 @@
|
||||
package pipeline
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/dop251/goja"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestGojaGetBool(t *testing.T) {
|
||||
r, err := getRuntime([]byte(`{"ax": true}`))
|
||||
require.NoError(t, err)
|
||||
val, err := r.getBool("x.ax")
|
||||
require.NoError(t, err)
|
||||
require.True(t, val)
|
||||
}
|
||||
|
||||
func TestGojaGetFloat64(t *testing.T) {
|
||||
r, err := getRuntime([]byte(`{"ax": 3}`))
|
||||
require.NoError(t, err)
|
||||
val, err := r.getFloat64("x.ax")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 3.0, val)
|
||||
}
|
||||
|
||||
func TestGojaGetString(t *testing.T) {
|
||||
r, err := getRuntime([]byte(`{"ax": "test"}`))
|
||||
require.NoError(t, err)
|
||||
val, err := r.getString("x.ax")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, "test", val)
|
||||
}
|
||||
|
||||
func TestGojaInvalidReturnValue(t *testing.T) {
|
||||
r, err := getRuntime([]byte(`{"ax": "test"}`))
|
||||
require.NoError(t, err)
|
||||
_, err = r.getBool("x.ax")
|
||||
require.Error(t, err)
|
||||
}
|
||||
|
||||
func TestGojaIInterrupt(t *testing.T) {
|
||||
r, err := getRuntime([]byte(`{}`))
|
||||
require.NoError(t, err)
|
||||
_, err = r.getBool("while (true) {}")
|
||||
var interrupted *goja.InterruptedError
|
||||
require.ErrorAs(t, err, &interrupted)
|
||||
}
|
||||
|
||||
func TestGojaIMaxStack(t *testing.T) {
|
||||
r, err := getRuntime([]byte(`{}`))
|
||||
require.NoError(t, err)
|
||||
_, err = r.getBool("function test() {test()}; test();")
|
||||
// TODO: strange <nil> error returned here, need to investigate what is it.
|
||||
require.Error(t, err)
|
||||
}
|
@ -55,10 +55,6 @@ var ConvertersRegistry = []EntityInfo{
|
||||
Type: ConverterTypeJsonAuto,
|
||||
Description: "automatic recursive JSON to Frame conversion",
|
||||
},
|
||||
{
|
||||
Type: ConverterTypeJsonExact,
|
||||
Description: "JSON to Frame conversion according to exact list of fields",
|
||||
},
|
||||
{
|
||||
Type: ConverterTypeInfluxAuto,
|
||||
Description: "accept influx line protocol",
|
||||
|
@ -58,11 +58,6 @@ func (f *StorageRuleBuilder) extractConverter(config *ConverterConfig) (Converte
|
||||
config.AutoJsonConverterConfig = &AutoJsonConverterConfig{}
|
||||
}
|
||||
return NewAutoJsonConverter(*config.AutoJsonConverterConfig), nil
|
||||
case ConverterTypeJsonExact:
|
||||
if config.ExactJsonConverterConfig == nil {
|
||||
return nil, missingConfiguration
|
||||
}
|
||||
return NewExactJsonConverter(*config.ExactJsonConverterConfig), nil
|
||||
case ConverterTypeJsonFrame:
|
||||
if config.JsonFrameConverterConfig == nil {
|
||||
config.JsonFrameConverterConfig = &JsonFrameConverterConfig{}
|
||||
|
Loading…
Reference in New Issue
Block a user