From 67028af99ebdc79e1fe4bdd8e5235e8879918dd4 Mon Sep 17 00:00:00 2001 From: Ryan McKinley Date: Thu, 27 May 2021 02:55:42 -0700 Subject: [PATCH] Live: check schema equality between push messages (#34548) --- go.mod | 2 +- go.sum | 4 +- pkg/services/live/live.go | 27 +++---- pkg/services/live/managedstream/runner.go | 70 +++++++------------ .../live/managedstream/runner_test.go | 18 +---- pkg/services/live/pushhttp/push.go | 4 +- pkg/services/live/pushurl/values.go | 8 +-- pkg/services/live/pushurl/values_test.go | 11 --- pkg/services/live/pushws/push.go | 4 +- pkg/tsdb/testdatasource/csv_data_test.go | 3 +- .../grafana/components/QueryEditor.tsx | 3 + 11 files changed, 54 insertions(+), 100 deletions(-) diff --git a/go.mod b/go.mod index 87957257248..b6d4e0e78ac 100644 --- a/go.mod +++ b/go.mod @@ -51,7 +51,7 @@ require ( github.com/gosimple/slug v1.9.0 github.com/grafana/grafana-aws-sdk v0.4.0 github.com/grafana/grafana-live-sdk v0.0.6 - github.com/grafana/grafana-plugin-sdk-go v0.101.0 + github.com/grafana/grafana-plugin-sdk-go v0.102.0 github.com/grafana/loki v1.6.2-0.20210520072447-15d417efe103 github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/hashicorp/go-hclog v0.16.0 diff --git a/go.sum b/go.sum index cf7f6179b19..a9536529212 100644 --- a/go.sum +++ b/go.sum @@ -920,8 +920,8 @@ github.com/grafana/grafana-live-sdk v0.0.6 h1:P1QFn0ZradOJp3zVpfG0STZMP+pgZrW0e0 github.com/grafana/grafana-live-sdk v0.0.6/go.mod h1:f15hHmWyLdFjmuWLsjeKeZnq/HnNQ3QkoPcaEww45AY= github.com/grafana/grafana-plugin-sdk-go v0.79.0/go.mod h1:NvxLzGkVhnoBKwzkst6CFfpMFKwAdIUZ1q8ssuLeF60= github.com/grafana/grafana-plugin-sdk-go v0.91.0/go.mod h1:Ot3k7nY7P6DXmUsDgKvNB7oG1v7PRyTdmnYVoS554bU= -github.com/grafana/grafana-plugin-sdk-go v0.101.0 h1:QyXMkgwZXUX9EQjLv5S5uDcvYjwsntqFV/dCC49Fn+w= -github.com/grafana/grafana-plugin-sdk-go v0.101.0/go.mod h1:D7x3ah+1d4phNXpbnOaxa/osSaZlwh9/ZUnGGzegRbk= +github.com/grafana/grafana-plugin-sdk-go v0.102.0 h1:Pknh7mlOaJvdhPgKHxcimDOSd9h29eSpA34W0/sOF6c= +github.com/grafana/grafana-plugin-sdk-go v0.102.0/go.mod h1:D7x3ah+1d4phNXpbnOaxa/osSaZlwh9/ZUnGGzegRbk= github.com/grafana/loki v1.6.2-0.20210520072447-15d417efe103 h1:qCmofFVwQR9QnsinstVqI1NPLMVl33jNCnOCXEAVn6E= github.com/grafana/loki v1.6.2-0.20210520072447-15d417efe103/go.mod h1:GHIsn+EohCChsdu5YouNZewqLeV9L2FNw4DEJU3P9qE= github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA= diff --git a/pkg/services/live/live.go b/pkg/services/live/live.go index ecf03159532..d5c4dae71c2 100644 --- a/pkg/services/live/live.go +++ b/pkg/services/live/live.go @@ -2,6 +2,7 @@ package live import ( "context" + "encoding/json" "errors" "fmt" "net/http" @@ -621,22 +622,24 @@ func (g *GrafanaLive) HandleListHTTP(c *models.ReqContext) response.Response { } // Hardcode sample streams - frame := data.NewFrame("testdata", + frameJSON, err := data.FrameToJSON(data.NewFrame("testdata", data.NewField("Time", nil, make([]time.Time, 0)), data.NewField("Value", nil, make([]float64, 0)), data.NewField("Min", nil, make([]float64, 0)), data.NewField("Max", nil, make([]float64, 0)), - ) - channels = append(channels, util.DynMap{ - "channel": "plugin/testdata/random-2s-stream", - "data": frame, - }, util.DynMap{ - "channel": "plugin/testdata/random-flakey-stream", - "data": frame, - }, util.DynMap{ - "channel": "plugin/testdata/random-20Hz-stream", - "data": frame, - }) + ), data.IncludeSchemaOnly) + if err == nil { + channels = append(channels, util.DynMap{ + "channel": "plugin/testdata/random-2s-stream", + "data": json.RawMessage(frameJSON), + }, util.DynMap{ + "channel": "plugin/testdata/random-flakey-stream", + "data": json.RawMessage(frameJSON), + }, util.DynMap{ + "channel": "plugin/testdata/random-20Hz-stream", + "data": json.RawMessage(frameJSON), + }) + } info["channels"] = channels return response.JSONStreaming(200, info) diff --git a/pkg/services/live/managedstream/runner.go b/pkg/services/live/managedstream/runner.go index db055df9c7f..f98d08accd9 100644 --- a/pkg/services/live/managedstream/runner.go +++ b/pkg/services/live/managedstream/runner.go @@ -69,7 +69,7 @@ type ManagedStream struct { mu sync.RWMutex id string start time.Time - last map[int64]map[string]json.RawMessage + last map[int64]map[string]data.FrameJSONCache publisher models.ChannelPublisher } @@ -78,7 +78,7 @@ func NewManagedStream(id string, publisher models.ChannelPublisher) *ManagedStre return &ManagedStream{ id: id, start: time.Now(), - last: map[int64]map[string]json.RawMessage{}, + last: map[int64]map[string]data.FrameJSONCache{}, publisher: publisher, } } @@ -96,7 +96,7 @@ func (s *ManagedStream) ListChannels(orgID int64, prefix string) []util.DynMap { for k, v := range s.last[orgID] { ch := util.DynMap{} ch["channel"] = prefix + k - ch["data"] = v + ch["data"] = json.RawMessage(v.Bytes(data.IncludeSchemaOnly)) info = append(info, ch) } return info @@ -104,55 +104,36 @@ func (s *ManagedStream) ListChannels(orgID int64, prefix string) []util.DynMap { // Push sends frame to the stream and saves it for later retrieval by subscribers. // unstableSchema flag can be set to disable schema caching for a path. -func (s *ManagedStream) Push(orgID int64, path string, frame *data.Frame, unstableSchema bool) error { +func (s *ManagedStream) Push(orgID int64, path string, frame *data.Frame) error { // Keep schema + data for last packet. - frameJSONWrapper, err := data.FrameToJSON(frame) + msg, err := data.FrameToJSONCache(frame) if err != nil { - logger.Error("Error marshaling frame with Schema", "error", err) + logger.Error("Error marshaling frame with data", "error", err) return err } - frameJSON := frameJSONWrapper.Bytes(data.IncludeAll) - if !unstableSchema { - // If schema is stable we can safely cache it, and only send values if - // stream already has schema cached. - s.mu.Lock() - if _, ok := s.last[orgID]; !ok { - s.last[orgID] = map[string]json.RawMessage{} - } - _, exists := s.last[orgID][path] - s.last[orgID][path] = frameJSON - s.mu.Unlock() - - // When the packet already exits, only send the data. - // TODO: maybe a good idea would be MarshalJSON function of - // frame to keep Schema JSON and Values JSON in frame object - // to avoid encoding twice. - if exists { - frameJSONWrapper, err = data.FrameToJSON(frame) - if err != nil { - logger.Error("Error marshaling Frame to JSON", "error", err) - return err - } - frameJSON = frameJSONWrapper.Bytes(data.IncludeDataOnly) - } - } else { - // For unstable schema we always need to send everything to a connection. - // And we don't want to cache schema for unstable case. But we still need to - // set path to a map to make stream visible in UI stream select widget. - s.mu.Lock() - if _, ok := s.last[orgID]; ok { - s.last[orgID][path] = nil - } - s.mu.Unlock() + s.mu.Lock() + if _, ok := s.last[orgID]; !ok { + s.last[orgID] = map[string]data.FrameJSONCache{} } + last, exists := s.last[orgID][path] + s.last[orgID][path] = msg + s.mu.Unlock() + + include := data.IncludeAll + if exists && last.SameSchema(&msg) { + // When the schema has not changed, just send the data. + include = data.IncludeDataOnly + } + frameJSON := msg.Bytes(include) + // The channel this will be posted into. channel := live.Channel{Scope: live.ScopeStream, Namespace: s.id, Path: path}.String() logger.Debug("Publish data to channel", "channel", channel, "dataLength", len(frameJSON)) return s.publisher(orgID, channel, frameJSON) } -// getLastPacket retrieves schema for a channel. +// getLastPacket retrieves last packet channel. func (s *ManagedStream) getLastPacket(orgId int64, path string) (json.RawMessage, bool) { s.mu.RLock() defer s.mu.RUnlock() @@ -160,8 +141,11 @@ func (s *ManagedStream) getLastPacket(orgId int64, path string) (json.RawMessage if !ok { return nil, false } - schema, ok := s.last[orgId][path] - return schema, ok && schema != nil + msg, ok := s.last[orgId][path] + if ok { + return msg.Bytes(data.IncludeAll), ok + } + return nil, ok } func (s *ManagedStream) GetHandlerForPath(_ string) (models.ChannelHandler, error) { @@ -184,7 +168,7 @@ func (s *ManagedStream) OnPublish(_ context.Context, u *models.SignedInUser, evt // Stream scope only deals with data frames. return models.PublishReply{}, 0, err } - err = s.Push(u.OrgId, evt.Path, &frame, true) + err = s.Push(u.OrgId, evt.Path, &frame) if err != nil { // Stream scope only deals with data frames. return models.PublishReply{}, 0, err diff --git a/pkg/services/live/managedstream/runner_test.go b/pkg/services/live/managedstream/runner_test.go index 45a37afd89b..6982ae29f76 100644 --- a/pkg/services/live/managedstream/runner_test.go +++ b/pkg/services/live/managedstream/runner_test.go @@ -23,31 +23,17 @@ func TestNewManagedStream(t *testing.T) { require.NotNil(t, c) } -func TestManagedStream_GetLastPacket_UnstableSchema(t *testing.T) { - var orgID int64 = 1 - publisher := &testPublisher{orgID: orgID, t: t} - c := NewManagedStream("a", publisher.publish) - _, ok := c.getLastPacket(orgID, "test") - require.False(t, ok) - err := c.Push(orgID, "test", data.NewFrame("hello"), true) - require.NoError(t, err) - - _, ok = c.getLastPacket(orgID, "test") - require.NoError(t, err) - require.False(t, ok) -} - func TestManagedStream_GetLastPacket(t *testing.T) { var orgID int64 = 1 publisher := &testPublisher{orgID: orgID, t: t} c := NewManagedStream("a", publisher.publish) _, ok := c.getLastPacket(orgID, "test") require.False(t, ok) - err := c.Push(orgID, "test", data.NewFrame("hello"), false) + err := c.Push(orgID, "test", data.NewFrame("hello")) require.NoError(t, err) s, ok := c.getLastPacket(orgID, "test") require.NoError(t, err) require.True(t, ok) - require.Equal(t, `{"schema":{"name":"hello","fields":[]},"data":{"values":[]}}`, string(s)) + require.JSONEq(t, `{"schema":{"name":"hello","fields":[]},"data":{"values":[]}}`, string(s)) } diff --git a/pkg/services/live/pushhttp/push.go b/pkg/services/live/pushhttp/push.go index 89e40781d5f..70ace3ef4c9 100644 --- a/pkg/services/live/pushhttp/push.go +++ b/pkg/services/live/pushhttp/push.go @@ -57,7 +57,6 @@ func (g *Gateway) Handle(ctx *models.ReqContext) { // TODO Grafana 8: decide which formats to use or keep all. urlValues := ctx.Req.URL.Query() frameFormat := pushurl.FrameFormatFromValues(urlValues) - unstableSchema := pushurl.UnstableSchemaFromValues(urlValues) body, err := ctx.Req.Body().Bytes() if err != nil { @@ -69,7 +68,6 @@ func (g *Gateway) Handle(ctx *models.ReqContext) { "protocol", "http", "streamId", streamID, "bodyLength", len(body), - "unstableSchema", unstableSchema, "frameFormat", frameFormat, ) @@ -88,7 +86,7 @@ func (g *Gateway) Handle(ctx *models.ReqContext) { // interval = "1s" vs flush_interval = "5s" for _, mf := range metricFrames { - err := stream.Push(ctx.SignedInUser.OrgId, mf.Key(), mf.Frame(), unstableSchema) + err := stream.Push(ctx.SignedInUser.OrgId, mf.Key(), mf.Frame()) if err != nil { ctx.Resp.WriteHeader(http.StatusInternalServerError) return diff --git a/pkg/services/live/pushurl/values.go b/pkg/services/live/pushurl/values.go index ccbd8b1d81a..aeb58844d2a 100644 --- a/pkg/services/live/pushurl/values.go +++ b/pkg/services/live/pushurl/values.go @@ -6,15 +6,9 @@ import ( ) const ( - unstableSchemaParam = "gf_live_unstable_schema" - frameFormatParam = "gf_live_frame_format" + frameFormatParam = "gf_live_frame_format" ) -// UnstableSchemaFromValues extracts unstable schema tip from url values. -func UnstableSchemaFromValues(values url.Values) bool { - return strings.ToLower(values.Get(unstableSchemaParam)) == "true" || values.Get(unstableSchemaParam) == "1" -} - // FrameFormatFromValues extracts frame format tip from url values. func FrameFormatFromValues(values url.Values) string { frameFormat := strings.ToLower(values.Get(frameFormatParam)) diff --git a/pkg/services/live/pushurl/values_test.go b/pkg/services/live/pushurl/values_test.go index 0791813a3e5..676d933b805 100644 --- a/pkg/services/live/pushurl/values_test.go +++ b/pkg/services/live/pushurl/values_test.go @@ -7,17 +7,6 @@ import ( "github.com/stretchr/testify/require" ) -func TestUnstableSchemaFromValues(t *testing.T) { - values := url.Values{} - require.False(t, UnstableSchemaFromValues(values)) - values.Set(unstableSchemaParam, "yes") - require.False(t, UnstableSchemaFromValues(values)) - values.Set(unstableSchemaParam, "true") - require.True(t, UnstableSchemaFromValues(values)) - values.Set(unstableSchemaParam, "True") - require.True(t, UnstableSchemaFromValues(values)) -} - func TestFrameFormatFromValues(t *testing.T) { values := url.Values{} require.Equal(t, "labels_column", FrameFormatFromValues(values)) diff --git a/pkg/services/live/pushws/push.go b/pkg/services/live/pushws/push.go index 2b206e07898..bf50e9e8b89 100644 --- a/pkg/services/live/pushws/push.go +++ b/pkg/services/live/pushws/push.go @@ -174,13 +174,11 @@ func (s *Handler) ServeHTTP(rw http.ResponseWriter, r *http.Request) { // TODO Grafana 8: decide which formats to use or keep all. urlValues := r.URL.Query() frameFormat := pushurl.FrameFormatFromValues(urlValues) - unstableSchema := pushurl.UnstableSchemaFromValues(urlValues) logger.Debug("Live Push request", "protocol", "http", "streamId", streamID, "bodyLength", len(body), - "unstableSchema", unstableSchema, "frameFormat", frameFormat, ) @@ -191,7 +189,7 @@ func (s *Handler) ServeHTTP(rw http.ResponseWriter, r *http.Request) { } for _, mf := range metricFrames { - err := stream.Push(user.OrgId, mf.Key(), mf.Frame(), unstableSchema) + err := stream.Push(user.OrgId, mf.Key(), mf.Frame()) if err != nil { return } diff --git a/pkg/tsdb/testdatasource/csv_data_test.go b/pkg/tsdb/testdatasource/csv_data_test.go index bae01cf51f4..6e0a389e155 100644 --- a/pkg/tsdb/testdatasource/csv_data_test.go +++ b/pkg/tsdb/testdatasource/csv_data_test.go @@ -70,9 +70,8 @@ func TestReadCSV(t *testing.T) { require.NoError(t, err) frame := data.NewFrame("", fBool, fBool2, fNum, fStr) - frameToJSON, err := data.FrameToJSON(frame) + out, err := data.FrameToJSON(frame, data.IncludeAll) require.NoError(t, err) - out := frameToJSON.Bytes(data.IncludeAll) require.JSONEq(t, `{"schema":{ "fields":[ diff --git a/public/app/plugins/datasource/grafana/components/QueryEditor.tsx b/public/app/plugins/datasource/grafana/components/QueryEditor.tsx index 9ee6a1c634a..bae1730c094 100644 --- a/public/app/plugins/datasource/grafana/components/QueryEditor.tsx +++ b/public/app/plugins/datasource/grafana/components/QueryEditor.tsx @@ -72,6 +72,9 @@ export class QueryEditor extends PureComponent { const { onChange, query, onRunQuery } = this.props; onChange({ ...query, queryType: sel.value! }); onRunQuery(); + + // Reload the channel list + this.loadChannelInfo(); }; onChannelChange = (sel: SelectableValue) => {