live: cleanups and tests (#32827)

This commit is contained in:
Alexander Emelin 2021-04-09 21:06:25 +03:00 committed by GitHub
parent 0c96cbeef0
commit d9602a5e60
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 134 additions and 42 deletions

View File

@ -4,8 +4,8 @@ import (
"strings"
)
// ChannelAddress is the channel ID split by parts.
type ChannelAddress struct {
// Channel is the channel ID split by parts.
type Channel struct {
// Scope is one of available channel scopes:
// like ScopeGrafana, ScopePlugin, ScopeDatasource, ScopeStream.
Scope string `json:"scope,omitempty"`
@ -21,11 +21,11 @@ type ChannelAddress struct {
Path string `json:"path,omitempty"`
}
// ParseChannelAddress parses the parts from a channel ID:
// ParseChannel parses the parts from a channel ID:
// ${scope} / ${namespace} / ${path}.
func ParseChannelAddress(id string) ChannelAddress {
addr := ChannelAddress{}
parts := strings.SplitN(id, "/", 3)
func ParseChannel(chID string) Channel {
addr := Channel{}
parts := strings.SplitN(chID, "/", 3)
length := len(parts)
if length > 0 {
addr.Scope = parts[0]
@ -39,10 +39,22 @@ func ParseChannelAddress(id string) ChannelAddress {
return addr
}
// IsValid checks if all parts of the address are valid.
func (ca *ChannelAddress) IsValid() bool {
if ca.Scope == ScopePush {
return ca.Namespace != "" && ca.Path == ""
func (c Channel) String() string {
ch := c.Scope
if c.Namespace != "" {
ch += "/" + c.Namespace
}
return ca.Scope != "" && ca.Namespace != "" && ca.Path != ""
if c.Path != "" {
ch += "/" + c.Path
}
return ch
}
// IsValid checks if all parts of the address are valid.
func (c *Channel) IsValid() bool {
if c.Scope == ScopePush {
// Push scope channels supposed to be like push/{$stream_id}.
return c.Namespace != "" && c.Path == ""
}
return c.Scope != "" && c.Namespace != "" && c.Path != ""
}

View File

@ -7,11 +7,11 @@ import (
"github.com/stretchr/testify/require"
)
func TestParseChannelAddress_Valid(t *testing.T) {
addr := ParseChannelAddress("aaa/bbb/ccc/ddd")
func TestParseChannel(t *testing.T) {
addr := ParseChannel("aaa/bbb/ccc/ddd")
require.True(t, addr.IsValid())
ex := ChannelAddress{
ex := Channel{
Scope: "aaa",
Namespace: "bbb",
Path: "ccc/ddd",
@ -22,7 +22,89 @@ func TestParseChannelAddress_Valid(t *testing.T) {
}
}
func TestParseChannelAddress_Invalid(t *testing.T) {
addr := ParseChannelAddress("aaa/bbb")
require.False(t, addr.IsValid())
func TestParseChannel_IsValid(t *testing.T) {
tests := []struct {
name string
id string
isValid bool
}{
{
name: "valid",
id: "stream/cpu/test",
isValid: true,
},
{
name: "valid_long_path",
id: "stream/cpu/test/other",
isValid: true,
},
{
name: "invalid_no_path",
id: "grafana/bbb",
isValid: false,
},
{
name: "invalid_only_scope",
id: "grafana",
isValid: false,
},
{
name: "push_scope_no_path_valid",
id: "push/telegraf",
isValid: true,
},
{
name: "push_scope_with_path_invalid",
id: "push/telegraf/test",
isValid: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := ParseChannel(tt.id); got.IsValid() != tt.isValid {
t.Errorf("unexpected isValid result for %s", tt.id)
}
})
}
}
func TestChannel_String(t *testing.T) {
type fields struct {
Scope string
Namespace string
Path string
}
tests := []struct {
name string
fields fields
want string
}{
{
"with_all_parts",
fields{Scope: ScopeStream, Namespace: "telegraf", Path: "test"},
"stream/telegraf/test",
},
{
"with_scope_and_namespace",
fields{Scope: ScopeStream, Namespace: "telegraf"},
"stream/telegraf",
},
{
"with_scope_only",
fields{Scope: ScopeStream},
"stream",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := Channel{
Scope: tt.fields.Scope,
Namespace: tt.fields.Namespace,
Path: tt.fields.Path,
}.String()
if got != tt.want {
t.Errorf("String() = %v, want %v", got, tt.want)
}
})
}
}

View File

@ -11,15 +11,15 @@ import (
type Demultiplexer struct {
streamID string
telegrafConverterWide *telegraf.Converter
managedStreamRunner *ManagedStreamRunner
telegrafConverterWide *telegraf.Converter
}
func NewDemultiplexer(managedStreamRunner *ManagedStreamRunner, streamID string) *Demultiplexer {
func NewDemultiplexer(streamID string, managedStreamRunner *ManagedStreamRunner) *Demultiplexer {
return &Demultiplexer{
streamID: streamID,
telegrafConverterWide: telegraf.NewConverter(),
managedStreamRunner: managedStreamRunner,
telegrafConverterWide: telegraf.NewConverter(),
}
}
@ -27,20 +27,19 @@ func (s *Demultiplexer) GetHandlerForPath(_ string) (models.ChannelHandler, erro
return s, nil
}
func (s *Demultiplexer) OnSubscribe(_ context.Context, _ *models.SignedInUser, e models.SubscribeEvent) (models.SubscribeReply, backend.SubscribeStreamStatus, error) {
reply := models.SubscribeReply{}
return reply, backend.SubscribeStreamStatusPermissionDenied, nil
func (s *Demultiplexer) OnSubscribe(_ context.Context, _ *models.SignedInUser, _ models.SubscribeEvent) (models.SubscribeReply, backend.SubscribeStreamStatus, error) {
return models.SubscribeReply{}, backend.SubscribeStreamStatusPermissionDenied, nil
}
func (s *Demultiplexer) OnPublish(_ context.Context, _ *models.SignedInUser, evt models.PublishEvent) (models.PublishReply, backend.PublishStreamStatus, error) {
stream, err := s.managedStreamRunner.GetOrCreateStream(s.streamID)
if err != nil {
logger.Error("Error getting stream", "error", err)
logger.Error("Error getting stream", "error", err, "streamId", s.streamID)
return models.PublishReply{}, 0, err
}
metricFrames, err := s.telegrafConverterWide.Convert(evt.Data)
if err != nil {
logger.Error("Error converting metrics", "error", err)
logger.Error("Error converting metrics", "error", err, "data", string(evt.Data))
return models.PublishReply{}, 0, err
}
for _, mf := range metricFrames {

View File

@ -306,11 +306,11 @@ func publishStatusToHTTPError(status backend.PublishStreamStatus) (int, string)
}
// GetChannelHandler gives thread-safe access to the channel.
func (g *GrafanaLive) GetChannelHandler(user *models.SignedInUser, channel string) (models.ChannelHandler, ChannelAddress, error) {
func (g *GrafanaLive) GetChannelHandler(user *models.SignedInUser, channel string) (models.ChannelHandler, Channel, error) {
// Parse the identifier ${scope}/${namespace}/${path}
addr := ParseChannelAddress(channel)
addr := ParseChannel(channel)
if !addr.IsValid() {
return nil, ChannelAddress{}, fmt.Errorf("invalid channel: %q", channel)
return nil, Channel{}, fmt.Errorf("invalid channel: %q", channel)
}
g.channelsMu.RLock()
@ -397,7 +397,7 @@ func (g *GrafanaLive) handleStreamScope(_ *models.SignedInUser, namespace string
}
func (g *GrafanaLive) handlePushScope(_ *models.SignedInUser, namespace string) (models.ChannelHandlerFactory, error) {
return NewDemultiplexer(g.ManagedStreamRunner, namespace), nil
return NewDemultiplexer(namespace, g.ManagedStreamRunner), nil
}
func (g *GrafanaLive) handleDatasourceScope(user *models.SignedInUser, namespace string) (models.ChannelHandlerFactory, error) {
@ -430,7 +430,7 @@ func (g *GrafanaLive) IsEnabled() bool {
}
func (g *GrafanaLive) HandleHTTPPublish(ctx *models.ReqContext, cmd dtos.LivePublishCmd) response.Response {
addr := ParseChannelAddress(cmd.Channel)
addr := ParseChannel(cmd.Channel)
if !addr.IsValid() {
return response.Error(http.StatusBadRequest, "Bad channel address", nil)
}

View File

@ -3,7 +3,6 @@ package live
import (
"context"
"encoding/json"
"fmt"
"sync"
"time"
@ -13,13 +12,14 @@ import (
"github.com/grafana/grafana/pkg/util"
)
// ManagedStreamRunner keeps ManagedStream per streamID.
type ManagedStreamRunner struct {
mu sync.RWMutex
streams map[string]*ManagedStream
publisher models.ChannelPublisher
}
// NewPluginRunner creates new PluginRunner.
// NewManagedStreamRunner creates new ManagedStreamRunner.
func NewManagedStreamRunner(publisher models.ChannelPublisher) *ManagedStreamRunner {
return &ManagedStreamRunner{
publisher: publisher,
@ -27,7 +27,7 @@ func NewManagedStreamRunner(publisher models.ChannelPublisher) *ManagedStreamRun
}
}
// Streams returns map of active managed streams.
// Streams returns a map of active managed streams (per streamID).
func (r *ManagedStreamRunner) Streams() map[string]*ManagedStream {
r.mu.RLock()
defer r.mu.RUnlock()
@ -51,7 +51,7 @@ func (r *ManagedStreamRunner) GetOrCreateStream(streamID string) (*ManagedStream
return s, nil
}
// ManagedStream holds the state of a managed stream
// ManagedStream holds the state of a managed stream.
type ManagedStream struct {
mu sync.RWMutex
id string
@ -60,7 +60,7 @@ type ManagedStream struct {
publisher models.ChannelPublisher
}
// NewCache creates new Cache.
// NewManagedStream creates new ManagedStream.
func NewManagedStream(id string, publisher models.ChannelPublisher) *ManagedStream {
return &ManagedStream{
id: id,
@ -85,7 +85,7 @@ func (s *ManagedStream) ListChannels(prefix string) []util.DynMap {
return info
}
// Push sends data to the stream and optionally processes it.
// Push sends frame to the stream and saves it for later retrieval by subscribers.
func (s *ManagedStream) Push(path string, frame *data.Frame) error {
// Keep schema + data for last packet.
frameJSON, err := data.FrameToJSON(frame, true, true)
@ -111,7 +111,7 @@ func (s *ManagedStream) Push(path string, frame *data.Frame) error {
}
// The channel this will be posted into.
channel := fmt.Sprintf("stream/%s/%s", s.id, path)
channel := Channel{Scope: ScopeStream, Namespace: s.id, Path: path}.String()
logger.Debug("Publish data to channel", "channel", channel, "dataLength", len(frameJSON))
return s.publisher(channel, frameJSON)
}
@ -138,16 +138,15 @@ func (s *ManagedStream) OnSubscribe(_ context.Context, _ *models.SignedInUser, e
}
func (s *ManagedStream) OnPublish(_ context.Context, _ *models.SignedInUser, evt models.PublishEvent) (models.PublishReply, backend.PublishStreamStatus, error) {
logger.Debug("OnPublish", evt.Channel, "evt", evt)
var frame data.Frame
err := json.Unmarshal(evt.Data, &frame)
if err != nil {
// stream scope only deals with data frames.
// Stream scope only deals with data frames.
return models.PublishReply{}, 0, err
}
err = s.Push(evt.Path, &frame)
if err != nil {
// stream scope only deals with data frames.
// Stream scope only deals with data frames.
return models.PublishReply{}, 0, err
}
return models.PublishReply{}, backend.PublishStreamStatusOK, nil

View File

@ -7,8 +7,8 @@ const (
ScopePlugin = "plugin"
// ScopeDatasource passes control to a datasource plugin.
ScopeDatasource = "ds"
// ScopeStream is a managed data frame stream
// ScopeStream is a managed data frame stream.
ScopeStream = "stream"
// ScopePush allows sending data into managed streams. It does not support subscriptions
// ScopePush allows sending data into managed streams. It does not support subscriptions.
ScopePush = "push"
)