Live: check schema equality between push messages (#34548)

This commit is contained in:
Ryan McKinley 2021-05-27 02:55:42 -07:00 committed by GitHub
parent c59a2e1bcf
commit 67028af99e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 54 additions and 100 deletions

2
go.mod
View File

@ -51,7 +51,7 @@ require (
github.com/gosimple/slug v1.9.0
github.com/grafana/grafana-aws-sdk v0.4.0
github.com/grafana/grafana-live-sdk v0.0.6
github.com/grafana/grafana-plugin-sdk-go v0.101.0
github.com/grafana/grafana-plugin-sdk-go v0.102.0
github.com/grafana/loki v1.6.2-0.20210520072447-15d417efe103
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/hashicorp/go-hclog v0.16.0

4
go.sum
View File

@ -920,8 +920,8 @@ github.com/grafana/grafana-live-sdk v0.0.6 h1:P1QFn0ZradOJp3zVpfG0STZMP+pgZrW0e0
github.com/grafana/grafana-live-sdk v0.0.6/go.mod h1:f15hHmWyLdFjmuWLsjeKeZnq/HnNQ3QkoPcaEww45AY=
github.com/grafana/grafana-plugin-sdk-go v0.79.0/go.mod h1:NvxLzGkVhnoBKwzkst6CFfpMFKwAdIUZ1q8ssuLeF60=
github.com/grafana/grafana-plugin-sdk-go v0.91.0/go.mod h1:Ot3k7nY7P6DXmUsDgKvNB7oG1v7PRyTdmnYVoS554bU=
github.com/grafana/grafana-plugin-sdk-go v0.101.0 h1:QyXMkgwZXUX9EQjLv5S5uDcvYjwsntqFV/dCC49Fn+w=
github.com/grafana/grafana-plugin-sdk-go v0.101.0/go.mod h1:D7x3ah+1d4phNXpbnOaxa/osSaZlwh9/ZUnGGzegRbk=
github.com/grafana/grafana-plugin-sdk-go v0.102.0 h1:Pknh7mlOaJvdhPgKHxcimDOSd9h29eSpA34W0/sOF6c=
github.com/grafana/grafana-plugin-sdk-go v0.102.0/go.mod h1:D7x3ah+1d4phNXpbnOaxa/osSaZlwh9/ZUnGGzegRbk=
github.com/grafana/loki v1.6.2-0.20210520072447-15d417efe103 h1:qCmofFVwQR9QnsinstVqI1NPLMVl33jNCnOCXEAVn6E=
github.com/grafana/loki v1.6.2-0.20210520072447-15d417efe103/go.mod h1:GHIsn+EohCChsdu5YouNZewqLeV9L2FNw4DEJU3P9qE=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=

View File

@ -2,6 +2,7 @@ package live
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
@ -621,22 +622,24 @@ func (g *GrafanaLive) HandleListHTTP(c *models.ReqContext) response.Response {
}
// Hardcode sample streams
frame := data.NewFrame("testdata",
frameJSON, err := data.FrameToJSON(data.NewFrame("testdata",
data.NewField("Time", nil, make([]time.Time, 0)),
data.NewField("Value", nil, make([]float64, 0)),
data.NewField("Min", nil, make([]float64, 0)),
data.NewField("Max", nil, make([]float64, 0)),
)
channels = append(channels, util.DynMap{
"channel": "plugin/testdata/random-2s-stream",
"data": frame,
}, util.DynMap{
"channel": "plugin/testdata/random-flakey-stream",
"data": frame,
}, util.DynMap{
"channel": "plugin/testdata/random-20Hz-stream",
"data": frame,
})
), data.IncludeSchemaOnly)
if err == nil {
channels = append(channels, util.DynMap{
"channel": "plugin/testdata/random-2s-stream",
"data": json.RawMessage(frameJSON),
}, util.DynMap{
"channel": "plugin/testdata/random-flakey-stream",
"data": json.RawMessage(frameJSON),
}, util.DynMap{
"channel": "plugin/testdata/random-20Hz-stream",
"data": json.RawMessage(frameJSON),
})
}
info["channels"] = channels
return response.JSONStreaming(200, info)

View File

@ -69,7 +69,7 @@ type ManagedStream struct {
mu sync.RWMutex
id string
start time.Time
last map[int64]map[string]json.RawMessage
last map[int64]map[string]data.FrameJSONCache
publisher models.ChannelPublisher
}
@ -78,7 +78,7 @@ func NewManagedStream(id string, publisher models.ChannelPublisher) *ManagedStre
return &ManagedStream{
id: id,
start: time.Now(),
last: map[int64]map[string]json.RawMessage{},
last: map[int64]map[string]data.FrameJSONCache{},
publisher: publisher,
}
}
@ -96,7 +96,7 @@ func (s *ManagedStream) ListChannels(orgID int64, prefix string) []util.DynMap {
for k, v := range s.last[orgID] {
ch := util.DynMap{}
ch["channel"] = prefix + k
ch["data"] = v
ch["data"] = json.RawMessage(v.Bytes(data.IncludeSchemaOnly))
info = append(info, ch)
}
return info
@ -104,55 +104,36 @@ func (s *ManagedStream) ListChannels(orgID int64, prefix string) []util.DynMap {
// Push sends frame to the stream and saves it for later retrieval by subscribers.
// unstableSchema flag can be set to disable schema caching for a path.
func (s *ManagedStream) Push(orgID int64, path string, frame *data.Frame, unstableSchema bool) error {
func (s *ManagedStream) Push(orgID int64, path string, frame *data.Frame) error {
// Keep schema + data for last packet.
frameJSONWrapper, err := data.FrameToJSON(frame)
msg, err := data.FrameToJSONCache(frame)
if err != nil {
logger.Error("Error marshaling frame with Schema", "error", err)
logger.Error("Error marshaling frame with data", "error", err)
return err
}
frameJSON := frameJSONWrapper.Bytes(data.IncludeAll)
if !unstableSchema {
// If schema is stable we can safely cache it, and only send values if
// stream already has schema cached.
s.mu.Lock()
if _, ok := s.last[orgID]; !ok {
s.last[orgID] = map[string]json.RawMessage{}
}
_, exists := s.last[orgID][path]
s.last[orgID][path] = frameJSON
s.mu.Unlock()
// When the packet already exits, only send the data.
// TODO: maybe a good idea would be MarshalJSON function of
// frame to keep Schema JSON and Values JSON in frame object
// to avoid encoding twice.
if exists {
frameJSONWrapper, err = data.FrameToJSON(frame)
if err != nil {
logger.Error("Error marshaling Frame to JSON", "error", err)
return err
}
frameJSON = frameJSONWrapper.Bytes(data.IncludeDataOnly)
}
} else {
// For unstable schema we always need to send everything to a connection.
// And we don't want to cache schema for unstable case. But we still need to
// set path to a map to make stream visible in UI stream select widget.
s.mu.Lock()
if _, ok := s.last[orgID]; ok {
s.last[orgID][path] = nil
}
s.mu.Unlock()
s.mu.Lock()
if _, ok := s.last[orgID]; !ok {
s.last[orgID] = map[string]data.FrameJSONCache{}
}
last, exists := s.last[orgID][path]
s.last[orgID][path] = msg
s.mu.Unlock()
include := data.IncludeAll
if exists && last.SameSchema(&msg) {
// When the schema has not changed, just send the data.
include = data.IncludeDataOnly
}
frameJSON := msg.Bytes(include)
// The channel this will be posted into.
channel := live.Channel{Scope: live.ScopeStream, Namespace: s.id, Path: path}.String()
logger.Debug("Publish data to channel", "channel", channel, "dataLength", len(frameJSON))
return s.publisher(orgID, channel, frameJSON)
}
// getLastPacket retrieves schema for a channel.
// getLastPacket retrieves last packet channel.
func (s *ManagedStream) getLastPacket(orgId int64, path string) (json.RawMessage, bool) {
s.mu.RLock()
defer s.mu.RUnlock()
@ -160,8 +141,11 @@ func (s *ManagedStream) getLastPacket(orgId int64, path string) (json.RawMessage
if !ok {
return nil, false
}
schema, ok := s.last[orgId][path]
return schema, ok && schema != nil
msg, ok := s.last[orgId][path]
if ok {
return msg.Bytes(data.IncludeAll), ok
}
return nil, ok
}
func (s *ManagedStream) GetHandlerForPath(_ string) (models.ChannelHandler, error) {
@ -184,7 +168,7 @@ func (s *ManagedStream) OnPublish(_ context.Context, u *models.SignedInUser, evt
// Stream scope only deals with data frames.
return models.PublishReply{}, 0, err
}
err = s.Push(u.OrgId, evt.Path, &frame, true)
err = s.Push(u.OrgId, evt.Path, &frame)
if err != nil {
// Stream scope only deals with data frames.
return models.PublishReply{}, 0, err

View File

@ -23,31 +23,17 @@ func TestNewManagedStream(t *testing.T) {
require.NotNil(t, c)
}
func TestManagedStream_GetLastPacket_UnstableSchema(t *testing.T) {
var orgID int64 = 1
publisher := &testPublisher{orgID: orgID, t: t}
c := NewManagedStream("a", publisher.publish)
_, ok := c.getLastPacket(orgID, "test")
require.False(t, ok)
err := c.Push(orgID, "test", data.NewFrame("hello"), true)
require.NoError(t, err)
_, ok = c.getLastPacket(orgID, "test")
require.NoError(t, err)
require.False(t, ok)
}
func TestManagedStream_GetLastPacket(t *testing.T) {
var orgID int64 = 1
publisher := &testPublisher{orgID: orgID, t: t}
c := NewManagedStream("a", publisher.publish)
_, ok := c.getLastPacket(orgID, "test")
require.False(t, ok)
err := c.Push(orgID, "test", data.NewFrame("hello"), false)
err := c.Push(orgID, "test", data.NewFrame("hello"))
require.NoError(t, err)
s, ok := c.getLastPacket(orgID, "test")
require.NoError(t, err)
require.True(t, ok)
require.Equal(t, `{"schema":{"name":"hello","fields":[]},"data":{"values":[]}}`, string(s))
require.JSONEq(t, `{"schema":{"name":"hello","fields":[]},"data":{"values":[]}}`, string(s))
}

View File

@ -57,7 +57,6 @@ func (g *Gateway) Handle(ctx *models.ReqContext) {
// TODO Grafana 8: decide which formats to use or keep all.
urlValues := ctx.Req.URL.Query()
frameFormat := pushurl.FrameFormatFromValues(urlValues)
unstableSchema := pushurl.UnstableSchemaFromValues(urlValues)
body, err := ctx.Req.Body().Bytes()
if err != nil {
@ -69,7 +68,6 @@ func (g *Gateway) Handle(ctx *models.ReqContext) {
"protocol", "http",
"streamId", streamID,
"bodyLength", len(body),
"unstableSchema", unstableSchema,
"frameFormat", frameFormat,
)
@ -88,7 +86,7 @@ func (g *Gateway) Handle(ctx *models.ReqContext) {
// interval = "1s" vs flush_interval = "5s"
for _, mf := range metricFrames {
err := stream.Push(ctx.SignedInUser.OrgId, mf.Key(), mf.Frame(), unstableSchema)
err := stream.Push(ctx.SignedInUser.OrgId, mf.Key(), mf.Frame())
if err != nil {
ctx.Resp.WriteHeader(http.StatusInternalServerError)
return

View File

@ -6,15 +6,9 @@ import (
)
const (
unstableSchemaParam = "gf_live_unstable_schema"
frameFormatParam = "gf_live_frame_format"
frameFormatParam = "gf_live_frame_format"
)
// UnstableSchemaFromValues extracts unstable schema tip from url values.
func UnstableSchemaFromValues(values url.Values) bool {
return strings.ToLower(values.Get(unstableSchemaParam)) == "true" || values.Get(unstableSchemaParam) == "1"
}
// FrameFormatFromValues extracts frame format tip from url values.
func FrameFormatFromValues(values url.Values) string {
frameFormat := strings.ToLower(values.Get(frameFormatParam))

View File

@ -7,17 +7,6 @@ import (
"github.com/stretchr/testify/require"
)
func TestUnstableSchemaFromValues(t *testing.T) {
values := url.Values{}
require.False(t, UnstableSchemaFromValues(values))
values.Set(unstableSchemaParam, "yes")
require.False(t, UnstableSchemaFromValues(values))
values.Set(unstableSchemaParam, "true")
require.True(t, UnstableSchemaFromValues(values))
values.Set(unstableSchemaParam, "True")
require.True(t, UnstableSchemaFromValues(values))
}
func TestFrameFormatFromValues(t *testing.T) {
values := url.Values{}
require.Equal(t, "labels_column", FrameFormatFromValues(values))

View File

@ -174,13 +174,11 @@ func (s *Handler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
// TODO Grafana 8: decide which formats to use or keep all.
urlValues := r.URL.Query()
frameFormat := pushurl.FrameFormatFromValues(urlValues)
unstableSchema := pushurl.UnstableSchemaFromValues(urlValues)
logger.Debug("Live Push request",
"protocol", "http",
"streamId", streamID,
"bodyLength", len(body),
"unstableSchema", unstableSchema,
"frameFormat", frameFormat,
)
@ -191,7 +189,7 @@ func (s *Handler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
}
for _, mf := range metricFrames {
err := stream.Push(user.OrgId, mf.Key(), mf.Frame(), unstableSchema)
err := stream.Push(user.OrgId, mf.Key(), mf.Frame())
if err != nil {
return
}

View File

@ -70,9 +70,8 @@ func TestReadCSV(t *testing.T) {
require.NoError(t, err)
frame := data.NewFrame("", fBool, fBool2, fNum, fStr)
frameToJSON, err := data.FrameToJSON(frame)
out, err := data.FrameToJSON(frame, data.IncludeAll)
require.NoError(t, err)
out := frameToJSON.Bytes(data.IncludeAll)
require.JSONEq(t, `{"schema":{
"fields":[

View File

@ -72,6 +72,9 @@ export class QueryEditor extends PureComponent<Props, State> {
const { onChange, query, onRunQuery } = this.props;
onChange({ ...query, queryType: sel.value! });
onRunQuery();
// Reload the channel list
this.loadChannelInfo();
};
onChannelChange = (sel: SelectableValue<string>) => {