mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
SSE: Dataplane Compliance (#65927)
Takes a specific code path for data that identifies itself as dataplane instead of "guessing" what the data is. The data must identify itself by being in the dataplane by having both the following frame metadata properties: - TypeVersion property that is greater than 0.0 - 'Type' property The flag is disableSSEDataplane and disables this functionality and uses the old code for all queries regardless. See https://github.com/grafana/grafana-plugin-sdk-go/blob/main/data/contract_docs/contract.md for dataplane details.
This commit is contained in:
parent
b83627a661
commit
e78be44e1a
@ -104,6 +104,7 @@ Alpha features might be changed or removed without prior notice.
|
||||
| `alertStateHistoryLokiSecondary` | Enable Grafana to write alert state history to an external Loki instance in addition to Grafana annotations. |
|
||||
| `alertStateHistoryLokiPrimary` | Enable a remote Loki instance as the primary source for state history reads. |
|
||||
| `alertStateHistoryLokiOnly` | Disable Grafana alerts from emitting annotations when a remote Loki instance is available. |
|
||||
| `disableSSEDataplane` | Disables dataplane specific processing in server side expressions. |
|
||||
| `unifiedRequestLog` | Writes error logs to the request logger |
|
||||
| `pyroscopeFlameGraph` | Changes flame graph to pyroscope one |
|
||||
| `dataplaneFrontendFallback` | Support dataplane contract field name change for transformations and field name matchers where the name is different |
|
||||
|
2
go.mod
2
go.mod
@ -268,6 +268,7 @@ require (
|
||||
github.com/Masterminds/semver/v3 v3.1.1
|
||||
github.com/dave/dst v0.27.2
|
||||
github.com/go-jose/go-jose/v3 v3.0.0
|
||||
github.com/grafana/dataplane/sdata v0.0.6
|
||||
github.com/grafana/go-mssqldb v0.9.1
|
||||
github.com/grafana/kindsys v0.0.0-20230309200316-812b9884a375
|
||||
github.com/grafana/thema v0.0.0-20230302221249-6952e4a999b7
|
||||
@ -306,6 +307,7 @@ require (
|
||||
github.com/google/gofuzz v1.2.0 // indirect
|
||||
github.com/googleapis/enterprise-certificate-proxy v0.2.0 // indirect
|
||||
github.com/gophercloud/gophercloud v1.0.0 // indirect
|
||||
github.com/grafana/dataplane/examples v0.0.0-20230404174214-4d6fd58a18ad // indirect
|
||||
github.com/grafana/sqlds/v2 v2.3.10 // indirect
|
||||
github.com/hashicorp/go-immutable-radix v1.3.1 // indirect
|
||||
github.com/hashicorp/golang-lru/v2 v2.0.1 // indirect
|
||||
|
10
go.sum
10
go.sum
@ -1281,6 +1281,16 @@ github.com/grafana/codejen v0.0.3 h1:tAWxoTUuhgmEqxJPOLtJoxlPBbMULFwKFOcRsPRPXDw
|
||||
github.com/grafana/codejen v0.0.3/go.mod h1:zmwwM/DRyQB7pfuBjTWII3CWtxcXh8LTwAYGfDfpR6s=
|
||||
github.com/grafana/cuetsy v0.1.6 h1:61QGIDy1rVABU3OkoarOn0+qPdGopIJr34PyWVmGDfs=
|
||||
github.com/grafana/cuetsy v0.1.6/go.mod h1:4KWkUOslwvRTpEv7wdQG0jDFTuJmU+0L9x0h4kWxa2A=
|
||||
github.com/grafana/dataplane/examples v0.0.0-20230322121339-5fb4468146dd h1:w7pLYpu9nJLUiam5Zw0VjC0QwHVCYBswdioc/cLr858=
|
||||
github.com/grafana/dataplane/examples v0.0.0-20230322121339-5fb4468146dd/go.mod h1:h5YwY8s407/17XF5/dS8XrUtsTVV2RnuW8+m1Mp46mg=
|
||||
github.com/grafana/dataplane/examples v0.0.0-20230404174214-4d6fd58a18ad h1:bROIOQEkRlxAt+Epn9Pos8ytkVqoIAvnaUZOAb7EgEU=
|
||||
github.com/grafana/dataplane/examples v0.0.0-20230404174214-4d6fd58a18ad/go.mod h1:h5YwY8s407/17XF5/dS8XrUtsTVV2RnuW8+m1Mp46mg=
|
||||
github.com/grafana/dataplane/sdata v0.0.4 h1:GumUrUHsdCkGTjXbzy5uBmjLYavUYHPZ18ad6U9Whuc=
|
||||
github.com/grafana/dataplane/sdata v0.0.4/go.mod h1:Jvs5ddpGmn6vcxT7tCTWAZ1mgi4sbcdFt9utQx5uMAU=
|
||||
github.com/grafana/dataplane/sdata v0.0.5 h1:RIfwNCd5777nSOyw5C7nRFlWhO2cjVJaHmKjUQNiTn4=
|
||||
github.com/grafana/dataplane/sdata v0.0.5/go.mod h1:Jvs5ddpGmn6vcxT7tCTWAZ1mgi4sbcdFt9utQx5uMAU=
|
||||
github.com/grafana/dataplane/sdata v0.0.6 h1:Ejlj8d1Hvy/uDLeI4sOvL34Y8WLlVDd9iN270F+8aTw=
|
||||
github.com/grafana/dataplane/sdata v0.0.6/go.mod h1:Jvs5ddpGmn6vcxT7tCTWAZ1mgi4sbcdFt9utQx5uMAU=
|
||||
github.com/grafana/dskit v0.0.0-20230202092222-880a7f8141cc h1:lQFgXpsZNDdi0whUROW15r/akzLIdXAn6xr5vqlZucI=
|
||||
github.com/grafana/dskit v0.0.0-20230202092222-880a7f8141cc/go.mod h1:ulYLLoSd71AWIjxgifLO86Lndx82Yj+IcV+fFnh8tkI=
|
||||
github.com/grafana/go-mssqldb v0.9.1 h1:3CqteWF0CadwXV9f3FxoI+i3uSW3azjTlQipyOJtWtQ=
|
||||
|
@ -89,6 +89,7 @@ export interface FeatureToggles {
|
||||
alertStateHistoryLokiSecondary?: boolean;
|
||||
alertStateHistoryLokiPrimary?: boolean;
|
||||
alertStateHistoryLokiOnly?: boolean;
|
||||
disableSSEDataplane?: boolean;
|
||||
unifiedRequestLog?: boolean;
|
||||
renderAuthJWT?: boolean;
|
||||
pyroscopeFlameGraph?: boolean;
|
||||
|
129
pkg/expr/dataplane.go
Normal file
129
pkg/expr/dataplane.go
Normal file
@ -0,0 +1,129 @@
|
||||
package expr
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/grafana/dataplane/sdata"
|
||||
"github.com/grafana/dataplane/sdata/numeric"
|
||||
"github.com/grafana/dataplane/sdata/reader"
|
||||
"github.com/grafana/dataplane/sdata/timeseries"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
"github.com/grafana/grafana/pkg/expr/mathexp"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
)
|
||||
|
||||
func shouldUseDataplane(frames data.Frames, logger *log.ConcreteLogger, disable bool) (dt data.FrameType, b bool, e error) {
|
||||
if disable {
|
||||
return
|
||||
}
|
||||
dt = data.FrameTypeUnknown
|
||||
if len(frames) == 0 {
|
||||
return data.FrameTypeUnknown, true, nil
|
||||
}
|
||||
|
||||
firstFrame := frames[0]
|
||||
if firstFrame == nil {
|
||||
return
|
||||
}
|
||||
|
||||
if firstFrame.Meta == nil {
|
||||
return
|
||||
}
|
||||
|
||||
if firstFrame.Meta.Type == "" {
|
||||
return
|
||||
}
|
||||
|
||||
if firstFrame.Meta.TypeVersion.IsZero() {
|
||||
return
|
||||
}
|
||||
|
||||
dt, err := reader.CanReadBasedOnMeta(frames)
|
||||
if err != nil {
|
||||
var vw *sdata.VersionWarning
|
||||
if errors.As(err, &vw) {
|
||||
logger.Warn("Attempting to read mismatched version dataplane data", "error", err, "datatype", dt)
|
||||
return dt, true, nil
|
||||
}
|
||||
// TODO: Remove as more confidence is gained in dataplane data handling and return the error
|
||||
logger.Warn("Dataplane data detected but falling back to old processing due to error",
|
||||
"error", err)
|
||||
|
||||
return dt, false, err
|
||||
}
|
||||
return dt, true, nil
|
||||
}
|
||||
|
||||
func handleDataplaneFrames(k data.FrameTypeKind, frames data.Frames) (mathexp.Results, error) {
|
||||
switch k {
|
||||
case data.KindUnknown:
|
||||
return mathexp.Results{Values: mathexp.Values{mathexp.NoData{}.New()}}, nil
|
||||
case data.KindTimeSeries:
|
||||
return handleDataplaneTimeseries(frames)
|
||||
case data.KindNumeric:
|
||||
return handleDataplaneNumeric(frames)
|
||||
default:
|
||||
return mathexp.Results{}, fmt.Errorf("kind %s not supported by server side expressions", k)
|
||||
}
|
||||
}
|
||||
|
||||
func handleDataplaneTimeseries(frames data.Frames) (mathexp.Results, error) {
|
||||
dps, err := timeseries.CollectionReaderFromFrames(frames)
|
||||
if err != nil {
|
||||
return mathexp.Results{}, err
|
||||
}
|
||||
sc, err := dps.GetCollection(false)
|
||||
if err != nil {
|
||||
return mathexp.Results{}, err
|
||||
}
|
||||
if sc.NoData() {
|
||||
noData := mathexp.NoData{}.New()
|
||||
if len(dps.Frames()) == 1 {
|
||||
// If single frame form of nodata, copy the frame to pass through metadata
|
||||
noData.Frame = dps.Frames()[0]
|
||||
}
|
||||
return mathexp.Results{Values: mathexp.Values{noData}}, nil
|
||||
}
|
||||
res := mathexp.Results{}
|
||||
res.Values = make([]mathexp.Value, 0, len(sc.Refs))
|
||||
for _, s := range sc.Refs {
|
||||
newSeries, err := mathexp.NewSeriesFromRef(sc.RefID, s)
|
||||
if err != nil {
|
||||
return res, err
|
||||
}
|
||||
res.Values = append(res.Values, newSeries)
|
||||
}
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func handleDataplaneNumeric(frames data.Frames) (mathexp.Results, error) {
|
||||
dn, err := numeric.CollectionReaderFromFrames(frames)
|
||||
if err != nil {
|
||||
return mathexp.Results{}, err
|
||||
}
|
||||
nc, err := dn.GetCollection(false)
|
||||
if err != nil {
|
||||
return mathexp.Results{}, err
|
||||
}
|
||||
if nc.NoData() {
|
||||
noData := mathexp.NoData{}.New()
|
||||
if len(dn.Frames()) == 1 {
|
||||
// If single frame form of nodata, copy the frame to pass through metadata
|
||||
noData.Frame = dn.Frames()[0]
|
||||
}
|
||||
return mathexp.Results{Values: mathexp.Values{noData}}, nil
|
||||
}
|
||||
res := mathexp.Results{}
|
||||
res.Values = make([]mathexp.Value, 0, len(nc.Refs))
|
||||
for _, n := range nc.Refs {
|
||||
newNum, err := mathexp.NumberFromRef(nc.RefID, n)
|
||||
if err != nil {
|
||||
return res, err
|
||||
}
|
||||
res.Values = append(res.Values, newNum)
|
||||
}
|
||||
|
||||
return res, nil
|
||||
}
|
242
pkg/expr/dataplane_test.go
Normal file
242
pkg/expr/dataplane_test.go
Normal file
@ -0,0 +1,242 @@
|
||||
package expr
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/dataplane/examples"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/services/datasources"
|
||||
datafakes "github.com/grafana/grafana/pkg/services/datasources/fakes"
|
||||
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
"github.com/grafana/grafana/pkg/util"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestPassThroughDataplaneExamples(t *testing.T) {
|
||||
es, err := examples.GetExamples()
|
||||
require.NoError(t, err)
|
||||
|
||||
validExamples, err := es.Filter(examples.FilterOptions{
|
||||
Version: data.FrameTypeVersion{0, 1},
|
||||
Valid: util.Pointer(true),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
for _, collection := range validExamples.Collections() {
|
||||
for _, example := range collection.ExampleSlice() {
|
||||
t.Run(example.Info().ID, func(t *testing.T) {
|
||||
_, err := framesPassThroughService(t, example.Frames("A"))
|
||||
require.NoError(t, err)
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func framesPassThroughService(t *testing.T, frames data.Frames) (data.Frames, error) {
|
||||
me := &mockEndpoint{
|
||||
Frames: frames,
|
||||
}
|
||||
|
||||
cfg := setting.NewCfg()
|
||||
|
||||
s := Service{
|
||||
cfg: cfg,
|
||||
dataService: me,
|
||||
dataSourceService: &datafakes.FakeDataSourceService{},
|
||||
features: &featuremgmt.FeatureManager{},
|
||||
}
|
||||
queries := []Query{{
|
||||
RefID: "A",
|
||||
DataSource: &datasources.DataSource{
|
||||
OrgID: 1,
|
||||
UID: "test",
|
||||
Type: "test",
|
||||
},
|
||||
JSON: json.RawMessage(`{ "datasource": { "uid": "1" }, "intervalMs": 1000, "maxDataPoints": 1000 }`),
|
||||
TimeRange: AbsoluteTimeRange{
|
||||
From: time.Time{},
|
||||
To: time.Time{},
|
||||
},
|
||||
}}
|
||||
|
||||
req := &Request{Queries: queries}
|
||||
|
||||
pl, err := s.BuildPipeline(req)
|
||||
require.NoError(t, err)
|
||||
|
||||
res, err := s.ExecutePipeline(context.Background(), time.Now(), pl)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Contains(t, res.Responses, "A")
|
||||
|
||||
return res.Responses["A"].Frames, res.Responses["A"].Error
|
||||
}
|
||||
|
||||
func TestShouldUseDataplane(t *testing.T) {
|
||||
t.Run("zero frames returns no data and is allowed", func(t *testing.T) {
|
||||
f := data.Frames{}
|
||||
dt, use, err := shouldUseDataplane(f, log.New(""), false)
|
||||
require.NoError(t, err)
|
||||
require.True(t, use)
|
||||
require.Equal(t, data.KindUnknown, dt.Kind())
|
||||
})
|
||||
|
||||
t.Run("a frame with Type and TypeVersion 0.0 will not use dataplane", func(t *testing.T) {
|
||||
f := data.Frames{(&data.Frame{}).SetMeta(
|
||||
&data.FrameMeta{
|
||||
TypeVersion: data.FrameTypeVersion{},
|
||||
Type: data.FrameTypeTimeSeriesMulti,
|
||||
},
|
||||
)}
|
||||
_, use, err := shouldUseDataplane(f, log.New(""), false)
|
||||
require.NoError(t, err)
|
||||
require.False(t, use)
|
||||
})
|
||||
|
||||
t.Run("a frame without Type and TypeVersion > 0.0 will not use dataplane", func(t *testing.T) {
|
||||
f := data.Frames{(&data.Frame{}).SetMeta(
|
||||
&data.FrameMeta{
|
||||
TypeVersion: data.FrameTypeVersion{0, 1},
|
||||
},
|
||||
)}
|
||||
_, use, err := shouldUseDataplane(f, log.New(""), false)
|
||||
require.NoError(t, err)
|
||||
require.False(t, use)
|
||||
})
|
||||
|
||||
t.Run("a frame with no metadata will not use dataplane", func(t *testing.T) {
|
||||
f := data.Frames{&data.Frame{}}
|
||||
_, use, err := shouldUseDataplane(f, log.New(""), false)
|
||||
require.NoError(t, err)
|
||||
require.False(t, use)
|
||||
})
|
||||
|
||||
t.Run("a newer version that supported will return a warning but still use dataplane", func(t *testing.T) {
|
||||
ty := data.FrameTypeTimeSeriesMulti
|
||||
v := data.FrameTypeVersion{999, 999}
|
||||
f := data.Frames{(&data.Frame{}).SetMeta(
|
||||
&data.FrameMeta{
|
||||
Type: ty,
|
||||
TypeVersion: v,
|
||||
},
|
||||
)}
|
||||
dt, use, err := shouldUseDataplane(f, log.New(""), false)
|
||||
|
||||
require.NoError(t, err)
|
||||
|
||||
require.True(t, use)
|
||||
require.Equal(t, data.KindTimeSeries, dt.Kind())
|
||||
})
|
||||
|
||||
t.Run("all valid dataplane examples should use dataplane", func(t *testing.T) {
|
||||
es, err := examples.GetExamples()
|
||||
require.NoError(t, err)
|
||||
|
||||
validExamples, err := es.Filter(examples.FilterOptions{
|
||||
Version: data.FrameTypeVersion{0, 1},
|
||||
Valid: util.Pointer(true),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
for _, collection := range validExamples.Collections() {
|
||||
for _, example := range collection.ExampleSlice() {
|
||||
t.Run(example.Info().ID, func(t *testing.T) {
|
||||
_, err := framesPassThroughService(t, example.Frames("A"))
|
||||
require.NoError(t, err)
|
||||
})
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestHandleDataplaneNumeric(t *testing.T) {
|
||||
t.Run("no data", func(t *testing.T) {
|
||||
es, err := examples.GetExamples()
|
||||
require.NoError(t, err)
|
||||
|
||||
validNoDataNumericExamples, err := es.Filter(examples.FilterOptions{
|
||||
Version: data.FrameTypeVersion{0, 1},
|
||||
Valid: util.Pointer(true),
|
||||
Kind: data.KindNumeric,
|
||||
NoData: util.Pointer(true),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
for _, example := range validNoDataNumericExamples.AsSlice() {
|
||||
t.Run(example.Info().ID, func(t *testing.T) {
|
||||
res, err := handleDataplaneNumeric(example.Frames("A"))
|
||||
require.NoError(t, err)
|
||||
require.Len(t, res.Values, 1)
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("should read correct number of items from examples", func(t *testing.T) {
|
||||
es, err := examples.GetExamples()
|
||||
require.NoError(t, err)
|
||||
|
||||
numericExamples, err := es.Filter(examples.FilterOptions{
|
||||
Version: data.FrameTypeVersion{0, 1},
|
||||
Valid: util.Pointer(true),
|
||||
Kind: data.KindNumeric,
|
||||
NoData: util.Pointer(false),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
for _, example := range numericExamples.AsSlice() {
|
||||
t.Run(example.Info().ID, func(t *testing.T) {
|
||||
res, err := handleDataplaneNumeric(example.Frames("A"))
|
||||
require.NoError(t, err)
|
||||
require.Len(t, res.Values, example.Info().ItemCount)
|
||||
})
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestHandleDataplaneTS(t *testing.T) {
|
||||
t.Run("no data", func(t *testing.T) {
|
||||
es, err := examples.GetExamples()
|
||||
require.NoError(t, err)
|
||||
|
||||
validNoDataTSExamples, err := es.Filter(examples.FilterOptions{
|
||||
Version: data.FrameTypeVersion{0, 1},
|
||||
Valid: util.Pointer(true),
|
||||
Kind: data.KindTimeSeries,
|
||||
NoData: util.Pointer(true),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
for _, example := range validNoDataTSExamples.AsSlice() {
|
||||
t.Run(example.Info().ID, func(t *testing.T) {
|
||||
res, err := handleDataplaneTimeseries(example.Frames("A"))
|
||||
require.NoError(t, err)
|
||||
require.Len(t, res.Values, 1)
|
||||
})
|
||||
}
|
||||
})
|
||||
t.Run("should read correct number of items from examples", func(t *testing.T) {
|
||||
es, err := examples.GetExamples()
|
||||
require.NoError(t, err)
|
||||
|
||||
tsExamples, err := es.Filter(examples.FilterOptions{
|
||||
Version: data.FrameTypeVersion{0, 1},
|
||||
Valid: util.Pointer(true),
|
||||
Kind: data.KindTimeSeries,
|
||||
NoData: util.Pointer(false),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
for _, example := range tsExamples.AsSlice() {
|
||||
t.Run(example.Info().ID, func(t *testing.T) {
|
||||
res, err := handleDataplaneTimeseries(example.Frames("A"))
|
||||
require.NoError(t, err)
|
||||
require.Len(t, res.Values, example.Info().ItemCount)
|
||||
})
|
||||
}
|
||||
})
|
||||
}
|
@ -5,6 +5,7 @@ import (
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/dataplane/sdata/timeseries"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
|
||||
"github.com/grafana/grafana/pkg/expr/mathexp/parse"
|
||||
@ -132,12 +133,50 @@ func NewSeries(refID string, labels data.Labels, size int) Series {
|
||||
fields := make([]*data.Field, 2)
|
||||
fields[seriesTypeTimeIdx] = data.NewField("Time", nil, make([]time.Time, size))
|
||||
fields[seriesTypeValIdx] = data.NewField(refID, labels, make([]*float64, size))
|
||||
frame := data.NewFrame("", fields...)
|
||||
frame.RefID = refID
|
||||
frame.Meta = &data.FrameMeta{
|
||||
Type: data.FrameTypeTimeSeriesMulti,
|
||||
TypeVersion: data.FrameTypeVersion{0, 1},
|
||||
}
|
||||
|
||||
return Series{
|
||||
Frame: data.NewFrame("", fields...),
|
||||
Frame: frame,
|
||||
}
|
||||
}
|
||||
|
||||
// NewSeries returns a dataframe of type Series.
|
||||
func NewSeriesFromRef(refID string, s timeseries.MetricRef) (Series, error) {
|
||||
frame := data.NewFrame("")
|
||||
frame.RefID = refID
|
||||
frame.Meta = &data.FrameMeta{
|
||||
Type: data.FrameTypeTimeSeriesMulti,
|
||||
TypeVersion: data.FrameTypeVersion{0, 1},
|
||||
}
|
||||
|
||||
valField := s.ValueField
|
||||
if valField.Type() != data.FieldTypeNullableFloat64 {
|
||||
convertedField := data.NewFieldFromFieldType(data.FieldTypeNullableFloat64, valField.Len())
|
||||
convertedField.Name = valField.Name
|
||||
convertedField.Labels = valField.Labels
|
||||
convertedField.Config = valField.Config
|
||||
for j := 0; j < valField.Len(); j++ {
|
||||
ff, err := valField.NullableFloatAt(j)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
|
||||
convertedField.Set(j, ff)
|
||||
}
|
||||
valField = convertedField
|
||||
}
|
||||
frame.Fields = []*data.Field{s.TimeField, valField}
|
||||
|
||||
return Series{
|
||||
Frame: frame, // No Data Frame
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Type returns the Value type and allows it to fulfill the Value interface.
|
||||
func (s Series) Type() parse.ReturnType { return parse.TypeSeriesSet }
|
||||
|
||||
|
@ -1,6 +1,7 @@
|
||||
package mathexp
|
||||
|
||||
import (
|
||||
"github.com/grafana/dataplane/sdata/numeric"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
|
||||
"github.com/grafana/grafana/pkg/expr/mathexp/parse"
|
||||
@ -128,10 +129,31 @@ func NewNumber(name string, labels data.Labels) Number {
|
||||
return Number{
|
||||
data.NewFrame("",
|
||||
data.NewField(name, labels, make([]*float64, 1)),
|
||||
),
|
||||
).SetMeta(&data.FrameMeta{
|
||||
Type: data.FrameTypeNumericMulti,
|
||||
TypeVersion: data.FrameTypeVersion{0, 1},
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
// NewNumber returns a data that holds a float64Vector
|
||||
func NumberFromRef(refID string, nr numeric.MetricRef) (Number, error) {
|
||||
f, _, err := nr.NullableFloat64Value()
|
||||
if err != nil {
|
||||
return Number{}, err
|
||||
}
|
||||
|
||||
frame := data.NewFrame("",
|
||||
data.NewField(nr.GetMetricName(), nr.GetLabels(), []*float64{f})).SetMeta(&data.FrameMeta{
|
||||
Type: data.FrameTypeNumericMulti,
|
||||
TypeVersion: data.FrameTypeVersion{0, 1},
|
||||
})
|
||||
|
||||
frame.Fields[0].Config = nr.ValueField.Config
|
||||
|
||||
return Number{frame}, nil
|
||||
}
|
||||
|
||||
func (n Number) GetMeta() interface{} {
|
||||
return n.Frame.Meta.Custom
|
||||
}
|
||||
|
@ -14,6 +14,7 @@ import (
|
||||
"github.com/grafana/grafana/pkg/expr/mathexp"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/services/datasources"
|
||||
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
||||
"github.com/grafana/grafana/pkg/services/pluginsintegration/adapters"
|
||||
)
|
||||
|
||||
@ -258,6 +259,11 @@ func (dn *DSNode) Execute(ctx context.Context, now time.Time, _ mathexp.Vars, s
|
||||
return mathexp.Results{}, QueryError{RefID: dn.refID, Err: response.Error}
|
||||
}
|
||||
|
||||
if dt, use, _ := shouldUseDataplane(response.Frames, logger, s.features.IsEnabled(featuremgmt.FlagDisableSSEDataplane)); use {
|
||||
logger.Debug("Handling SSE data source query through dataplane", "datatype", dt)
|
||||
return handleDataplaneFrames(dt.Kind(), response.Frames)
|
||||
}
|
||||
|
||||
dataSource := dn.datasource.Type
|
||||
if isAllFrameVectors(dataSource, response.Frames) { // Prometheus Specific Handling
|
||||
vals, err = framesToNumbers(response.Frames)
|
||||
|
@ -9,6 +9,7 @@ import (
|
||||
"github.com/grafana/grafana/pkg/components/simplejson"
|
||||
"github.com/grafana/grafana/pkg/plugins"
|
||||
"github.com/grafana/grafana/pkg/services/datasources"
|
||||
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
)
|
||||
|
||||
@ -40,13 +41,15 @@ type Service struct {
|
||||
cfg *setting.Cfg
|
||||
dataService backend.QueryDataHandler
|
||||
dataSourceService datasources.DataSourceService
|
||||
features featuremgmt.FeatureToggles
|
||||
}
|
||||
|
||||
func ProvideService(cfg *setting.Cfg, pluginClient plugins.Client, dataSourceService datasources.DataSourceService) *Service {
|
||||
func ProvideService(cfg *setting.Cfg, pluginClient plugins.Client, dataSourceService datasources.DataSourceService, features featuremgmt.FeatureToggles) *Service {
|
||||
return &Service{
|
||||
cfg: cfg,
|
||||
dataService: pluginClient,
|
||||
dataSourceService: dataSourceService,
|
||||
features: features,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -14,6 +14,7 @@ import (
|
||||
|
||||
"github.com/grafana/grafana/pkg/services/datasources"
|
||||
datafakes "github.com/grafana/grafana/pkg/services/datasources/fakes"
|
||||
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
)
|
||||
|
||||
@ -32,6 +33,7 @@ func TestService(t *testing.T) {
|
||||
cfg: cfg,
|
||||
dataService: me,
|
||||
dataSourceService: &datafakes.FakeDataSourceService{},
|
||||
features: &featuremgmt.FeatureManager{},
|
||||
}
|
||||
|
||||
queries := []Query{
|
||||
@ -67,6 +69,10 @@ func TestService(t *testing.T) {
|
||||
data.NewField("Time", nil, []time.Time{time.Unix(1, 0)}),
|
||||
data.NewField("B", nil, []*float64{fp(4)}))
|
||||
bDF.RefID = "B"
|
||||
bDF.SetMeta(&data.FrameMeta{
|
||||
Type: data.FrameTypeTimeSeriesMulti,
|
||||
TypeVersion: data.FrameTypeVersion{0, 1},
|
||||
})
|
||||
|
||||
expect := &backend.QueryDataResponse{
|
||||
Responses: backend.Responses{
|
||||
|
@ -475,6 +475,12 @@ var (
|
||||
State: FeatureStateAlpha,
|
||||
Owner: grafanaAlertingSquad,
|
||||
},
|
||||
{
|
||||
Name: "disableSSEDataplane",
|
||||
Description: "Disables dataplane specific processing in server side expressions.",
|
||||
State: FeatureStateAlpha,
|
||||
Owner: grafanaObservabilityMetricsSquad,
|
||||
},
|
||||
{
|
||||
Name: "unifiedRequestLog",
|
||||
Description: "Writes error logs to the request logger",
|
||||
|
@ -70,6 +70,7 @@ prometheusDataplane,alpha,@grafana/observability-metrics,false,false,false,false
|
||||
alertStateHistoryLokiSecondary,alpha,@grafana/alerting-squad,false,false,false,false
|
||||
alertStateHistoryLokiPrimary,alpha,@grafana/alerting-squad,false,false,false,false
|
||||
alertStateHistoryLokiOnly,alpha,@grafana/alerting-squad,false,false,false,false
|
||||
disableSSEDataplane,alpha,@grafana/observability-metrics,false,false,false,false
|
||||
unifiedRequestLog,alpha,@grafana/backend-platform,false,false,false,false
|
||||
renderAuthJWT,beta,@grafana/grafana-as-code,false,false,false,false
|
||||
pyroscopeFlameGraph,alpha,@grafana/observability-traces-and-profiling,false,false,false,false
|
||||
|
|
@ -291,6 +291,10 @@ const (
|
||||
// Disable Grafana alerts from emitting annotations when a remote Loki instance is available.
|
||||
FlagAlertStateHistoryLokiOnly = "alertStateHistoryLokiOnly"
|
||||
|
||||
// FlagDisableSSEDataplane
|
||||
// Disables dataplane specific processing in server side expressions.
|
||||
FlagDisableSSEDataplane = "disableSSEDataplane"
|
||||
|
||||
// FlagUnifiedRequestLog
|
||||
// Writes error logs to the request logger
|
||||
FlagUnifiedRequestLog = "unifiedRequestLog"
|
||||
|
@ -15,6 +15,7 @@ import (
|
||||
"github.com/grafana/grafana/pkg/plugins"
|
||||
"github.com/grafana/grafana/pkg/services/datasources"
|
||||
fakes "github.com/grafana/grafana/pkg/services/datasources/fakes"
|
||||
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/models"
|
||||
"github.com/grafana/grafana/pkg/services/user"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
@ -532,7 +533,7 @@ func TestValidate(t *testing.T) {
|
||||
pluginsStore: store,
|
||||
})
|
||||
|
||||
evaluator := NewEvaluatorFactory(setting.UnifiedAlertingSettings{}, cacheService, expr.ProvideService(&setting.Cfg{ExpressionsEnabled: true}, nil, nil), store)
|
||||
evaluator := NewEvaluatorFactory(setting.UnifiedAlertingSettings{}, cacheService, expr.ProvideService(&setting.Cfg{ExpressionsEnabled: true}, nil, nil, &featuremgmt.FeatureManager{}), store)
|
||||
evalCtx := NewContext(context.Background(), u)
|
||||
|
||||
err := evaluator.Validate(evalCtx, condition)
|
||||
|
@ -24,6 +24,7 @@ import (
|
||||
"github.com/grafana/grafana/pkg/expr"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
"github.com/grafana/grafana/pkg/plugins"
|
||||
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/eval"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
|
||||
@ -780,7 +781,7 @@ func setupScheduler(t *testing.T, rs *fakeRulesStore, is *state.FakeInstanceStor
|
||||
|
||||
var evaluator = evalMock
|
||||
if evalMock == nil {
|
||||
evaluator = eval.NewEvaluatorFactory(setting.UnifiedAlertingSettings{}, nil, expr.ProvideService(&setting.Cfg{ExpressionsEnabled: true}, nil, nil), &plugins.FakePluginStore{})
|
||||
evaluator = eval.NewEvaluatorFactory(setting.UnifiedAlertingSettings{}, nil, expr.ProvideService(&setting.Cfg{ExpressionsEnabled: true}, nil, nil, &featuremgmt.FeatureManager{}), &plugins.FakePluginStore{})
|
||||
}
|
||||
|
||||
if registry == nil {
|
||||
|
@ -446,7 +446,7 @@ func setup(t *testing.T) *testContext {
|
||||
DataSources: nil,
|
||||
SimulatePluginFailure: false,
|
||||
}
|
||||
exprService := expr.ProvideService(&setting.Cfg{ExpressionsEnabled: true}, pc, fakeDatasourceService)
|
||||
exprService := expr.ProvideService(&setting.Cfg{ExpressionsEnabled: true}, pc, fakeDatasourceService, &featuremgmt.FeatureManager{})
|
||||
queryService := ProvideService(setting.NewCfg(), dc, exprService, rv, ds, pc) // provider belonging to this package
|
||||
return &testContext{
|
||||
pluginContext: pc,
|
||||
|
Loading…
Reference in New Issue
Block a user