Live: push scope to demultiplex incoming data to stream channels (#32808)

This commit is contained in:
Alexander Emelin 2021-04-08 22:40:06 +03:00 committed by GitHub
parent 8f8cad9b83
commit a92bcce9de
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 80 additions and 3 deletions

View File

@ -35,7 +35,7 @@ type PublishEvent struct {
// PublishReply is a reaction to PublishEvent.
type PublishReply struct {
// By default, it's a handler responsibility to publish data
// into a stream upon OnPublish but setting Fallthrough to true
// into a stream upon OnPublish but returning a data here
// will make Grafana Live publish data itself (i.e. stream handler
// just works as permission proxy in this case).
Data json.RawMessage

View File

@ -41,5 +41,8 @@ func ParseChannelAddress(id string) ChannelAddress {
// IsValid checks if all parts of the address are valid.
func (ca *ChannelAddress) IsValid() bool {
if ca.Scope == ScopePush {
return ca.Namespace != "" && ca.Path == ""
}
return ca.Scope != "" && ca.Namespace != "" && ca.Path != ""
}

View File

@ -0,0 +1,53 @@
package live
import (
"context"
"github.com/grafana/grafana-live-sdk/telemetry/telegraf"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana/pkg/models"
)
type Demultiplexer struct {
streamID string
telegrafConverterWide *telegraf.Converter
managedStreamRunner *ManagedStreamRunner
}
func NewDemultiplexer(managedStreamRunner *ManagedStreamRunner, streamID string) *Demultiplexer {
return &Demultiplexer{
streamID: streamID,
telegrafConverterWide: telegraf.NewConverter(),
managedStreamRunner: managedStreamRunner,
}
}
func (s *Demultiplexer) GetHandlerForPath(_ string) (models.ChannelHandler, error) {
return s, nil
}
func (s *Demultiplexer) OnSubscribe(_ context.Context, _ *models.SignedInUser, e models.SubscribeEvent) (models.SubscribeReply, backend.SubscribeStreamStatus, error) {
reply := models.SubscribeReply{}
return reply, backend.SubscribeStreamStatusPermissionDenied, nil
}
func (s *Demultiplexer) OnPublish(_ context.Context, _ *models.SignedInUser, evt models.PublishEvent) (models.PublishReply, backend.PublishStreamStatus, error) {
stream, err := s.managedStreamRunner.GetOrCreateStream(s.streamID)
if err != nil {
logger.Error("Error getting stream", "error", err)
return models.PublishReply{}, 0, err
}
metricFrames, err := s.telegrafConverterWide.Convert(evt.Data)
if err != nil {
logger.Error("Error converting metrics", "error", err)
return models.PublishReply{}, 0, err
}
for _, mf := range metricFrames {
err := stream.Push(mf.Key(), mf.Frame())
if err != nil {
return models.PublishReply{}, 0, err
}
}
return models.PublishReply{}, backend.PublishStreamStatusOK, nil
}

View File

@ -206,6 +206,7 @@ func (g *GrafanaLive) Init() error {
reply, status, err := handler.OnPublish(client.Context(), user, models.PublishEvent{
Channel: e.Channel,
Path: addr.Path,
Data: e.Data,
})
if err != nil {
logger.Error("Error calling channel handler publish", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err)
@ -356,6 +357,8 @@ func (g *GrafanaLive) GetChannelHandlerFactory(user *models.SignedInUser, scope
return g.handleDatasourceScope(user, namespace)
case ScopeStream:
return g.handleStreamScope(user, namespace)
case ScopePush:
return g.handlePushScope(user, namespace)
default:
return nil, fmt.Errorf("invalid scope: %q", scope)
}
@ -393,6 +396,10 @@ func (g *GrafanaLive) handleStreamScope(_ *models.SignedInUser, namespace string
return g.ManagedStreamRunner.GetOrCreateStream(namespace)
}
func (g *GrafanaLive) handlePushScope(_ *models.SignedInUser, namespace string) (models.ChannelHandlerFactory, error) {
return NewDemultiplexer(g.ManagedStreamRunner, namespace), nil
}
func (g *GrafanaLive) handleDatasourceScope(user *models.SignedInUser, namespace string) (models.ChannelHandlerFactory, error) {
ds, err := g.DatasourceCache.GetDatasourceByUID(namespace, user, false)
if err != nil {

View File

@ -137,6 +137,18 @@ func (s *ManagedStream) OnSubscribe(_ context.Context, _ *models.SignedInUser, e
return reply, backend.SubscribeStreamStatusOK, nil
}
func (s *ManagedStream) OnPublish(_ context.Context, _ *models.SignedInUser, _ models.PublishEvent) (models.PublishReply, backend.PublishStreamStatus, error) {
return models.PublishReply{}, backend.PublishStreamStatusPermissionDenied, nil
func (s *ManagedStream) OnPublish(_ context.Context, _ *models.SignedInUser, evt models.PublishEvent) (models.PublishReply, backend.PublishStreamStatus, error) {
logger.Debug("OnPublish", evt.Channel, "evt", evt)
var frame data.Frame
err := json.Unmarshal(evt.Data, &frame)
if err != nil {
// stream scope only deals with data frames.
return models.PublishReply{}, 0, err
}
err = s.Push(evt.Path, &frame)
if err != nil {
// stream scope only deals with data frames.
return models.PublishReply{}, 0, err
}
return models.PublishReply{}, backend.PublishStreamStatusOK, nil
}

View File

@ -9,4 +9,6 @@ const (
ScopeDatasource = "ds"
// ScopeStream is a managed data frame stream
ScopeStream = "stream"
// ScopePush allows sending data into managed streams. It does not support subscriptions
ScopePush = "push"
)