2021-03-23 12:24:08 -05:00
|
|
|
package features
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"testing"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
|
|
|
|
|
|
|
"github.com/golang/mock/gomock"
|
|
|
|
"github.com/stretchr/testify/require"
|
|
|
|
)
|
|
|
|
|
|
|
|
// waitWithTimeout blocks until ch is closed (or a value is received from it)
// or until timeout elapses. If the timeout elapses first, the test is failed
// via tb.Fatal.
//
// Because tb.Fatal stops the calling goroutine, call this helper from the
// goroutine that runs the test.
func waitWithTimeout(tb testing.TB, ch chan struct{}, timeout time.Duration) {
	// Mark as a helper so a timeout is reported at the caller's line.
	tb.Helper()

	// An explicit timer can be stopped as soon as ch is ready, instead of
	// staying pending until the full timeout has passed.
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case <-ch:
	case <-timer.C:
		tb.Fatal("timeout")
	}
}
|
|
|
|
|
|
|
|
func TestStreamManager_Run(t *testing.T) {
|
|
|
|
mockCtrl := gomock.NewController(t)
|
|
|
|
defer mockCtrl.Finish()
|
|
|
|
|
2021-04-02 11:41:45 -05:00
|
|
|
mockPacketSender := NewMockStreamPacketSender(mockCtrl)
|
2021-03-23 12:24:08 -05:00
|
|
|
mockPresenceGetter := NewMockPresenceGetter(mockCtrl)
|
|
|
|
|
2021-04-02 11:41:45 -05:00
|
|
|
manager := NewStreamManager(mockPacketSender, mockPresenceGetter)
|
2021-03-23 12:24:08 -05:00
|
|
|
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
defer cancel()
|
|
|
|
|
|
|
|
go func() {
|
|
|
|
cancel()
|
|
|
|
}()
|
|
|
|
|
|
|
|
err := manager.Run(ctx)
|
|
|
|
require.ErrorIs(t, err, context.Canceled)
|
|
|
|
}
|
|
|
|
|
|
|
|
func TestStreamManager_SubmitStream_Send(t *testing.T) {
|
|
|
|
mockCtrl := gomock.NewController(t)
|
|
|
|
defer mockCtrl.Finish()
|
|
|
|
|
2021-04-02 11:41:45 -05:00
|
|
|
mockPacketSender := NewMockStreamPacketSender(mockCtrl)
|
2021-03-23 12:24:08 -05:00
|
|
|
mockPresenceGetter := NewMockPresenceGetter(mockCtrl)
|
|
|
|
|
2021-04-02 11:41:45 -05:00
|
|
|
manager := NewStreamManager(mockPacketSender, mockPresenceGetter)
|
2021-03-23 12:24:08 -05:00
|
|
|
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
defer cancel()
|
|
|
|
go func() {
|
|
|
|
_ = manager.Run(ctx)
|
|
|
|
}()
|
|
|
|
|
|
|
|
startedCh := make(chan struct{})
|
|
|
|
doneCh := make(chan struct{})
|
|
|
|
|
2021-04-02 11:41:45 -05:00
|
|
|
mockPacketSender.EXPECT().Send("test", gomock.Any()).Times(1)
|
2021-03-23 12:24:08 -05:00
|
|
|
|
|
|
|
mockStreamRunner := NewMockStreamRunner(mockCtrl)
|
|
|
|
mockStreamRunner.EXPECT().RunStream(
|
|
|
|
gomock.Any(), gomock.Any(), gomock.Any(),
|
|
|
|
).Do(func(ctx context.Context, req *backend.RunStreamRequest, sender backend.StreamPacketSender) error {
|
|
|
|
require.Equal(t, "test", req.Path)
|
|
|
|
close(startedCh)
|
|
|
|
err := sender.Send(&backend.StreamPacket{
|
2021-04-02 11:41:45 -05:00
|
|
|
Data: []byte("test"),
|
2021-03-23 12:24:08 -05:00
|
|
|
})
|
|
|
|
require.NoError(t, err)
|
|
|
|
<-ctx.Done()
|
|
|
|
close(doneCh)
|
|
|
|
return ctx.Err()
|
|
|
|
}).Times(1)
|
|
|
|
|
2021-04-02 11:41:45 -05:00
|
|
|
result, err := manager.SubmitStream(context.Background(), "test", "test", backend.PluginContext{}, mockStreamRunner)
|
2021-03-23 12:24:08 -05:00
|
|
|
require.NoError(t, err)
|
2021-04-02 11:41:45 -05:00
|
|
|
require.False(t, result.StreamExists)
|
2021-03-23 12:24:08 -05:00
|
|
|
|
|
|
|
// try submit the same.
|
2021-04-02 11:41:45 -05:00
|
|
|
result, err = manager.SubmitStream(context.Background(), "test", "test", backend.PluginContext{}, mockStreamRunner)
|
2021-03-23 12:24:08 -05:00
|
|
|
require.NoError(t, err)
|
2021-04-02 11:41:45 -05:00
|
|
|
require.True(t, result.StreamExists)
|
2021-03-23 12:24:08 -05:00
|
|
|
|
|
|
|
waitWithTimeout(t, startedCh, time.Second)
|
|
|
|
require.Len(t, manager.streams, 1)
|
|
|
|
cancel()
|
|
|
|
waitWithTimeout(t, doneCh, time.Second)
|
|
|
|
}
|
|
|
|
|
|
|
|
func TestStreamManager_SubmitStream_CloseNoSubscribers(t *testing.T) {
|
|
|
|
mockCtrl := gomock.NewController(t)
|
|
|
|
defer mockCtrl.Finish()
|
|
|
|
|
2021-04-02 11:41:45 -05:00
|
|
|
mockPacketSender := NewMockStreamPacketSender(mockCtrl)
|
2021-03-23 12:24:08 -05:00
|
|
|
mockPresenceGetter := NewMockPresenceGetter(mockCtrl)
|
|
|
|
|
|
|
|
manager := NewStreamManager(
|
2021-04-02 11:41:45 -05:00
|
|
|
mockPacketSender,
|
2021-03-23 12:24:08 -05:00
|
|
|
mockPresenceGetter,
|
|
|
|
WithCheckConfig(10*time.Millisecond, 3),
|
|
|
|
)
|
|
|
|
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
defer cancel()
|
|
|
|
go func() {
|
|
|
|
_ = manager.Run(ctx)
|
|
|
|
}()
|
|
|
|
|
|
|
|
startedCh := make(chan struct{})
|
|
|
|
doneCh := make(chan struct{})
|
|
|
|
|
|
|
|
mockPresenceGetter.EXPECT().GetNumSubscribers("test").Return(0, nil).Times(3)
|
|
|
|
|
|
|
|
mockStreamRunner := NewMockStreamRunner(mockCtrl)
|
|
|
|
mockStreamRunner.EXPECT().RunStream(gomock.Any(), gomock.Any(), gomock.Any()).Do(func(ctx context.Context, req *backend.RunStreamRequest, sender backend.StreamPacketSender) error {
|
|
|
|
close(startedCh)
|
|
|
|
<-ctx.Done()
|
|
|
|
close(doneCh)
|
|
|
|
return ctx.Err()
|
|
|
|
}).Times(1)
|
|
|
|
|
2021-04-02 11:41:45 -05:00
|
|
|
_, err := manager.SubmitStream(context.Background(), "test", "test", backend.PluginContext{}, mockStreamRunner)
|
2021-03-23 12:24:08 -05:00
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
waitWithTimeout(t, startedCh, time.Second)
|
|
|
|
waitWithTimeout(t, doneCh, time.Second)
|
|
|
|
require.Len(t, manager.streams, 0)
|
|
|
|
}
|