live: rename url param - assume stable schema by default (#33383)

This commit is contained in:
Alexander Emelin 2021-04-26 20:46:26 +03:00 committed by GitHub
parent c41b5d80a3
commit 2abd9bc3b9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 21 additions and 19 deletions

View File

@ -53,13 +53,13 @@ func (s *Demultiplexer) OnPublish(ctx context.Context, _ *models.SignedInUser, e
}
frameFormat := pushurl.FrameFormatFromValues(urlValues)
stableSchema := pushurl.StableSchemaFromValues(urlValues)
unstableSchema := pushurl.UnstableSchemaFromValues(urlValues)
logger.Debug("Live Push request",
"protocol", "ws",
"streamId", s.streamID,
"bodyLength", len(evt.Data),
"stableSchema", stableSchema,
"unstableSchema", unstableSchema,
"frameFormat", frameFormat,
)
@ -69,7 +69,7 @@ func (s *Demultiplexer) OnPublish(ctx context.Context, _ *models.SignedInUser, e
return models.PublishReply{}, 0, err
}
for _, mf := range metricFrames {
err := stream.Push(mf.Key(), mf.Frame(), stableSchema)
err := stream.Push(mf.Key(), mf.Frame(), unstableSchema)
if err != nil {
return models.PublishReply{}, 0, err
}

View File

@ -92,7 +92,8 @@ func (s *ManagedStream) ListChannels(prefix string) []util.DynMap {
}
// Push sends frame to the stream and saves it for later retrieval by subscribers.
func (s *ManagedStream) Push(path string, frame *data.Frame, stableSchema bool) error {
// unstableSchema flag can be set to disable schema caching for a path.
func (s *ManagedStream) Push(path string, frame *data.Frame, unstableSchema bool) error {
// Keep schema + data for last packet.
frameJSON, err := data.FrameToJSON(frame, true, true)
if err != nil {
@ -100,7 +101,7 @@ func (s *ManagedStream) Push(path string, frame *data.Frame, stableSchema bool)
return err
}
if stableSchema {
if !unstableSchema {
// If schema is stable we can safely cache it, and only send values if
// stream already has schema cached.
s.mu.Lock()

View File

@ -16,11 +16,11 @@ func TestNewManagedStream(t *testing.T) {
require.NotNil(t, c)
}
func TestManagedStream_GetLastPacket(t *testing.T) {
func TestManagedStream_GetLastPacket_UnstableSchema(t *testing.T) {
c := NewManagedStream("a", noopPublisher)
_, ok := c.getLastPacket("test")
require.False(t, ok)
err := c.Push("test", data.NewFrame("hello"), false)
err := c.Push("test", data.NewFrame("hello"), true)
require.NoError(t, err)
_, ok = c.getLastPacket("test")
@ -28,11 +28,11 @@ func TestManagedStream_GetLastPacket(t *testing.T) {
require.False(t, ok)
}
func TestManagedStream_GetLastPacket_StableSchema(t *testing.T) {
func TestManagedStream_GetLastPacket(t *testing.T) {
c := NewManagedStream("a", noopPublisher)
_, ok := c.getLastPacket("test")
require.False(t, ok)
err := c.Push("test", data.NewFrame("hello"), true)
err := c.Push("test", data.NewFrame("hello"), false)
require.NoError(t, err)
s, ok := c.getLastPacket("test")

View File

@ -71,7 +71,7 @@ func (g *Gateway) Handle(ctx *models.ReqContext) {
// TODO Grafana 8: decide which formats to use or keep all.
urlValues := ctx.Req.URL.Query()
frameFormat := pushurl.FrameFormatFromValues(urlValues)
stableSchema := pushurl.StableSchemaFromValues(urlValues)
unstableSchema := pushurl.UnstableSchemaFromValues(urlValues)
body, err := ctx.Req.Body().Bytes()
if err != nil {
@ -83,7 +83,7 @@ func (g *Gateway) Handle(ctx *models.ReqContext) {
"protocol", "http",
"streamId", streamID,
"bodyLength", len(body),
"stableSchema", stableSchema,
"unstableSchema", unstableSchema,
"frameFormat", frameFormat,
)
@ -102,7 +102,7 @@ func (g *Gateway) Handle(ctx *models.ReqContext) {
// interval = "1s" vs flush_interval = "5s"
for _, mf := range metricFrames {
err := stream.Push(mf.Key(), mf.Frame(), stableSchema)
err := stream.Push(mf.Key(), mf.Frame(), unstableSchema)
if err != nil {
ctx.Resp.WriteHeader(http.StatusInternalServerError)
return

View File

@ -5,12 +5,13 @@ import (
"strings"
)
// StableSchemaFromValues assumes stable
func StableSchemaFromValues(values url.Values) bool {
key := "gf_live_stable_schema"
return !(strings.ToLower(values.Get(key)) == "false" || values.Get(key) == "0")
// UnstableSchemaFromValues extracts unstable schema tip from url values.
func UnstableSchemaFromValues(values url.Values) bool {
key := "gf_live_unstable_schema"
return strings.ToLower(values.Get(key)) == "true" || values.Get(key) == "1"
}
// FrameFormatFromValues extracts frame format tip from url values.
func FrameFormatFromValues(values url.Values) string {
frameFormat := strings.ToLower(values.Get("gf_live_frame_format"))
if frameFormat == "" {

View File

@ -171,13 +171,13 @@ func (s *Handler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
// TODO Grafana 8: decide which formats to use or keep all.
urlValues := r.URL.Query()
frameFormat := pushurl.FrameFormatFromValues(urlValues)
stableSchema := pushurl.StableSchemaFromValues(urlValues)
unstableSchema := pushurl.UnstableSchemaFromValues(urlValues)
logger.Debug("Live Push request",
"protocol", "http",
"streamId", streamID,
"bodyLength", len(body),
"stableSchema", stableSchema,
"unstableSchema", unstableSchema,
"frameFormat", frameFormat,
)
@ -188,7 +188,7 @@ func (s *Handler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
}
for _, mf := range metricFrames {
err := stream.Push(mf.Key(), mf.Frame(), stableSchema)
err := stream.Push(mf.Key(), mf.Frame(), unstableSchema)
if err != nil {
return
}