From 2459a0ceb588841fce40046083376245fe304afa Mon Sep 17 00:00:00 2001 From: Alexander Emelin Date: Wed, 12 May 2021 18:47:03 +0300 Subject: [PATCH] live: remove demultiplexor (#34012) --- .../live/demultiplexer/demultiplexer.go | 78 ------------------- pkg/services/live/live.go | 7 -- 2 files changed, 85 deletions(-) delete mode 100644 pkg/services/live/demultiplexer/demultiplexer.go diff --git a/pkg/services/live/demultiplexer/demultiplexer.go b/pkg/services/live/demultiplexer/demultiplexer.go deleted file mode 100644 index 6783fd16d5a..00000000000 --- a/pkg/services/live/demultiplexer/demultiplexer.go +++ /dev/null @@ -1,78 +0,0 @@ -package demultiplexer - -import ( - "context" - "errors" - - "github.com/grafana/grafana/pkg/infra/log" - "github.com/grafana/grafana/pkg/models" - "github.com/grafana/grafana/pkg/services/live/convert" - "github.com/grafana/grafana/pkg/services/live/livecontext" - "github.com/grafana/grafana/pkg/services/live/managedstream" - "github.com/grafana/grafana/pkg/services/live/pushurl" - - "github.com/grafana/grafana-plugin-sdk-go/backend" -) - -var ( - logger = log.New("live.push_ws") -) - -type Demultiplexer struct { - streamID string - managedStreamRunner *managedstream.Runner - converter *convert.Converter -} - -func New(streamID string, managedStreamRunner *managedstream.Runner) *Demultiplexer { - return &Demultiplexer{ - streamID: streamID, - managedStreamRunner: managedStreamRunner, - converter: convert.NewConverter(), - } -} - -func (s *Demultiplexer) GetHandlerForPath(_ string) (models.ChannelHandler, error) { - return s, nil -} - -func (s *Demultiplexer) OnSubscribe(_ context.Context, _ *models.SignedInUser, _ models.SubscribeEvent) (models.SubscribeReply, backend.SubscribeStreamStatus, error) { - return models.SubscribeReply{}, backend.SubscribeStreamStatusPermissionDenied, nil -} - -func (s *Demultiplexer) OnPublish(ctx context.Context, u *models.SignedInUser, evt models.PublishEvent) (models.PublishReply, backend.PublishStreamStatus, error) { - urlValues, ok := livecontext.GetContextValues(ctx) - if !ok { - return models.PublishReply{}, 0, errors.New("error extracting context url values") - } - - stream, err := s.managedStreamRunner.GetOrCreateStream(u.OrgId, s.streamID) - if err != nil { - logger.Error("Error getting stream", "error", err, "streamId", s.streamID) - return models.PublishReply{}, 0, err - } - - frameFormat := pushurl.FrameFormatFromValues(urlValues) - unstableSchema := pushurl.UnstableSchemaFromValues(urlValues) - - logger.Debug("Live Push request", - "protocol", "ws", - "streamId", s.streamID, - "bodyLength", len(evt.Data), - "unstableSchema", unstableSchema, - "frameFormat", frameFormat, - ) - - metricFrames, err := s.converter.Convert(evt.Data, frameFormat) - if err != nil { - logger.Error("Error converting metrics", "error", err, "data", string(evt.Data), "frameFormat", frameFormat) - return models.PublishReply{}, 0, err - } - for _, mf := range metricFrames { - err := stream.Push(u.OrgId, mf.Key(), mf.Frame(), unstableSchema) - if err != nil { - return models.PublishReply{}, 0, err - } - } - return models.PublishReply{}, backend.PublishStreamStatusOK, nil -} diff --git a/pkg/services/live/live.go b/pkg/services/live/live.go index 69a7a5241c3..44d57ff2b7b 100644 --- a/pkg/services/live/live.go +++ b/pkg/services/live/live.go @@ -20,7 +20,6 @@ import ( "github.com/grafana/grafana/pkg/registry" "github.com/grafana/grafana/pkg/services/datasources" "github.com/grafana/grafana/pkg/services/live/database" - "github.com/grafana/grafana/pkg/services/live/demultiplexer" "github.com/grafana/grafana/pkg/services/live/features" "github.com/grafana/grafana/pkg/services/live/livecontext" "github.com/grafana/grafana/pkg/services/live/managedstream" @@ -472,8 +471,6 @@ func (g *GrafanaLive) GetChannelHandlerFactory(user *models.SignedInUser, scope return g.handleDatasourceScope(user, namespace) case live.ScopeStream: return g.handleStreamScope(user, namespace) - case live.ScopePush: - return g.handlePushScope(user, namespace) default: return nil, fmt.Errorf("invalid scope: %q", scope) } @@ -511,10 +508,6 @@ func (g *GrafanaLive) handleStreamScope(u *models.SignedInUser, namespace string return g.ManagedStreamRunner.GetOrCreateStream(u.OrgId, namespace) } -func (g *GrafanaLive) handlePushScope(_ *models.SignedInUser, namespace string) (models.ChannelHandlerFactory, error) { - return demultiplexer.New(namespace, g.ManagedStreamRunner), nil -} - func (g *GrafanaLive) handleDatasourceScope(user *models.SignedInUser, namespace string) (models.ChannelHandlerFactory, error) { ds, err := g.DatasourceCache.GetDatasourceByUID(namespace, user, false) if err != nil {