mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
live: remove demultiplexor (#34012)
This commit is contained in:
parent
ca8f6addab
commit
2459a0ceb5
@ -1,78 +0,0 @@
|
|||||||
package demultiplexer
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"errors"
|
|
||||||
|
|
||||||
"github.com/grafana/grafana/pkg/infra/log"
|
|
||||||
"github.com/grafana/grafana/pkg/models"
|
|
||||||
"github.com/grafana/grafana/pkg/services/live/convert"
|
|
||||||
"github.com/grafana/grafana/pkg/services/live/livecontext"
|
|
||||||
"github.com/grafana/grafana/pkg/services/live/managedstream"
|
|
||||||
"github.com/grafana/grafana/pkg/services/live/pushurl"
|
|
||||||
|
|
||||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
logger = log.New("live.push_ws")
|
|
||||||
)
|
|
||||||
|
|
||||||
type Demultiplexer struct {
|
|
||||||
streamID string
|
|
||||||
managedStreamRunner *managedstream.Runner
|
|
||||||
converter *convert.Converter
|
|
||||||
}
|
|
||||||
|
|
||||||
func New(streamID string, managedStreamRunner *managedstream.Runner) *Demultiplexer {
|
|
||||||
return &Demultiplexer{
|
|
||||||
streamID: streamID,
|
|
||||||
managedStreamRunner: managedStreamRunner,
|
|
||||||
converter: convert.NewConverter(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Demultiplexer) GetHandlerForPath(_ string) (models.ChannelHandler, error) {
|
|
||||||
return s, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Demultiplexer) OnSubscribe(_ context.Context, _ *models.SignedInUser, _ models.SubscribeEvent) (models.SubscribeReply, backend.SubscribeStreamStatus, error) {
|
|
||||||
return models.SubscribeReply{}, backend.SubscribeStreamStatusPermissionDenied, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Demultiplexer) OnPublish(ctx context.Context, u *models.SignedInUser, evt models.PublishEvent) (models.PublishReply, backend.PublishStreamStatus, error) {
|
|
||||||
urlValues, ok := livecontext.GetContextValues(ctx)
|
|
||||||
if !ok {
|
|
||||||
return models.PublishReply{}, 0, errors.New("error extracting context url values")
|
|
||||||
}
|
|
||||||
|
|
||||||
stream, err := s.managedStreamRunner.GetOrCreateStream(u.OrgId, s.streamID)
|
|
||||||
if err != nil {
|
|
||||||
logger.Error("Error getting stream", "error", err, "streamId", s.streamID)
|
|
||||||
return models.PublishReply{}, 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
frameFormat := pushurl.FrameFormatFromValues(urlValues)
|
|
||||||
unstableSchema := pushurl.UnstableSchemaFromValues(urlValues)
|
|
||||||
|
|
||||||
logger.Debug("Live Push request",
|
|
||||||
"protocol", "ws",
|
|
||||||
"streamId", s.streamID,
|
|
||||||
"bodyLength", len(evt.Data),
|
|
||||||
"unstableSchema", unstableSchema,
|
|
||||||
"frameFormat", frameFormat,
|
|
||||||
)
|
|
||||||
|
|
||||||
metricFrames, err := s.converter.Convert(evt.Data, frameFormat)
|
|
||||||
if err != nil {
|
|
||||||
logger.Error("Error converting metrics", "error", err, "data", string(evt.Data), "frameFormat", frameFormat)
|
|
||||||
return models.PublishReply{}, 0, err
|
|
||||||
}
|
|
||||||
for _, mf := range metricFrames {
|
|
||||||
err := stream.Push(u.OrgId, mf.Key(), mf.Frame(), unstableSchema)
|
|
||||||
if err != nil {
|
|
||||||
return models.PublishReply{}, 0, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return models.PublishReply{}, backend.PublishStreamStatusOK, nil
|
|
||||||
}
|
|
@ -20,7 +20,6 @@ import (
|
|||||||
"github.com/grafana/grafana/pkg/registry"
|
"github.com/grafana/grafana/pkg/registry"
|
||||||
"github.com/grafana/grafana/pkg/services/datasources"
|
"github.com/grafana/grafana/pkg/services/datasources"
|
||||||
"github.com/grafana/grafana/pkg/services/live/database"
|
"github.com/grafana/grafana/pkg/services/live/database"
|
||||||
"github.com/grafana/grafana/pkg/services/live/demultiplexer"
|
|
||||||
"github.com/grafana/grafana/pkg/services/live/features"
|
"github.com/grafana/grafana/pkg/services/live/features"
|
||||||
"github.com/grafana/grafana/pkg/services/live/livecontext"
|
"github.com/grafana/grafana/pkg/services/live/livecontext"
|
||||||
"github.com/grafana/grafana/pkg/services/live/managedstream"
|
"github.com/grafana/grafana/pkg/services/live/managedstream"
|
||||||
@ -472,8 +471,6 @@ func (g *GrafanaLive) GetChannelHandlerFactory(user *models.SignedInUser, scope
|
|||||||
return g.handleDatasourceScope(user, namespace)
|
return g.handleDatasourceScope(user, namespace)
|
||||||
case live.ScopeStream:
|
case live.ScopeStream:
|
||||||
return g.handleStreamScope(user, namespace)
|
return g.handleStreamScope(user, namespace)
|
||||||
case live.ScopePush:
|
|
||||||
return g.handlePushScope(user, namespace)
|
|
||||||
default:
|
default:
|
||||||
return nil, fmt.Errorf("invalid scope: %q", scope)
|
return nil, fmt.Errorf("invalid scope: %q", scope)
|
||||||
}
|
}
|
||||||
@ -511,10 +508,6 @@ func (g *GrafanaLive) handleStreamScope(u *models.SignedInUser, namespace string
|
|||||||
return g.ManagedStreamRunner.GetOrCreateStream(u.OrgId, namespace)
|
return g.ManagedStreamRunner.GetOrCreateStream(u.OrgId, namespace)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *GrafanaLive) handlePushScope(_ *models.SignedInUser, namespace string) (models.ChannelHandlerFactory, error) {
|
|
||||||
return demultiplexer.New(namespace, g.ManagedStreamRunner), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (g *GrafanaLive) handleDatasourceScope(user *models.SignedInUser, namespace string) (models.ChannelHandlerFactory, error) {
|
func (g *GrafanaLive) handleDatasourceScope(user *models.SignedInUser, namespace string) (models.ChannelHandlerFactory, error) {
|
||||||
ds, err := g.DatasourceCache.GetDatasourceByUID(namespace, user, false)
|
ds, err := g.DatasourceCache.GetDatasourceByUID(namespace, user, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
Loading…
Reference in New Issue
Block a user