mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Live: RunStream improvements – backoff, better layout, tests (#33029)
This commit is contained in:
parent
1ed724ea6e
commit
7d5a46ffda
@ -3,62 +3,36 @@ package features
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/grafana/grafana/pkg/services/live/runstream"
|
||||
|
||||
"github.com/centrifugal/centrifuge"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
"github.com/grafana/grafana/pkg/models"
|
||||
)
|
||||
|
||||
//go:generate mockgen -destination=mock.go -package=features github.com/grafana/grafana/pkg/services/live/features StreamPacketSender,PresenceGetter,PluginContextGetter,StreamRunner
|
||||
|
||||
type StreamPacketSender interface {
|
||||
Send(channel string, packet *backend.StreamPacket) error
|
||||
}
|
||||
|
||||
type PresenceGetter interface {
|
||||
GetNumSubscribers(channel string) (int, error)
|
||||
}
|
||||
//go:generate mockgen -destination=plugin_mock.go -package=features github.com/grafana/grafana/pkg/services/live/features PluginContextGetter
|
||||
|
||||
type PluginContextGetter interface {
|
||||
GetPluginContext(user *models.SignedInUser, pluginID string, datasourceUID string) (backend.PluginContext, bool, error)
|
||||
}
|
||||
|
||||
type StreamRunner interface {
|
||||
RunStream(ctx context.Context, request *backend.RunStreamRequest, sender backend.StreamPacketSender) error
|
||||
}
|
||||
|
||||
type streamSender struct {
|
||||
channel string
|
||||
packetSender StreamPacketSender
|
||||
}
|
||||
|
||||
func newStreamSender(channel string, packetSender StreamPacketSender) *streamSender {
|
||||
return &streamSender{
|
||||
channel: channel,
|
||||
packetSender: packetSender,
|
||||
}
|
||||
}
|
||||
|
||||
func (p *streamSender) Send(packet *backend.StreamPacket) error {
|
||||
return p.packetSender.Send(p.channel, packet)
|
||||
}
|
||||
|
||||
// PluginRunner can handle streaming operations for channels belonging to plugins.
|
||||
type PluginRunner struct {
|
||||
pluginID string
|
||||
datasourceUID string
|
||||
pluginContextGetter PluginContextGetter
|
||||
handler backend.StreamHandler
|
||||
streamManager *StreamManager
|
||||
runStreamManager *runstream.Manager
|
||||
}
|
||||
|
||||
// NewPluginRunner creates new PluginRunner.
|
||||
func NewPluginRunner(pluginID string, datasourceUID string, streamManager *StreamManager, pluginContextGetter PluginContextGetter, handler backend.StreamHandler) *PluginRunner {
|
||||
func NewPluginRunner(pluginID string, datasourceUID string, runStreamManager *runstream.Manager, pluginContextGetter PluginContextGetter, handler backend.StreamHandler) *PluginRunner {
|
||||
return &PluginRunner{
|
||||
pluginID: pluginID,
|
||||
datasourceUID: datasourceUID,
|
||||
pluginContextGetter: pluginContextGetter,
|
||||
handler: handler,
|
||||
streamManager: streamManager,
|
||||
runStreamManager: runStreamManager,
|
||||
}
|
||||
}
|
||||
|
||||
@ -68,7 +42,7 @@ func (m *PluginRunner) GetHandlerForPath(path string) (models.ChannelHandler, er
|
||||
path: path,
|
||||
pluginID: m.pluginID,
|
||||
datasourceUID: m.datasourceUID,
|
||||
streamManager: m.streamManager,
|
||||
runStreamManager: m.runStreamManager,
|
||||
handler: m.handler,
|
||||
pluginContextGetter: m.pluginContextGetter,
|
||||
}, nil
|
||||
@ -79,7 +53,7 @@ type PluginPathRunner struct {
|
||||
path string
|
||||
pluginID string
|
||||
datasourceUID string
|
||||
streamManager *StreamManager
|
||||
runStreamManager *runstream.Manager
|
||||
handler backend.StreamHandler
|
||||
pluginContextGetter PluginContextGetter
|
||||
}
|
||||
@ -108,7 +82,7 @@ func (r *PluginPathRunner) OnSubscribe(ctx context.Context, user *models.SignedI
|
||||
}
|
||||
|
||||
if resp.UseRunStream {
|
||||
submitResult, err := r.streamManager.SubmitStream(ctx, e.Channel, r.path, pCtx, r.handler)
|
||||
submitResult, err := r.runStreamManager.SubmitStream(ctx, e.Channel, r.path, pCtx, r.handler)
|
||||
if err != nil {
|
||||
logger.Error("Error submitting stream to manager", "error", err, "path", r.path)
|
||||
return models.SubscribeReply{}, 0, centrifuge.ErrorInternal
|
||||
|
52
pkg/services/live/features/plugin_mock.go
Normal file
52
pkg/services/live/features/plugin_mock.go
Normal file
@ -0,0 +1,52 @@
|
||||
// Code generated by MockGen. DO NOT EDIT.
|
||||
// Source: github.com/grafana/grafana/pkg/services/live/features (interfaces: PluginContextGetter)
|
||||
|
||||
// Package features is a generated GoMock package.
|
||||
package features
|
||||
|
||||
import (
|
||||
reflect "reflect"
|
||||
|
||||
gomock "github.com/golang/mock/gomock"
|
||||
backend "github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
models "github.com/grafana/grafana/pkg/models"
|
||||
)
|
||||
|
||||
// MockPluginContextGetter is a mock of PluginContextGetter interface.
|
||||
type MockPluginContextGetter struct {
|
||||
ctrl *gomock.Controller
|
||||
recorder *MockPluginContextGetterMockRecorder
|
||||
}
|
||||
|
||||
// MockPluginContextGetterMockRecorder is the mock recorder for MockPluginContextGetter.
|
||||
type MockPluginContextGetterMockRecorder struct {
|
||||
mock *MockPluginContextGetter
|
||||
}
|
||||
|
||||
// NewMockPluginContextGetter creates a new mock instance.
|
||||
func NewMockPluginContextGetter(ctrl *gomock.Controller) *MockPluginContextGetter {
|
||||
mock := &MockPluginContextGetter{ctrl: ctrl}
|
||||
mock.recorder = &MockPluginContextGetterMockRecorder{mock}
|
||||
return mock
|
||||
}
|
||||
|
||||
// EXPECT returns an object that allows the caller to indicate expected use.
|
||||
func (m *MockPluginContextGetter) EXPECT() *MockPluginContextGetterMockRecorder {
|
||||
return m.recorder
|
||||
}
|
||||
|
||||
// GetPluginContext mocks base method.
|
||||
func (m *MockPluginContextGetter) GetPluginContext(arg0 *models.SignedInUser, arg1, arg2 string) (backend.PluginContext, bool, error) {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "GetPluginContext", arg0, arg1, arg2)
|
||||
ret0, _ := ret[0].(backend.PluginContext)
|
||||
ret1, _ := ret[1].(bool)
|
||||
ret2, _ := ret[2].(error)
|
||||
return ret0, ret1, ret2
|
||||
}
|
||||
|
||||
// GetPluginContext indicates an expected call of GetPluginContext.
|
||||
func (mr *MockPluginContextGetterMockRecorder) GetPluginContext(arg0, arg1, arg2 interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPluginContext", reflect.TypeOf((*MockPluginContextGetter)(nil).GetPluginContext), arg0, arg1, arg2)
|
||||
}
|
@ -1,207 +0,0 @@
|
||||
package features
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
)
|
||||
|
||||
// StreamManager manages streams from Grafana to plugins.
|
||||
type StreamManager struct {
|
||||
mu sync.RWMutex
|
||||
streams map[string]struct{}
|
||||
presenceGetter PresenceGetter
|
||||
packetSender StreamPacketSender
|
||||
registerCh chan submitRequest
|
||||
closedCh chan struct{}
|
||||
checkInterval time.Duration
|
||||
maxChecks int
|
||||
}
|
||||
|
||||
// StreamManagerOption modifies StreamManager behavior (used for tests for example).
|
||||
type StreamManagerOption func(*StreamManager)
|
||||
|
||||
// WithCheckConfig allows setting custom check rules.
|
||||
func WithCheckConfig(interval time.Duration, maxChecks int) StreamManagerOption {
|
||||
return func(sm *StreamManager) {
|
||||
sm.checkInterval = interval
|
||||
sm.maxChecks = maxChecks
|
||||
}
|
||||
}
|
||||
|
||||
const (
|
||||
defaultCheckInterval = 5 * time.Second
|
||||
defaultMaxChecks = 3
|
||||
)
|
||||
|
||||
// NewStreamManager creates new StreamManager.
|
||||
func NewStreamManager(packetSender StreamPacketSender, presenceGetter PresenceGetter, opts ...StreamManagerOption) *StreamManager {
|
||||
sm := &StreamManager{
|
||||
streams: make(map[string]struct{}),
|
||||
packetSender: packetSender,
|
||||
presenceGetter: presenceGetter,
|
||||
registerCh: make(chan submitRequest),
|
||||
closedCh: make(chan struct{}),
|
||||
checkInterval: defaultCheckInterval,
|
||||
maxChecks: defaultMaxChecks,
|
||||
}
|
||||
for _, opt := range opts {
|
||||
opt(sm)
|
||||
}
|
||||
return sm
|
||||
}
|
||||
|
||||
func (s *StreamManager) stopStream(sr streamRequest, cancelFn func()) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
delete(s.streams, sr.Channel)
|
||||
cancelFn()
|
||||
}
|
||||
|
||||
func (s *StreamManager) watchStream(ctx context.Context, cancelFn func(), sr streamRequest) {
|
||||
numNoSubscribersChecks := 0
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-time.After(s.checkInterval):
|
||||
numSubscribers, err := s.presenceGetter.GetNumSubscribers(sr.Channel)
|
||||
if err != nil {
|
||||
logger.Error("Error checking num subscribers", "channel", sr.Channel, "path", sr.Path)
|
||||
continue
|
||||
}
|
||||
if numSubscribers > 0 {
|
||||
// reset counter since channel has active subscribers.
|
||||
numNoSubscribersChecks = 0
|
||||
continue
|
||||
}
|
||||
numNoSubscribersChecks++
|
||||
if numNoSubscribersChecks >= s.maxChecks {
|
||||
logger.Debug("Stop stream since no active subscribers", "channel", sr.Channel, "path", sr.Path)
|
||||
s.stopStream(sr, cancelFn)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// run stream until context canceled.
|
||||
func (s *StreamManager) runStream(ctx context.Context, sr streamRequest) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
err := sr.StreamRunner.RunStream(
|
||||
ctx,
|
||||
&backend.RunStreamRequest{
|
||||
PluginContext: sr.PluginContext,
|
||||
Path: sr.Path,
|
||||
},
|
||||
newStreamSender(sr.Channel, s.packetSender),
|
||||
)
|
||||
if err != nil {
|
||||
if errors.Is(ctx.Err(), context.Canceled) {
|
||||
logger.Debug("Stream cleanly finished", "path", sr.Path)
|
||||
return
|
||||
}
|
||||
logger.Error("Error running stream, retrying", "path", sr.Path, "error", err)
|
||||
continue
|
||||
}
|
||||
logger.Warn("Stream finished without error?", "path", sr.Path)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
var errClosed = errors.New("stream manager closed")
|
||||
|
||||
func (s *StreamManager) registerStream(ctx context.Context, sr submitRequest) {
|
||||
s.mu.Lock()
|
||||
if _, ok := s.streams[sr.streamRequest.Channel]; ok {
|
||||
s.mu.Unlock()
|
||||
sr.responseCh <- submitResponse{Result: submitResult{StreamExists: true}}
|
||||
return
|
||||
}
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
s.streams[sr.streamRequest.Channel] = struct{}{}
|
||||
s.mu.Unlock()
|
||||
sr.responseCh <- submitResponse{Result: submitResult{StreamExists: false}}
|
||||
go s.watchStream(ctx, cancel, sr.streamRequest)
|
||||
s.runStream(ctx, sr.streamRequest)
|
||||
}
|
||||
|
||||
// Run StreamManager till context canceled.
|
||||
func (s *StreamManager) Run(ctx context.Context) error {
|
||||
for {
|
||||
select {
|
||||
case sr := <-s.registerCh:
|
||||
go s.registerStream(ctx, sr)
|
||||
case <-ctx.Done():
|
||||
close(s.closedCh)
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type streamRequest struct {
|
||||
Channel string
|
||||
Path string
|
||||
PluginContext backend.PluginContext
|
||||
StreamRunner StreamRunner
|
||||
}
|
||||
|
||||
type submitRequest struct {
|
||||
responseCh chan submitResponse
|
||||
streamRequest streamRequest
|
||||
}
|
||||
|
||||
type submitResult struct {
|
||||
StreamExists bool
|
||||
}
|
||||
|
||||
type submitResponse struct {
|
||||
Error error
|
||||
Result submitResult
|
||||
}
|
||||
|
||||
// SubmitStream submits stream handler in StreamManager to manage.
|
||||
// The stream will be opened and kept till channel has active subscribers.
|
||||
func (s *StreamManager) SubmitStream(ctx context.Context, channel string, path string, pCtx backend.PluginContext, streamRunner StreamRunner) (*submitResult, error) {
|
||||
req := submitRequest{
|
||||
responseCh: make(chan submitResponse, 1),
|
||||
streamRequest: streamRequest{
|
||||
Channel: channel,
|
||||
Path: path,
|
||||
PluginContext: pCtx,
|
||||
StreamRunner: streamRunner,
|
||||
},
|
||||
}
|
||||
|
||||
// Send submit request.
|
||||
select {
|
||||
case s.registerCh <- req:
|
||||
case <-s.closedCh:
|
||||
close(s.registerCh)
|
||||
return nil, errClosed
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
|
||||
// Wait for submit response.
|
||||
select {
|
||||
case resp := <-req.responseCh:
|
||||
if resp.Error != nil {
|
||||
return nil, resp.Error
|
||||
}
|
||||
return &resp.Result, nil
|
||||
case <-s.closedCh:
|
||||
return nil, errClosed
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
}
|
@ -8,6 +8,8 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana/pkg/services/live/runstream"
|
||||
|
||||
"github.com/centrifugal/centrifuge"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
@ -74,8 +76,8 @@ type GrafanaLive struct {
|
||||
|
||||
ManagedStreamRunner *ManagedStreamRunner
|
||||
|
||||
contextGetter *pluginContextGetter
|
||||
streamManager *features.StreamManager
|
||||
contextGetter *pluginContextGetter
|
||||
runStreamManager *runstream.Manager
|
||||
}
|
||||
|
||||
func (g *GrafanaLive) getStreamPlugin(pluginID string) (backend.StreamHandler, error) {
|
||||
@ -91,9 +93,9 @@ func (g *GrafanaLive) getStreamPlugin(pluginID string) (backend.StreamHandler, e
|
||||
}
|
||||
|
||||
func (g *GrafanaLive) Run(ctx context.Context) error {
|
||||
if g.streamManager != nil {
|
||||
if g.runStreamManager != nil {
|
||||
// Only run stream manager if GrafanaLive properly initialized.
|
||||
return g.streamManager.Run(ctx)
|
||||
return g.runStreamManager.Run(ctx)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -128,7 +130,7 @@ func (g *GrafanaLive) Init() error {
|
||||
g.contextGetter = newPluginContextGetter(g.PluginContextProvider)
|
||||
packetSender := newPluginPacketSender(node)
|
||||
presenceGetter := newPluginPresenceGetter(node)
|
||||
g.streamManager = features.NewStreamManager(packetSender, presenceGetter)
|
||||
g.runStreamManager = runstream.NewManager(packetSender, presenceGetter)
|
||||
|
||||
// Initialize the main features
|
||||
dash := &features.DashboardHandler{
|
||||
@ -388,7 +390,7 @@ func (g *GrafanaLive) handlePluginScope(_ *models.SignedInUser, namespace string
|
||||
return features.NewPluginRunner(
|
||||
namespace,
|
||||
"", // No instance uid for non-datasource plugins.
|
||||
g.streamManager,
|
||||
g.runStreamManager,
|
||||
g.contextGetter,
|
||||
streamHandler,
|
||||
), nil
|
||||
@ -421,7 +423,7 @@ func (g *GrafanaLive) handleDatasourceScope(user *models.SignedInUser, namespace
|
||||
return features.NewPluginRunner(
|
||||
ds.Type,
|
||||
ds.Uid,
|
||||
g.streamManager,
|
||||
g.runStreamManager,
|
||||
g.contextGetter,
|
||||
streamHandler,
|
||||
), nil
|
||||
|
289
pkg/services/live/runstream/manager.go
Normal file
289
pkg/services/live/runstream/manager.go
Normal file
@ -0,0 +1,289 @@
|
||||
package runstream
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"math"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
)
|
||||
|
||||
var (
|
||||
logger = log.New("live.runstream")
|
||||
)
|
||||
|
||||
//go:generate mockgen -destination=mock.go -package=runstream github.com/grafana/grafana/pkg/services/live/runstream StreamPacketSender,PresenceGetter,StreamRunner
|
||||
|
||||
type StreamPacketSender interface {
|
||||
Send(channel string, packet *backend.StreamPacket) error
|
||||
}
|
||||
|
||||
type PresenceGetter interface {
|
||||
GetNumSubscribers(channel string) (int, error)
|
||||
}
|
||||
|
||||
type StreamRunner interface {
|
||||
RunStream(ctx context.Context, request *backend.RunStreamRequest, sender backend.StreamPacketSender) error
|
||||
}
|
||||
|
||||
type streamSender struct {
|
||||
channel string
|
||||
packetSender StreamPacketSender
|
||||
}
|
||||
|
||||
func newStreamSender(channel string, packetSender StreamPacketSender) *streamSender {
|
||||
return &streamSender{
|
||||
channel: channel,
|
||||
packetSender: packetSender,
|
||||
}
|
||||
}
|
||||
|
||||
func (p *streamSender) Send(packet *backend.StreamPacket) error {
|
||||
return p.packetSender.Send(p.channel, packet)
|
||||
}
|
||||
|
||||
// Manager manages streams from Grafana to plugins (i.e. RunStream method).
|
||||
type Manager struct {
|
||||
mu sync.RWMutex
|
||||
streams map[string]chan struct{}
|
||||
presenceGetter PresenceGetter
|
||||
packetSender StreamPacketSender
|
||||
registerCh chan submitRequest
|
||||
closedCh chan struct{}
|
||||
checkInterval time.Duration
|
||||
maxChecks int
|
||||
}
|
||||
|
||||
// ManagerOption modifies Manager behavior (used for tests for example).
|
||||
type ManagerOption func(*Manager)
|
||||
|
||||
// WithCheckConfig allows setting custom check rules.
|
||||
func WithCheckConfig(interval time.Duration, maxChecks int) ManagerOption {
|
||||
return func(sm *Manager) {
|
||||
sm.checkInterval = interval
|
||||
sm.maxChecks = maxChecks
|
||||
}
|
||||
}
|
||||
|
||||
const (
|
||||
defaultCheckInterval = 5 * time.Second
|
||||
defaultMaxChecks = 3
|
||||
)
|
||||
|
||||
// NewManager creates new Manager.
|
||||
func NewManager(packetSender StreamPacketSender, presenceGetter PresenceGetter, opts ...ManagerOption) *Manager {
|
||||
sm := &Manager{
|
||||
streams: make(map[string]chan struct{}),
|
||||
packetSender: packetSender,
|
||||
presenceGetter: presenceGetter,
|
||||
registerCh: make(chan submitRequest),
|
||||
closedCh: make(chan struct{}),
|
||||
checkInterval: defaultCheckInterval,
|
||||
maxChecks: defaultMaxChecks,
|
||||
}
|
||||
for _, opt := range opts {
|
||||
opt(sm)
|
||||
}
|
||||
return sm
|
||||
}
|
||||
|
||||
func (s *Manager) stopStream(sr streamRequest, cancelFn func()) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
closeCh, ok := s.streams[sr.Channel]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
delete(s.streams, sr.Channel)
|
||||
cancelFn()
|
||||
close(closeCh)
|
||||
}
|
||||
|
||||
func (s *Manager) watchStream(ctx context.Context, cancelFn func(), sr streamRequest) {
|
||||
numNoSubscribersChecks := 0
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-time.After(s.checkInterval):
|
||||
numSubscribers, err := s.presenceGetter.GetNumSubscribers(sr.Channel)
|
||||
if err != nil {
|
||||
logger.Error("Error checking num subscribers", "channel", sr.Channel, "path", sr.Path)
|
||||
continue
|
||||
}
|
||||
if numSubscribers > 0 {
|
||||
// reset counter since channel has active subscribers.
|
||||
numNoSubscribersChecks = 0
|
||||
continue
|
||||
}
|
||||
numNoSubscribersChecks++
|
||||
if numNoSubscribersChecks >= s.maxChecks {
|
||||
logger.Debug("Stop stream since no active subscribers", "channel", sr.Channel, "path", sr.Path)
|
||||
s.stopStream(sr, cancelFn)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const streamDurationThreshold = 100 * time.Millisecond
|
||||
const coolDownDelay = 100 * time.Millisecond
|
||||
const maxDelay = 5 * time.Second
|
||||
|
||||
func getDelay(numErrors int) time.Duration {
|
||||
if numErrors == 0 {
|
||||
return 0
|
||||
}
|
||||
delay := coolDownDelay * time.Duration(math.Pow(2, float64(numErrors)))
|
||||
if delay > maxDelay {
|
||||
return maxDelay
|
||||
}
|
||||
return delay
|
||||
}
|
||||
|
||||
// run stream until context canceled or stream finished without an error.
|
||||
func (s *Manager) runStream(ctx context.Context, cancelFn func(), sr streamRequest) {
|
||||
var numFastErrors int
|
||||
var delay time.Duration
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
startTime := time.Now()
|
||||
err := sr.StreamRunner.RunStream(
|
||||
ctx,
|
||||
&backend.RunStreamRequest{
|
||||
PluginContext: sr.PluginContext,
|
||||
Path: sr.Path,
|
||||
},
|
||||
newStreamSender(sr.Channel, s.packetSender),
|
||||
)
|
||||
if err != nil {
|
||||
if errors.Is(ctx.Err(), context.Canceled) {
|
||||
logger.Debug("Stream cleanly finished", "path", sr.Path)
|
||||
return
|
||||
}
|
||||
// Best effort to cool down re-establishment process. We don't have a
|
||||
// nice way to understand whether we really need to wait here - so relying
|
||||
// on RunStream duration time.
|
||||
if time.Since(startTime) < streamDurationThreshold {
|
||||
if delay < maxDelay {
|
||||
// Due to not calling getDelay after we have delay larger than maxDelay
|
||||
// we avoid possible float overflow errors while calculating delay duration
|
||||
// based on numFastErrors.
|
||||
delay = getDelay(numFastErrors)
|
||||
}
|
||||
numFastErrors++
|
||||
} else {
|
||||
// Assuming that stream successfully started.
|
||||
delay = 0
|
||||
numFastErrors = 0
|
||||
}
|
||||
logger.Error("Error running stream, re-establishing", "path", sr.Path, "error", err, "wait", delay)
|
||||
time.Sleep(delay)
|
||||
continue
|
||||
}
|
||||
logger.Debug("Stream finished without error, stopping it", "path", sr.Path)
|
||||
s.stopStream(sr, cancelFn)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
var errClosed = errors.New("stream manager closed")
|
||||
|
||||
func (s *Manager) registerStream(ctx context.Context, sr submitRequest) {
|
||||
s.mu.Lock()
|
||||
if closeCh, ok := s.streams[sr.streamRequest.Channel]; ok {
|
||||
s.mu.Unlock()
|
||||
sr.responseCh <- submitResponse{Result: submitResult{StreamExists: true, CloseNotify: closeCh}}
|
||||
return
|
||||
}
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
closeCh := make(chan struct{})
|
||||
s.streams[sr.streamRequest.Channel] = closeCh
|
||||
s.mu.Unlock()
|
||||
sr.responseCh <- submitResponse{Result: submitResult{StreamExists: false, CloseNotify: closeCh}}
|
||||
go s.watchStream(ctx, cancel, sr.streamRequest)
|
||||
s.runStream(ctx, cancel, sr.streamRequest)
|
||||
}
|
||||
|
||||
// Run Manager till context canceled.
|
||||
func (s *Manager) Run(ctx context.Context) error {
|
||||
for {
|
||||
select {
|
||||
case sr := <-s.registerCh:
|
||||
go s.registerStream(ctx, sr)
|
||||
case <-ctx.Done():
|
||||
close(s.closedCh)
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type streamRequest struct {
|
||||
Channel string
|
||||
Path string
|
||||
PluginContext backend.PluginContext
|
||||
StreamRunner StreamRunner
|
||||
}
|
||||
|
||||
type submitRequest struct {
|
||||
responseCh chan submitResponse
|
||||
streamRequest streamRequest
|
||||
}
|
||||
|
||||
type submitResult struct {
|
||||
// StreamExists tells whether stream have been already opened.
|
||||
StreamExists bool
|
||||
// CloseNotify will be closed as soon as stream cleanly exited.
|
||||
CloseNotify chan struct{}
|
||||
}
|
||||
|
||||
type submitResponse struct {
|
||||
Error error
|
||||
Result submitResult
|
||||
}
|
||||
|
||||
// SubmitStream submits stream handler in Manager to manage.
|
||||
// The stream will be opened and kept till channel has active subscribers.
|
||||
func (s *Manager) SubmitStream(ctx context.Context, channel string, path string, pCtx backend.PluginContext, streamRunner StreamRunner) (*submitResult, error) {
|
||||
req := submitRequest{
|
||||
responseCh: make(chan submitResponse, 1),
|
||||
streamRequest: streamRequest{
|
||||
Channel: channel,
|
||||
Path: path,
|
||||
PluginContext: pCtx,
|
||||
StreamRunner: streamRunner,
|
||||
},
|
||||
}
|
||||
|
||||
// Send submit request.
|
||||
select {
|
||||
case s.registerCh <- req:
|
||||
case <-s.closedCh:
|
||||
close(s.registerCh)
|
||||
return nil, errClosed
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
|
||||
// Wait for submit response.
|
||||
select {
|
||||
case resp := <-req.responseCh:
|
||||
if resp.Error != nil {
|
||||
return nil, resp.Error
|
||||
}
|
||||
return &resp.Result, nil
|
||||
case <-s.closedCh:
|
||||
return nil, errClosed
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
}
|
@ -1,7 +1,8 @@
|
||||
package features
|
||||
package runstream
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -27,7 +28,7 @@ func TestStreamManager_Run(t *testing.T) {
|
||||
mockPacketSender := NewMockStreamPacketSender(mockCtrl)
|
||||
mockPresenceGetter := NewMockPresenceGetter(mockCtrl)
|
||||
|
||||
manager := NewStreamManager(mockPacketSender, mockPresenceGetter)
|
||||
manager := NewManager(mockPacketSender, mockPresenceGetter)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
@ -47,7 +48,7 @@ func TestStreamManager_SubmitStream_Send(t *testing.T) {
|
||||
mockPacketSender := NewMockStreamPacketSender(mockCtrl)
|
||||
mockPresenceGetter := NewMockPresenceGetter(mockCtrl)
|
||||
|
||||
manager := NewStreamManager(mockPacketSender, mockPresenceGetter)
|
||||
manager := NewManager(mockPacketSender, mockPresenceGetter)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
@ -63,7 +64,7 @@ func TestStreamManager_SubmitStream_Send(t *testing.T) {
|
||||
mockStreamRunner := NewMockStreamRunner(mockCtrl)
|
||||
mockStreamRunner.EXPECT().RunStream(
|
||||
gomock.Any(), gomock.Any(), gomock.Any(),
|
||||
).Do(func(ctx context.Context, req *backend.RunStreamRequest, sender backend.StreamPacketSender) error {
|
||||
).DoAndReturn(func(ctx context.Context, req *backend.RunStreamRequest, sender backend.StreamPacketSender) error {
|
||||
require.Equal(t, "test", req.Path)
|
||||
close(startedCh)
|
||||
err := sender.Send(&backend.StreamPacket{
|
||||
@ -97,7 +98,8 @@ func TestStreamManager_SubmitStream_CloseNoSubscribers(t *testing.T) {
|
||||
mockPacketSender := NewMockStreamPacketSender(mockCtrl)
|
||||
mockPresenceGetter := NewMockPresenceGetter(mockCtrl)
|
||||
|
||||
manager := NewStreamManager(
|
||||
// Create manager with very fast num subscribers checks.
|
||||
manager := NewManager(
|
||||
mockPacketSender,
|
||||
mockPresenceGetter,
|
||||
WithCheckConfig(10*time.Millisecond, 3),
|
||||
@ -115,7 +117,7 @@ func TestStreamManager_SubmitStream_CloseNoSubscribers(t *testing.T) {
|
||||
mockPresenceGetter.EXPECT().GetNumSubscribers("test").Return(0, nil).Times(3)
|
||||
|
||||
mockStreamRunner := NewMockStreamRunner(mockCtrl)
|
||||
mockStreamRunner.EXPECT().RunStream(gomock.Any(), gomock.Any(), gomock.Any()).Do(func(ctx context.Context, req *backend.RunStreamRequest, sender backend.StreamPacketSender) error {
|
||||
mockStreamRunner.EXPECT().RunStream(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, req *backend.RunStreamRequest, sender backend.StreamPacketSender) error {
|
||||
close(startedCh)
|
||||
<-ctx.Done()
|
||||
close(doneCh)
|
||||
@ -129,3 +131,67 @@ func TestStreamManager_SubmitStream_CloseNoSubscribers(t *testing.T) {
|
||||
waitWithTimeout(t, doneCh, time.Second)
|
||||
require.Len(t, manager.streams, 0)
|
||||
}
|
||||
|
||||
func TestStreamManager_SubmitStream_ErrorRestartsRunStream(t *testing.T) {
|
||||
mockCtrl := gomock.NewController(t)
|
||||
defer mockCtrl.Finish()
|
||||
|
||||
mockPacketSender := NewMockStreamPacketSender(mockCtrl)
|
||||
mockPresenceGetter := NewMockPresenceGetter(mockCtrl)
|
||||
|
||||
manager := NewManager(mockPacketSender, mockPresenceGetter)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
go func() {
|
||||
_ = manager.Run(ctx)
|
||||
}()
|
||||
|
||||
numErrors := 3
|
||||
currentErrors := 0
|
||||
|
||||
mockStreamRunner := NewMockStreamRunner(mockCtrl)
|
||||
mockStreamRunner.EXPECT().RunStream(
|
||||
gomock.Any(), gomock.Any(), gomock.Any(),
|
||||
).DoAndReturn(func(ctx context.Context, req *backend.RunStreamRequest, sender backend.StreamPacketSender) error {
|
||||
if currentErrors >= numErrors {
|
||||
return nil
|
||||
}
|
||||
currentErrors++
|
||||
return errors.New("boom")
|
||||
}).Times(numErrors + 1)
|
||||
|
||||
result, err := manager.SubmitStream(context.Background(), "test", "test", backend.PluginContext{}, mockStreamRunner)
|
||||
require.NoError(t, err)
|
||||
require.False(t, result.StreamExists)
|
||||
|
||||
waitWithTimeout(t, result.CloseNotify, time.Second)
|
||||
}
|
||||
|
||||
func TestStreamManager_SubmitStream_NilErrorStopsRunStream(t *testing.T) {
|
||||
mockCtrl := gomock.NewController(t)
|
||||
defer mockCtrl.Finish()
|
||||
|
||||
mockPacketSender := NewMockStreamPacketSender(mockCtrl)
|
||||
mockPresenceGetter := NewMockPresenceGetter(mockCtrl)
|
||||
|
||||
manager := NewManager(mockPacketSender, mockPresenceGetter)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
go func() {
|
||||
_ = manager.Run(ctx)
|
||||
}()
|
||||
|
||||
mockStreamRunner := NewMockStreamRunner(mockCtrl)
|
||||
mockStreamRunner.EXPECT().RunStream(
|
||||
gomock.Any(), gomock.Any(), gomock.Any(),
|
||||
).DoAndReturn(func(ctx context.Context, req *backend.RunStreamRequest, sender backend.StreamPacketSender) error {
|
||||
return nil
|
||||
}).Times(1)
|
||||
|
||||
result, err := manager.SubmitStream(context.Background(), "test", "test", backend.PluginContext{}, mockStreamRunner)
|
||||
require.NoError(t, err)
|
||||
require.False(t, result.StreamExists)
|
||||
waitWithTimeout(t, result.CloseNotify, time.Second)
|
||||
}
|
@ -1,8 +1,8 @@
|
||||
// Code generated by MockGen. DO NOT EDIT.
|
||||
// Source: github.com/grafana/grafana/pkg/services/live/features (interfaces: StreamPacketSender,PresenceGetter,PluginContextGetter,StreamRunner)
|
||||
// Source: github.com/grafana/grafana/pkg/services/live/runstream (interfaces: StreamPacketSender,PresenceGetter,StreamRunner)
|
||||
|
||||
// Package features is a generated GoMock package.
|
||||
package features
|
||||
// Package runstream is a generated GoMock package.
|
||||
package runstream
|
||||
|
||||
import (
|
||||
context "context"
|
||||
@ -10,7 +10,6 @@ import (
|
||||
|
||||
gomock "github.com/golang/mock/gomock"
|
||||
backend "github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
models "github.com/grafana/grafana/pkg/models"
|
||||
)
|
||||
|
||||
// MockStreamPacketSender is a mock of StreamPacketSender interface.
|
||||
@ -88,45 +87,6 @@ func (mr *MockPresenceGetterMockRecorder) GetNumSubscribers(arg0 interface{}) *g
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetNumSubscribers", reflect.TypeOf((*MockPresenceGetter)(nil).GetNumSubscribers), arg0)
|
||||
}
|
||||
|
||||
// MockPluginContextGetter is a mock of PluginContextGetter interface.
|
||||
type MockPluginContextGetter struct {
|
||||
ctrl *gomock.Controller
|
||||
recorder *MockPluginContextGetterMockRecorder
|
||||
}
|
||||
|
||||
// MockPluginContextGetterMockRecorder is the mock recorder for MockPluginContextGetter.
|
||||
type MockPluginContextGetterMockRecorder struct {
|
||||
mock *MockPluginContextGetter
|
||||
}
|
||||
|
||||
// NewMockPluginContextGetter creates a new mock instance.
|
||||
func NewMockPluginContextGetter(ctrl *gomock.Controller) *MockPluginContextGetter {
|
||||
mock := &MockPluginContextGetter{ctrl: ctrl}
|
||||
mock.recorder = &MockPluginContextGetterMockRecorder{mock}
|
||||
return mock
|
||||
}
|
||||
|
||||
// EXPECT returns an object that allows the caller to indicate expected use.
|
||||
func (m *MockPluginContextGetter) EXPECT() *MockPluginContextGetterMockRecorder {
|
||||
return m.recorder
|
||||
}
|
||||
|
||||
// GetPluginContext mocks base method.
|
||||
func (m *MockPluginContextGetter) GetPluginContext(arg0 *models.SignedInUser, arg1, arg2 string) (backend.PluginContext, bool, error) {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "GetPluginContext", arg0, arg1, arg2)
|
||||
ret0, _ := ret[0].(backend.PluginContext)
|
||||
ret1, _ := ret[1].(bool)
|
||||
ret2, _ := ret[2].(error)
|
||||
return ret0, ret1, ret2
|
||||
}
|
||||
|
||||
// GetPluginContext indicates an expected call of GetPluginContext.
|
||||
func (mr *MockPluginContextGetterMockRecorder) GetPluginContext(arg0, arg1, arg2 interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPluginContext", reflect.TypeOf((*MockPluginContextGetter)(nil).GetPluginContext), arg0, arg1, arg2)
|
||||
}
|
||||
|
||||
// MockStreamRunner is a mock of StreamRunner interface.
|
||||
type MockStreamRunner struct {
|
||||
ctrl *gomock.Controller
|
Loading…
Reference in New Issue
Block a user