Live: experimental HA with Redis (#34851)

Alexander Emelin 2021-06-24 11:07:09 +03:00 committed by GitHub
parent 437faa72a9
commit 98893c0420
20 changed files with 785 additions and 308 deletions

View File

@ -901,6 +901,17 @@ plugin_catalog_url = https://grafana.com/grafana/plugins/
# tuning. 0 disables Live, -1 means unlimited connections.
max_connections = 100
# engine defines an HA (high availability) engine to use for Grafana Live. By default, no engine is used - in
# this case Live features work only on a single Grafana server.
# Available options: "redis".
# Setting ha_engine is an EXPERIMENTAL feature.
ha_engine =
# ha_engine_address sets the connection address for the Live HA engine. The address format depends on the engine type.
# For now only a Redis connection address in "host:port" format is supported.
# This option is EXPERIMENTAL.
ha_engine_address = "127.0.0.1:6379"
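# For illustration only (hypothetical values, not shipped defaults): to try the experimental
# HA mode, every Grafana server in the cluster would point at the same Redis, e.g.
#   ha_engine = redis
#   ha_engine_address = "redis.example.internal:6379"
# so that Live subscriptions, presence and managed streams are shared across instances.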
#################################### Grafana Image Renderer Plugin ##########################
[plugin.grafana-image-renderer]
# Instruct headless browser instance to use a default timezone when not provided by Grafana, e.g. when rendering panel image of alert.

View File

@ -887,6 +887,16 @@
# tuning. 0 disables Live, -1 means unlimited connections.
;max_connections = 100
# engine defines an HA (high availability) engine to use for Grafana Live. By default, no engine is used - in
# this case Live features work only on a single Grafana server. Available options: "redis".
# Setting ha_engine is an EXPERIMENTAL feature.
;ha_engine =
# ha_engine_address sets the connection address for the Live HA engine. The address format depends on the engine type.
# For now only a Redis connection address in "host:port" format is supported.
# This option is EXPERIMENTAL.
;ha_engine_address = "127.0.0.1:6379"
#################################### Grafana Image Renderer Plugin ##########################
[plugin.grafana-image-renderer]
# Instruct headless browser instance to use a default timezone when not provided by Grafana, e.g. when rendering panel image of alert.

2
go.mod
View File

@ -23,7 +23,7 @@ require (
github.com/beevik/etree v1.1.0
github.com/benbjohnson/clock v1.1.0
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b
github.com/centrifugal/centrifuge v0.17.0
github.com/centrifugal/centrifuge v0.17.1
github.com/cortexproject/cortex v1.8.2-0.20210428155238-d382e1d80eaf
github.com/crewjam/saml v0.4.6-0.20201227203850-bca570abb2ce
github.com/davecgh/go-spew v1.1.1

4
go.sum
View File

@ -292,8 +292,8 @@ github.com/cenkalti/backoff/v4 v4.1.0 h1:c8LkOFQTzuO0WBM/ae5HdGQuZPfPxp7lqBRwQRm
github.com/cenkalti/backoff/v4 v4.1.0/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/census-instrumentation/opencensus-proto v0.3.0/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/centrifugal/centrifuge v0.17.0 h1:ANZMhcR8pFbRUPdv45nrIhhZcsSOdtshT3YM4v1/NHY=
github.com/centrifugal/centrifuge v0.17.0/go.mod h1:AEFs3KPGRpvX1jCe24NDlGWQu7DPa7vdzeY/aUluOm0=
github.com/centrifugal/centrifuge v0.17.1 h1:UWORzEE5SEhJSy8omW50AVKbpKaqDBpUFp3kDnMgsXs=
github.com/centrifugal/centrifuge v0.17.1/go.mod h1:AEFs3KPGRpvX1jCe24NDlGWQu7DPa7vdzeY/aUluOm0=
github.com/centrifugal/protocol v0.5.0 h1:h71u2Q53yhplftmUk1tjc+Mu6TKJ/eO3YRD3h7Qjvj4=
github.com/centrifugal/protocol v0.5.0/go.mod h1:ru2N4pwiND/jE+XLtiLYbUo3YmgqgniGNW9f9aRgoVI=
github.com/certifi/gocertifi v0.0.0-20191021191039-0944d244cd40/go.mod h1:sGbDF6GwGcLpkNXPUTkMRoywsNa/ol15pxFe6ERfguA=

View File

@ -2,12 +2,10 @@ package live
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"net/url"
"strconv"
"strings"
"sync"
"time"
@ -26,10 +24,12 @@ import (
"github.com/grafana/grafana/pkg/services/live/database"
"github.com/grafana/grafana/pkg/services/live/features"
"github.com/grafana/grafana/pkg/services/live/livecontext"
"github.com/grafana/grafana/pkg/services/live/liveplugin"
"github.com/grafana/grafana/pkg/services/live/managedstream"
"github.com/grafana/grafana/pkg/services/live/orgchannel"
"github.com/grafana/grafana/pkg/services/live/pushws"
"github.com/grafana/grafana/pkg/services/live/runstream"
"github.com/grafana/grafana/pkg/services/live/survey"
"github.com/grafana/grafana/pkg/services/sqlstore"
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
"github.com/grafana/grafana/pkg/setting"
@ -38,8 +38,8 @@ import (
"github.com/centrifugal/centrifuge"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana-plugin-sdk-go/live"
"gopkg.in/redis.v5"
)
var (
@ -84,7 +84,8 @@ type GrafanaLive struct {
DatasourceCache datasources.CacheService `inject:""`
SQLStore *sqlstore.SQLStore `inject:""`
node *centrifuge.Node
node *centrifuge.Node
surveyCaller *survey.Caller
// Websocket handlers
websocketHandler interface{}
@ -99,7 +100,7 @@ type GrafanaLive struct {
ManagedStreamRunner *managedstream.Runner
contextGetter *pluginContextGetter
contextGetter *liveplugin.ContextGetter
runStreamManager *runstream.Manager
storage *database.Storage
}
@ -128,17 +129,22 @@ func (g *GrafanaLive) AddMigration(mg *migrator.Migrator) {
func (g *GrafanaLive) Run(ctx context.Context) error {
if g.runStreamManager != nil {
// Only run stream manager if GrafanaLive properly initialized.
return g.runStreamManager.Run(ctx)
_ = g.runStreamManager.Run(ctx)
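// The stream manager only returns once its context is canceled (server shutdown), so at
// that point also shut down the Centrifuge node; clients then receive a shutdown
// disconnect (code 3001, which the disconnect logger below skips).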
return g.node.Shutdown(context.Background())
}
return nil
}
var clientConcurrency = 8
func (g *GrafanaLive) IsHA() bool {
return g.Cfg != nil && g.Cfg.LiveHAEngine != ""
}
// Init initializes Live service.
// Required to implement the registry.Service interface.
func (g *GrafanaLive) Init() error {
logger.Debug("GrafanaLive initialization")
logger.Debug("GrafanaLive initialization", "ha", g.IsHA())
// We use default config here as starting point. Default config contains
// reasonable values for available options.
@ -158,10 +164,57 @@ func (g *GrafanaLive) Init() error {
}
g.node = node
g.contextGetter = newPluginContextGetter(g.PluginContextProvider)
channelSender := newPluginChannelSender(node)
presenceGetter := newPluginPresenceGetter(node)
g.runStreamManager = runstream.NewManager(channelSender, presenceGetter, g.contextGetter)
if g.IsHA() {
// Configure HA with Redis. In this case Centrifuge nodes
// will be connected over Redis PUB/SUB. Presence will work
// globally since it is kept inside Redis.
redisAddress := g.Cfg.LiveHAEngineAddress
redisShardConfigs := []centrifuge.RedisShardConfig{
{Address: redisAddress},
}
var redisShards []*centrifuge.RedisShard
for _, redisConf := range redisShardConfigs {
redisShard, err := centrifuge.NewRedisShard(node, redisConf)
if err != nil {
return fmt.Errorf("error connecting to Live Redis: %v", err)
}
redisShards = append(redisShards, redisShard)
}
broker, err := centrifuge.NewRedisBroker(node, centrifuge.RedisBrokerConfig{
Prefix: "gf_live",
// We are using Redis streams here for history. Requires Redis >= 5.
UseStreams: true,
// Use a reasonably large expiration interval for the stream meta key,
// much bigger than the maximum HistoryLifetime value in the Node config.
// This way stream metadata will eventually expire; in some cases you may
// want to prevent its expiration by setting this to a zero value.
HistoryMetaTTL: 7 * 24 * time.Hour,
// And configure the Redis shards to use.
Shards: redisShards,
})
if err != nil {
return fmt.Errorf("error creating Live Redis broker: %v", err)
}
node.SetBroker(broker)
presenceManager, err := centrifuge.NewRedisPresenceManager(node, centrifuge.RedisPresenceManagerConfig{
Prefix: "gf_live",
Shards: redisShards,
})
if err != nil {
return fmt.Errorf("error creating Live Redis presence manager: %v", err)
}
node.SetPresenceManager(presenceManager)
}
g.contextGetter = liveplugin.NewContextGetter(g.PluginContextProvider)
channelLocalPublisher := liveplugin.NewChannelLocalPublisher(node)
numLocalSubscribersGetter := liveplugin.NewNumLocalSubscribersGetter(node)
g.runStreamManager = runstream.NewManager(channelLocalPublisher, numLocalSubscribersGetter, g.contextGetter)
// Initialize the main features
dash := &features.DashboardHandler{
@ -173,7 +226,32 @@ func (g *GrafanaLive) Init() error {
g.GrafanaScope.Features["dashboard"] = dash
g.GrafanaScope.Features["broadcast"] = features.NewBroadcastRunner(g.storage)
g.ManagedStreamRunner = managedstream.NewRunner(g.Publish)
var managedStreamRunner *managedstream.Runner
if g.IsHA() {
redisClient := redis.NewClient(&redis.Options{
Addr: g.Cfg.LiveHAEngineAddress,
})
cmd := redisClient.Ping()
if _, err := cmd.Result(); err != nil {
return fmt.Errorf("error pinging Redis: %v", err)
}
managedStreamRunner = managedstream.NewRunner(
g.Publish,
managedstream.NewRedisFrameCache(redisClient),
)
} else {
managedStreamRunner = managedstream.NewRunner(
g.Publish,
managedstream.NewMemoryFrameCache(),
)
}
g.ManagedStreamRunner = managedStreamRunner
g.surveyCaller = survey.NewCaller(managedStreamRunner, node)
err = g.surveyCaller.SetupHandlers()
if err != nil {
return err
}
// Set ConnectHandler called when client successfully connected to Node. Your code
// inside handler must be synchronized since it will be called concurrently from
@ -222,6 +300,9 @@ func (g *GrafanaLive) Init() error {
reason := "normal"
if e.Disconnect != nil {
reason = e.Disconnect.Reason
if e.Disconnect.Code == 3001 { // Shutdown
return
}
}
logger.Debug("Client disconnected", "user", client.UserID(), "client", client.ID(), "reason", reason, "elapsed", time.Since(connectedAt))
})
@ -581,14 +662,7 @@ func (g *GrafanaLive) handleStreamScope(u *models.SignedInUser, namespace string
func (g *GrafanaLive) handleDatasourceScope(user *models.SignedInUser, namespace string) (models.ChannelHandlerFactory, error) {
ds, err := g.DatasourceCache.GetDatasourceByUID(namespace, user, false)
if err != nil {
// the namespace may be an ID
id, _ := strconv.ParseInt(namespace, 10, 64)
if id > 0 {
ds, err = g.DatasourceCache.GetDatasource(id, user, false)
}
if err != nil {
return nil, fmt.Errorf("error getting datasource: %w", err)
}
return nil, fmt.Errorf("error getting datasource: %w", err)
}
streamHandler, err := g.getStreamPlugin(ds.Type)
if err != nil {
@ -603,13 +677,13 @@ func (g *GrafanaLive) handleDatasourceScope(user *models.SignedInUser, namespace
), nil
}
// Publish sends the data to the channel without checking permissions etc
// Publish sends the data to the channel without checking permissions etc.
func (g *GrafanaLive) Publish(orgID int64, channel string, data []byte) error {
_, err := g.node.Publish(orgchannel.PrependOrgID(orgID, channel), data)
return err
}
// ClientCount returns the number of clients
// ClientCount returns the number of clients.
func (g *GrafanaLive) ClientCount(orgID int64, channel string) (int, error) {
p, err := g.node.Presence(orgchannel.PrependOrgID(orgID, channel))
if err != nil {
@ -652,35 +726,25 @@ func (g *GrafanaLive) HandleHTTPPublish(ctx *models.ReqContext, cmd dtos.LivePub
return response.JSON(http.StatusOK, dtos.LivePublishResponse{})
}
type streamChannelListResponse struct {
Channels []*managedstream.ManagedChannel `json:"channels"`
}
// HandleListHTTP returns metadata so the UI can build a nice form
func (g *GrafanaLive) HandleListHTTP(c *models.ReqContext) response.Response {
info := util.DynMap{}
channels := make([]util.DynMap, 0)
for k, v := range g.ManagedStreamRunner.Streams(c.SignedInUser.OrgId) {
channels = append(channels, v.ListChannels(c.SignedInUser.OrgId, "stream/"+k+"/")...)
var channels []*managedstream.ManagedChannel
var err error
if g.IsHA() {
channels, err = g.surveyCaller.CallManagedStreams(c.SignedInUser.OrgId)
} else {
channels, err = g.ManagedStreamRunner.GetManagedChannels(c.SignedInUser.OrgId)
}
// Hardcode sample streams
frameJSON, err := data.FrameToJSON(data.NewFrame("testdata",
data.NewField("Time", nil, make([]time.Time, 0)),
data.NewField("Value", nil, make([]float64, 0)),
data.NewField("Min", nil, make([]float64, 0)),
data.NewField("Max", nil, make([]float64, 0)),
), data.IncludeSchemaOnly)
if err == nil {
channels = append(channels, util.DynMap{
"channel": "plugin/testdata/random-2s-stream",
"data": json.RawMessage(frameJSON),
}, util.DynMap{
"channel": "plugin/testdata/random-flakey-stream",
"data": json.RawMessage(frameJSON),
}, util.DynMap{
"channel": "plugin/testdata/random-20Hz-stream",
"data": json.RawMessage(frameJSON),
})
if err != nil {
return response.Error(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError), err)
}
info := streamChannelListResponse{
Channels: channels,
}
info["channels"] = channels
return response.JSONStreaming(200, info)
}
@ -696,3 +760,27 @@ func (g *GrafanaLive) HandleInfoHTTP(ctx *models.ReqContext) response.Response {
"message": "Info is not supported for this channel",
})
}
// handleLog writes Centrifuge log entries to the standard log15 logger.
func handleLog(msg centrifuge.LogEntry) {
arr := make([]interface{}, 0)
for k, v := range msg.Fields {
if v == nil {
v = "<nil>"
} else if v == "" {
v = "<empty>"
}
arr = append(arr, k, v)
}
switch msg.Level {
case centrifuge.LogLevelDebug:
loggerCF.Debug(msg.Message, arr...)
case centrifuge.LogLevelError:
loggerCF.Error(msg.Message, arr...)
case centrifuge.LogLevelInfo:
loggerCF.Info(msg.Message, arr...)
default:
loggerCF.Debug(msg.Message, arr...)
}
}

View File

@ -0,0 +1,56 @@
package liveplugin
import (
"fmt"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/plugins/plugincontext"
"github.com/centrifugal/centrifuge"
"github.com/grafana/grafana-plugin-sdk-go/backend"
)
type ChannelLocalPublisher struct {
node *centrifuge.Node
}
func NewChannelLocalPublisher(node *centrifuge.Node) *ChannelLocalPublisher {
return &ChannelLocalPublisher{node: node}
}
func (p *ChannelLocalPublisher) PublishLocal(channel string, data []byte) error {
pub := &centrifuge.Publication{
Data: data,
}
err := p.node.Hub().BroadcastPublication(channel, pub, centrifuge.StreamPosition{})
if err != nil {
return fmt.Errorf("error publishing %s: %w", string(data), err)
}
return nil
}
type NumLocalSubscribersGetter struct {
node *centrifuge.Node
}
func NewNumLocalSubscribersGetter(node *centrifuge.Node) *NumLocalSubscribersGetter {
return &NumLocalSubscribersGetter{node: node}
}
func (p *NumLocalSubscribersGetter) GetNumLocalSubscribers(channelID string) (int, error) {
return p.node.Hub().NumSubscribers(channelID), nil
}
type ContextGetter struct {
PluginContextProvider *plugincontext.Provider
}
func NewContextGetter(pluginContextProvider *plugincontext.Provider) *ContextGetter {
return &ContextGetter{
PluginContextProvider: pluginContextProvider,
}
}
func (g *ContextGetter) GetPluginContext(user *models.SignedInUser, pluginID string, datasourceUID string, skipCache bool) (backend.PluginContext, bool, error) {
return g.PluginContextProvider.Get(pluginID, datasourceUID, user, skipCache)
}

View File

@ -1,27 +0,0 @@
package live
import "github.com/centrifugal/centrifuge"
// Write to the standard log15 logger
func handleLog(msg centrifuge.LogEntry) {
arr := make([]interface{}, 0)
for k, v := range msg.Fields {
if v == nil {
v = "<nil>"
} else if v == "" {
v = "<empty>"
}
arr = append(arr, k, v)
}
switch msg.Level {
case centrifuge.LogLevelDebug:
loggerCF.Debug(msg.Message, arr...)
case centrifuge.LogLevelError:
loggerCF.Error(msg.Message, arr...)
case centrifuge.LogLevelInfo:
loggerCF.Info(msg.Message, arr...)
default:
loggerCF.Debug(msg.Message, arr...)
}
}

View File

@ -0,0 +1,17 @@
package managedstream
import (
"encoding/json"
"github.com/grafana/grafana-plugin-sdk-go/data"
)
// FrameCache keeps the last frame per channel and reports on Update whether the frame schema changed.
type FrameCache interface {
// GetActiveChannels returns active managed stream channels with JSON schema.
GetActiveChannels(orgID int64) (map[string]json.RawMessage, error)
// GetFrame returns the full JSON frame for a channel.
GetFrame(orgID int64, channel string) (json.RawMessage, bool, error)
// Update updates frame cache and returns true if schema changed.
Update(orgID int64, channel string, frameJson data.FrameJSONCache) (bool, error)
}
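// A minimal usage sketch (illustrative only - the org ID and channel name are made up,
// error handling omitted; see the tests later in this change for the full contract):
//
//	cache := NewMemoryFrameCache()
//	frameJSON, _ := data.FrameToJSONCache(data.NewFrame("example"))
//	schemaChanged, _ := cache.Update(1, "stream/example/test", frameJSON) // true on first write
//	raw, ok, _ := cache.GetFrame(1, "stream/example/test")                // full frame JSON, ok == true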

View File

@ -0,0 +1,54 @@
package managedstream
import (
"encoding/json"
"sync"
"github.com/grafana/grafana-plugin-sdk-go/data"
)
// MemoryFrameCache is an in-memory FrameCache implementation used in a non-HA (single server) setup.
type MemoryFrameCache struct {
mu sync.RWMutex
frames map[int64]map[string]data.FrameJSONCache
}
// NewMemoryFrameCache creates a new MemoryFrameCache.
func NewMemoryFrameCache() *MemoryFrameCache {
return &MemoryFrameCache{
frames: map[int64]map[string]data.FrameJSONCache{},
}
}
func (c *MemoryFrameCache) GetActiveChannels(orgID int64) (map[string]json.RawMessage, error) {
c.mu.RLock()
defer c.mu.RUnlock()
frames, ok := c.frames[orgID]
if !ok {
return nil, nil
}
info := make(map[string]json.RawMessage, len(frames))
for k, v := range frames {
info[k] = v.Bytes(data.IncludeSchemaOnly)
}
return info, nil
}
func (c *MemoryFrameCache) GetFrame(orgID int64, channel string) (json.RawMessage, bool, error) {
c.mu.RLock()
defer c.mu.RUnlock()
cachedFrame, ok := c.frames[orgID][channel]
return cachedFrame.Bytes(data.IncludeAll), ok, nil
}
func (c *MemoryFrameCache) Update(orgID int64, channel string, jsonFrame data.FrameJSONCache) (bool, error) {
c.mu.Lock()
defer c.mu.Unlock()
if _, ok := c.frames[orgID]; !ok {
c.frames[orgID] = map[string]data.FrameJSONCache{}
}
cachedJsonFrame, exists := c.frames[orgID][channel]
schemaUpdated := !exists || !cachedJsonFrame.SameSchema(&jsonFrame)
c.frames[orgID][channel] = jsonFrame
return schemaUpdated, nil
}

View File

@ -0,0 +1,69 @@
package managedstream
import (
"encoding/json"
"testing"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/stretchr/testify/require"
)
func testFrameCache(t *testing.T, c FrameCache) {
// Create new frame and update cache.
frame := data.NewFrame("hello")
frameJsonCache, err := data.FrameToJSONCache(frame)
require.NoError(t, err)
updated, err := c.Update(1, "test", frameJsonCache)
require.NoError(t, err)
require.True(t, updated)
// Make sure channel is active.
channels, err := c.GetActiveChannels(1)
require.NoError(t, err)
schema, ok := channels["test"]
require.True(t, ok)
require.NotZero(t, schema)
// Make sure the same frame does not update schema.
updated, err = c.Update(1, "test", frameJsonCache)
require.NoError(t, err)
require.False(t, updated)
// Now construct new frame with updated schema.
newFrame := data.NewFrame("hello", data.NewField("new_field", nil, []int64{}))
frameJsonCache, err = data.FrameToJSONCache(newFrame)
require.NoError(t, err)
// Make sure schema updated.
updated, err = c.Update(1, "test", frameJsonCache)
require.NoError(t, err)
require.True(t, updated)
// Add the same with another orgID and make sure schema updated.
updated, err = c.Update(2, "test", frameJsonCache)
require.NoError(t, err)
require.True(t, updated)
// Make sure that the last frame successfully saved in cache.
frameJSON, ok, err := c.GetFrame(1, "test")
require.NoError(t, err)
require.True(t, ok)
var f data.Frame
err = json.Unmarshal(frameJSON, &f)
require.NoError(t, err)
require.Equal(t, "new_field", f.Fields[0].Name)
// Make sure channel has updated schema.
channels, err = c.GetActiveChannels(1)
require.NoError(t, err)
require.NotEqual(t, string(channels["test"]), string(schema))
}
func TestMemoryFrameCache(t *testing.T) {
c := NewMemoryFrameCache()
require.NotNil(t, c)
testFrameCache(t, c)
}

View File

@ -0,0 +1,111 @@
package managedstream
import (
"encoding/json"
"errors"
"sync"
"time"
"github.com/grafana/grafana/pkg/services/live/orgchannel"
"github.com/grafana/grafana-plugin-sdk-go/data"
"gopkg.in/redis.v5"
)
// RedisFrameCache is a FrameCache implementation backed by Redis, used when Live HA is enabled.
type RedisFrameCache struct {
mu sync.RWMutex
redisClient *redis.Client
frames map[int64]map[string]data.FrameJSONCache
}
// NewRedisFrameCache creates a new RedisFrameCache.
func NewRedisFrameCache(redisClient *redis.Client) *RedisFrameCache {
return &RedisFrameCache{
frames: map[int64]map[string]data.FrameJSONCache{},
redisClient: redisClient,
}
}
func (c *RedisFrameCache) GetActiveChannels(orgID int64) (map[string]json.RawMessage, error) {
c.mu.RLock()
defer c.mu.RUnlock()
frames, ok := c.frames[orgID]
if !ok {
return nil, nil
}
info := make(map[string]json.RawMessage, len(frames))
for k, v := range frames {
info[k] = v.Bytes(data.IncludeSchemaOnly)
}
return info, nil
}
func (c *RedisFrameCache) GetFrame(orgID int64, channel string) (json.RawMessage, bool, error) {
key := getCacheKey(orgchannel.PrependOrgID(orgID, channel))
cmd := c.redisClient.HGetAll(key)
result, err := cmd.Result()
if err != nil {
return nil, false, err
}
if len(result) == 0 {
return nil, false, nil
}
return json.RawMessage(result["frame"]), true, nil
}
const (
frameCacheTTL = 7 * 24 * time.Hour
)
func (c *RedisFrameCache) Update(orgID int64, channel string, jsonFrame data.FrameJSONCache) (bool, error) {
c.mu.Lock()
if _, ok := c.frames[orgID]; !ok {
c.frames[orgID] = map[string]data.FrameJSONCache{}
}
c.frames[orgID][channel] = jsonFrame
c.mu.Unlock()
stringSchema := string(jsonFrame.Bytes(data.IncludeSchemaOnly))
key := getCacheKey(orgchannel.PrependOrgID(orgID, channel))
pipe := c.redisClient.TxPipeline()
defer func() { _ = pipe.Close() }()
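// Read the previously stored hash in the same transaction: its "schema" field is compared
// against the new schema below to decide whether the schema changed (and so whether
// subscribers need to receive the full schema again).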
pipe.HGetAll(key)
pipe.HMSet(key, map[string]string{
"schema": stringSchema,
"frame": string(jsonFrame.Bytes(data.IncludeAll)),
})
pipe.Expire(key, frameCacheTTL)
replies, err := pipe.Exec()
if err != nil {
return false, err
}
if len(replies) == 0 {
return false, errors.New("no replies in response")
}
reply := replies[0]
if reply.Err() != nil {
return false, reply.Err()
}
if mapReply, ok := reply.(*redis.StringStringMapCmd); ok {
result, err := mapReply.Result()
if err != nil {
return false, err
}
if len(result) == 0 {
return true, nil
}
return result["schema"] != stringSchema, nil
}
return true, nil
}
func getCacheKey(channelID string) string {
return "gf_live.managed_stream." + channelID
}

View File

@ -0,0 +1,19 @@
// +build redis
package managedstream
import (
"testing"
"github.com/stretchr/testify/require"
"gopkg.in/redis.v5"
)
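// This test only builds with the "redis" build tag and expects Redis on localhost:6379.
// One way to run it locally (assuming Docker is available):
//
//	docker run --rm -p 6379:6379 redis
//	go test -tags=redis ./pkg/services/live/managedstream/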
func TestRedisCacheStorage(t *testing.T) {
redisClient := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
c := NewRedisFrameCache(redisClient)
require.NotNil(t, c)
testFrameCache(t, c)
}

View File

@ -3,6 +3,7 @@ package managedstream
import (
"context"
"encoding/json"
"fmt"
"sync"
"time"
@ -11,7 +12,6 @@ import (
"github.com/grafana/grafana-plugin-sdk-go/live"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/util"
)
var (
@ -20,19 +20,53 @@ var (
// Runner keeps ManagedStream per streamID.
type Runner struct {
mu sync.RWMutex
streams map[int64]map[string]*ManagedStream
publisher models.ChannelPublisher
mu sync.RWMutex
streams map[int64]map[string]*ManagedStream
publisher models.ChannelPublisher
frameCache FrameCache
}
// NewRunner creates new Runner.
func NewRunner(publisher models.ChannelPublisher) *Runner {
func NewRunner(publisher models.ChannelPublisher, frameCache FrameCache) *Runner {
return &Runner{
publisher: publisher,
streams: map[int64]map[string]*ManagedStream{},
publisher: publisher,
streams: map[int64]map[string]*ManagedStream{},
frameCache: frameCache,
}
}
func (r *Runner) GetManagedChannels(orgID int64) ([]*ManagedChannel, error) {
channels := make([]*ManagedChannel, 0)
for _, v := range r.Streams(orgID) {
streamChannels, err := v.ListChannels(orgID)
if err != nil {
return nil, err
}
channels = append(channels, streamChannels...)
}
// Hardcode sample streams
frameJSON, err := data.FrameToJSON(data.NewFrame("testdata",
data.NewField("Time", nil, make([]time.Time, 0)),
data.NewField("Value", nil, make([]float64, 0)),
data.NewField("Min", nil, make([]float64, 0)),
data.NewField("Max", nil, make([]float64, 0)),
), data.IncludeSchemaOnly)
if err == nil {
channels = append(channels, &ManagedChannel{
Channel: "plugin/testdata/random-2s-stream",
Data: frameJSON,
}, &ManagedChannel{
Channel: "plugin/testdata/random-flakey-stream",
Data: frameJSON,
}, &ManagedChannel{
Channel: "plugin/testdata/random-20Hz-stream",
Data: frameJSON,
})
}
return channels, nil
}
// Streams returns a map of active managed streams (per streamID).
func (r *Runner) Streams(orgID int64) map[string]*ManagedStream {
r.mu.RLock()
@ -58,7 +92,7 @@ func (r *Runner) GetOrCreateStream(orgID int64, streamID string) (*ManagedStream
}
s, ok := r.streams[orgID][streamID]
if !ok {
s = NewManagedStream(streamID, r.publisher)
s = NewManagedStream(streamID, r.publisher, r.frameCache)
r.streams[orgID][streamID] = s
}
return s, nil
@ -66,112 +100,90 @@ func (r *Runner) GetOrCreateStream(orgID int64, streamID string) (*ManagedStream
// ManagedStream holds the state of a managed stream.
type ManagedStream struct {
mu sync.RWMutex
id string
start time.Time
last map[int64]map[string]data.FrameJSONCache
publisher models.ChannelPublisher
id string
start time.Time
publisher models.ChannelPublisher
frameCache FrameCache
}
// NewManagedStream creates new ManagedStream.
func NewManagedStream(id string, publisher models.ChannelPublisher) *ManagedStream {
func NewManagedStream(id string, publisher models.ChannelPublisher, schemaUpdater FrameCache) *ManagedStream {
return &ManagedStream{
id: id,
start: time.Now(),
last: map[int64]map[string]data.FrameJSONCache{},
publisher: publisher,
id: id,
start: time.Now(),
publisher: publisher,
frameCache: schemaUpdater,
}
}
// ManagedChannel represents a managed stream.
type ManagedChannel struct {
Channel string `json:"channel"`
Data json.RawMessage `json:"data"`
}
// ListChannels returns info for the UI about this stream.
func (s *ManagedStream) ListChannels(orgID int64, prefix string) []util.DynMap {
s.mu.RLock()
defer s.mu.RUnlock()
if _, ok := s.last[orgID]; !ok {
return []util.DynMap{}
func (s *ManagedStream) ListChannels(orgID int64) ([]*ManagedChannel, error) {
paths, err := s.frameCache.GetActiveChannels(orgID)
if err != nil {
return []*ManagedChannel{}, fmt.Errorf("error getting active managed stream paths: %v", err)
}
info := make([]util.DynMap, 0, len(s.last[orgID]))
for k, v := range s.last[orgID] {
ch := util.DynMap{}
ch["channel"] = prefix + k
ch["data"] = json.RawMessage(v.Bytes(data.IncludeSchemaOnly))
info = append(info, ch)
info := make([]*ManagedChannel, 0, len(paths))
for k, v := range paths {
managedChannel := &ManagedChannel{
Channel: k,
Data: v,
}
info = append(info, managedChannel)
}
return info
return info, nil
}
// Push sends a frame to the stream and saves it for later retrieval by subscribers.
// Only the data part is sent when the frame schema has not changed since the last push.
func (s *ManagedStream) Push(orgID int64, path string, frame *data.Frame) error {
// Keep schema + data for last packet.
msg, err := data.FrameToJSONCache(frame)
jsonFrameCache, err := data.FrameToJSONCache(frame)
if err != nil {
logger.Error("Error marshaling frame with data", "error", err)
return err
}
s.mu.Lock()
if _, ok := s.last[orgID]; !ok {
s.last[orgID] = map[string]data.FrameJSONCache{}
}
last, exists := s.last[orgID][path]
s.last[orgID][path] = msg
s.mu.Unlock()
include := data.IncludeAll
if exists && last.SameSchema(&msg) {
// When the schema has not changed, just send the data.
include = data.IncludeDataOnly
}
frameJSON := msg.Bytes(include)
// The channel this will be posted into.
channel := live.Channel{Scope: live.ScopeStream, Namespace: s.id, Path: path}.String()
isUpdated, err := s.frameCache.Update(orgID, channel, jsonFrameCache)
if err != nil {
logger.Error("Error updating managed stream schema", "error", err)
return err
}
// When the schema has not changed, just send the data.
include := data.IncludeDataOnly
if isUpdated {
// When the schema has been changed, send all.
include = data.IncludeAll
}
frameJSON := jsonFrameCache.Bytes(include)
logger.Debug("Publish data to channel", "channel", channel, "dataLength", len(frameJSON))
return s.publisher(orgID, channel, frameJSON)
}
// getLastPacket retrieves last packet channel.
func (s *ManagedStream) getLastPacket(orgId int64, path string) (json.RawMessage, bool) {
s.mu.RLock()
defer s.mu.RUnlock()
_, ok := s.last[orgId]
if !ok {
return nil, false
}
msg, ok := s.last[orgId][path]
if ok {
return msg.Bytes(data.IncludeAll), ok
}
return nil, ok
}
func (s *ManagedStream) GetHandlerForPath(_ string) (models.ChannelHandler, error) {
return s, nil
}
func (s *ManagedStream) OnSubscribe(_ context.Context, u *models.SignedInUser, e models.SubscribeEvent) (models.SubscribeReply, backend.SubscribeStreamStatus, error) {
reply := models.SubscribeReply{}
packet, ok := s.getLastPacket(u.OrgId, e.Path)
frameJSON, ok, err := s.frameCache.GetFrame(u.OrgId, e.Channel)
if err != nil {
return reply, 0, err
}
if ok {
reply.Data = packet
reply.Data = frameJSON
}
return reply, backend.SubscribeStreamStatusOK, nil
}
func (s *ManagedStream) OnPublish(_ context.Context, u *models.SignedInUser, evt models.PublishEvent) (models.PublishReply, backend.PublishStreamStatus, error) {
var frame data.Frame
err := json.Unmarshal(evt.Data, &frame)
if err != nil {
// Stream scope only deals with data frames.
return models.PublishReply{}, 0, err
}
err = s.Push(u.OrgId, evt.Path, &frame)
if err != nil {
// Stream scope only deals with data frames.
return models.PublishReply{}, 0, err
}
return models.PublishReply{}, backend.PublishStreamStatusOK, nil
func (s *ManagedStream) OnPublish(_ context.Context, _ *models.SignedInUser, _ models.PublishEvent) (models.PublishReply, backend.PublishStreamStatus, error) {
return models.PublishReply{}, backend.PublishStreamStatusPermissionDenied, nil
}

View File

@ -3,7 +3,6 @@ package managedstream
import (
"testing"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/stretchr/testify/require"
)
@ -19,21 +18,6 @@ func (p *testPublisher) publish(orgID int64, _ string, _ []byte) error {
func TestNewManagedStream(t *testing.T) {
publisher := &testPublisher{orgID: 1, t: t}
c := NewManagedStream("a", publisher.publish)
c := NewManagedStream("a", publisher.publish, NewMemoryFrameCache())
require.NotNil(t, c)
}
func TestManagedStream_GetLastPacket(t *testing.T) {
var orgID int64 = 1
publisher := &testPublisher{orgID: orgID, t: t}
c := NewManagedStream("a", publisher.publish)
_, ok := c.getLastPacket(orgID, "test")
require.False(t, ok)
err := c.Push(orgID, "test", data.NewFrame("hello"))
require.NoError(t, err)
s, ok := c.getLastPacket(orgID, "test")
require.NoError(t, err)
require.True(t, ok)
require.JSONEq(t, `{"schema":{"name":"hello","fields":[]},"data":{"values":[]}}`, string(s))
}

View File

@ -1,57 +0,0 @@
package live
import (
"fmt"
"github.com/grafana/grafana/pkg/models"
"github.com/centrifugal/centrifuge"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana/pkg/plugins/plugincontext"
)
type pluginChannelSender struct {
node *centrifuge.Node
}
func newPluginChannelSender(node *centrifuge.Node) *pluginChannelSender {
return &pluginChannelSender{node: node}
}
func (p *pluginChannelSender) Send(channel string, data []byte) error {
_, err := p.node.Publish(channel, data)
if err != nil {
return fmt.Errorf("error publishing %s: %w", string(data), err)
}
return nil
}
type pluginPresenceGetter struct {
node *centrifuge.Node
}
func newPluginPresenceGetter(node *centrifuge.Node) *pluginPresenceGetter {
return &pluginPresenceGetter{node: node}
}
func (p *pluginPresenceGetter) GetNumSubscribers(channel string) (int, error) {
res, err := p.node.PresenceStats(channel)
if err != nil {
return 0, err
}
return res.NumClients, nil
}
type pluginContextGetter struct {
PluginContextProvider *plugincontext.Provider
}
func newPluginContextGetter(pluginContextProvider *plugincontext.Provider) *pluginContextGetter {
return &pluginContextGetter{
PluginContextProvider: pluginContextProvider,
}
}
func (g *pluginContextGetter) GetPluginContext(user *models.SignedInUser, pluginID string, datasourceUID string, skipCache bool) (backend.PluginContext, bool, error) {
return g.PluginContextProvider.Get(pluginID, datasourceUID, user, skipCache)
}

View File

@ -18,18 +18,19 @@ var (
logger = log.New("live.runstream")
)
//go:generate mockgen -destination=mock.go -package=runstream github.com/grafana/grafana/pkg/services/live/runstream ChannelSender,PresenceGetter,StreamRunner,PluginContextGetter
//go:generate mockgen -destination=mock.go -package=runstream github.com/grafana/grafana/pkg/services/live/runstream ChannelLocalPublisher,NumLocalSubscribersGetter,StreamRunner,PluginContextGetter
type ChannelSender interface {
Send(channel string, data []byte) error
type ChannelLocalPublisher interface {
PublishLocal(channel string, data []byte) error
}
type PluginContextGetter interface {
GetPluginContext(user *models.SignedInUser, pluginID string, datasourceUID string, skipCache bool) (backend.PluginContext, bool, error)
}
type PresenceGetter interface {
GetNumSubscribers(channel string) (int, error)
type NumLocalSubscribersGetter interface {
// GetNumLocalSubscribers returns the number of channel subscribers on the current node.
GetNumLocalSubscribers(channel string) (int, error)
}
type StreamRunner interface {
@ -37,12 +38,12 @@ type StreamRunner interface {
}
type packetSender struct {
channelSender ChannelSender
channel string
channelLocalPublisher ChannelLocalPublisher
channel string
}
func (p *packetSender) Send(packet *backend.StreamPacket) error {
return p.channelSender.Send(p.channel, packet.Data)
return p.channelLocalPublisher.PublishLocal(p.channel, packet.Data)
}
// Manager manages streams from Grafana to plugins (i.e. RunStream method).
@ -51,9 +52,9 @@ type Manager struct {
baseCtx context.Context
streams map[string]streamContext
datasourceStreams map[string]map[string]struct{}
presenceGetter PresenceGetter
presenceGetter NumLocalSubscribersGetter
pluginContextGetter PluginContextGetter
channelSender ChannelSender
channelSender ChannelLocalPublisher
registerCh chan submitRequest
closedCh chan struct{}
checkInterval time.Duration
@ -79,7 +80,7 @@ const (
)
// NewManager creates new Manager.
func NewManager(channelSender ChannelSender, presenceGetter PresenceGetter, pluginContextGetter PluginContextGetter, opts ...ManagerOption) *Manager {
func NewManager(channelSender ChannelLocalPublisher, presenceGetter NumLocalSubscribersGetter, pluginContextGetter PluginContextGetter, opts ...ManagerOption) *Manager {
sm := &Manager{
streams: make(map[string]streamContext),
datasourceStreams: map[string]map[string]struct{}{},
@ -201,9 +202,9 @@ func (s *Manager) watchStream(ctx context.Context, cancelFn func(), sr streamReq
}
}
case <-presenceTicker.C:
numSubscribers, err := s.presenceGetter.GetNumSubscribers(sr.Channel)
numSubscribers, err := s.presenceGetter.GetNumLocalSubscribers(sr.Channel)
if err != nil {
logger.Error("Error checking num subscribers", "channel", sr.Channel, "path", sr.Path)
logger.Error("Error checking num subscribers", "channel", sr.Channel, "path", sr.Path, "error", err)
continue
}
if numSubscribers > 0 {
@ -301,7 +302,7 @@ func (s *Manager) runStream(ctx context.Context, cancelFn func(), sr streamReque
PluginContext: pluginCtx,
Path: sr.Path,
},
backend.NewStreamSender(&packetSender{channelSender: s.channelSender, channel: sr.Channel}),
backend.NewStreamSender(&packetSender{channelLocalPublisher: s.channelSender, channel: sr.Channel}),
)
if err != nil {
if errors.Is(ctx.Err(), context.Canceled) {

View File

@ -27,11 +27,11 @@ func TestStreamManager_Run(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
mockChannelSender := NewMockChannelSender(mockCtrl)
mockPresenceGetter := NewMockPresenceGetter(mockCtrl)
mockChannelPublisher := NewMockChannelLocalPublisher(mockCtrl)
mockNumSubscribersGetter := NewMockNumLocalSubscribersGetter(mockCtrl)
mockContextGetter := NewMockPluginContextGetter(mockCtrl)
manager := NewManager(mockChannelSender, mockPresenceGetter, mockContextGetter)
manager := NewManager(mockChannelPublisher, mockNumSubscribersGetter, mockContextGetter)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -48,11 +48,11 @@ func TestStreamManager_SubmitStream_Send(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
mockPacketSender := NewMockChannelSender(mockCtrl)
mockPresenceGetter := NewMockPresenceGetter(mockCtrl)
mockPacketSender := NewMockChannelLocalPublisher(mockCtrl)
mockNumSubscribersGetter := NewMockNumLocalSubscribersGetter(mockCtrl)
mockContextGetter := NewMockPluginContextGetter(mockCtrl)
manager := NewManager(mockPacketSender, mockPresenceGetter, mockContextGetter)
manager := NewManager(mockPacketSender, mockNumSubscribersGetter, mockContextGetter)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -79,7 +79,7 @@ func TestStreamManager_SubmitStream_Send(t *testing.T) {
return testPluginContext, true, nil
}).Times(0)
mockPacketSender.EXPECT().Send("1/test", gomock.Any()).Times(1)
mockPacketSender.EXPECT().PublishLocal("1/test", gomock.Any()).Times(1)
mockStreamRunner := NewMockStreamRunner(mockCtrl)
mockStreamRunner.EXPECT().RunStream(
@ -113,11 +113,11 @@ func TestStreamManager_SubmitStream_DifferentOrgID(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
mockPacketSender := NewMockChannelSender(mockCtrl)
mockPresenceGetter := NewMockPresenceGetter(mockCtrl)
mockPacketSender := NewMockChannelLocalPublisher(mockCtrl)
mockNumSubscribersGetter := NewMockNumLocalSubscribersGetter(mockCtrl)
mockContextGetter := NewMockPluginContextGetter(mockCtrl)
manager := NewManager(mockPacketSender, mockPresenceGetter, mockContextGetter)
manager := NewManager(mockPacketSender, mockNumSubscribersGetter, mockContextGetter)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -130,8 +130,8 @@ func TestStreamManager_SubmitStream_DifferentOrgID(t *testing.T) {
doneCh1 := make(chan struct{})
doneCh2 := make(chan struct{})
mockPacketSender.EXPECT().Send("1/test", gomock.Any()).Times(1)
mockPacketSender.EXPECT().Send("2/test", gomock.Any()).Times(1)
mockPacketSender.EXPECT().PublishLocal("1/test", gomock.Any()).Times(1)
mockPacketSender.EXPECT().PublishLocal("2/test", gomock.Any()).Times(1)
mockContextGetter.EXPECT().GetPluginContext(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(user *models.SignedInUser, pluginID string, datasourceUID string, skipCache bool) (backend.PluginContext, bool, error) {
return backend.PluginContext{}, true, nil
@ -184,14 +184,14 @@ func TestStreamManager_SubmitStream_CloseNoSubscribers(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
mockPacketSender := NewMockChannelSender(mockCtrl)
mockPresenceGetter := NewMockPresenceGetter(mockCtrl)
mockPacketSender := NewMockChannelLocalPublisher(mockCtrl)
mockNumSubscribersGetter := NewMockNumLocalSubscribersGetter(mockCtrl)
mockContextGetter := NewMockPluginContextGetter(mockCtrl)
// Create manager with very fast num subscribers checks.
manager := NewManager(
mockPacketSender,
mockPresenceGetter,
mockNumSubscribersGetter,
mockContextGetter,
WithCheckConfig(10*time.Millisecond, 3),
)
@ -209,7 +209,7 @@ func TestStreamManager_SubmitStream_CloseNoSubscribers(t *testing.T) {
return backend.PluginContext{}, true, nil
}).Times(0)
mockPresenceGetter.EXPECT().GetNumSubscribers("1/test").Return(0, nil).Times(3)
mockNumSubscribersGetter.EXPECT().GetNumLocalSubscribers("1/test").Return(0, nil).Times(3)
mockStreamRunner := NewMockStreamRunner(mockCtrl)
mockStreamRunner.EXPECT().RunStream(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender) error {
@ -231,11 +231,11 @@ func TestStreamManager_SubmitStream_ErrorRestartsRunStream(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
mockPacketSender := NewMockChannelSender(mockCtrl)
mockPresenceGetter := NewMockPresenceGetter(mockCtrl)
mockPacketSender := NewMockChannelLocalPublisher(mockCtrl)
mockNumSubscribersGetter := NewMockNumLocalSubscribersGetter(mockCtrl)
mockContextGetter := NewMockPluginContextGetter(mockCtrl)
manager := NewManager(mockPacketSender, mockPresenceGetter, mockContextGetter)
manager := NewManager(mockPacketSender, mockNumSubscribersGetter, mockContextGetter)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -284,11 +284,11 @@ func TestStreamManager_SubmitStream_NilErrorStopsRunStream(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
mockPacketSender := NewMockChannelSender(mockCtrl)
mockPresenceGetter := NewMockPresenceGetter(mockCtrl)
mockPacketSender := NewMockChannelLocalPublisher(mockCtrl)
mockNumSubscribersGetter := NewMockNumLocalSubscribersGetter(mockCtrl)
mockContextGetter := NewMockPluginContextGetter(mockCtrl)
manager := NewManager(mockPacketSender, mockPresenceGetter, mockContextGetter)
manager := NewManager(mockPacketSender, mockNumSubscribersGetter, mockContextGetter)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -317,11 +317,11 @@ func TestStreamManager_HandleDatasourceUpdate(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
mockPacketSender := NewMockChannelSender(mockCtrl)
mockPresenceGetter := NewMockPresenceGetter(mockCtrl)
mockPacketSender := NewMockChannelLocalPublisher(mockCtrl)
mockNumSubscribersGetter := NewMockNumLocalSubscribersGetter(mockCtrl)
mockContextGetter := NewMockPluginContextGetter(mockCtrl)
manager := NewManager(mockPacketSender, mockPresenceGetter, mockContextGetter)
manager := NewManager(mockPacketSender, mockNumSubscribersGetter, mockContextGetter)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -383,11 +383,11 @@ func TestStreamManager_HandleDatasourceDelete(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
mockPacketSender := NewMockChannelSender(mockCtrl)
mockPresenceGetter := NewMockPresenceGetter(mockCtrl)
mockPacketSender := NewMockChannelLocalPublisher(mockCtrl)
mockNumSubscribersGetter := NewMockNumLocalSubscribersGetter(mockCtrl)
mockContextGetter := NewMockPluginContextGetter(mockCtrl)
manager := NewManager(mockPacketSender, mockPresenceGetter, mockContextGetter)
manager := NewManager(mockPacketSender, mockNumSubscribersGetter, mockContextGetter)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

View File

@ -1,5 +1,5 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: github.com/grafana/grafana/pkg/services/live/runstream (interfaces: ChannelSender,PresenceGetter,StreamRunner,PluginContextGetter)
// Source: github.com/grafana/grafana/pkg/services/live/runstream (interfaces: ChannelLocalPublisher,NumLocalSubscribersGetter,StreamRunner,PluginContextGetter)
// Package runstream is a generated GoMock package.
package runstream
@ -13,79 +13,79 @@ import (
models "github.com/grafana/grafana/pkg/models"
)
// MockChannelSender is a mock of ChannelSender interface.
type MockChannelSender struct {
// MockChannelLocalPublisher is a mock of ChannelLocalPublisher interface.
type MockChannelLocalPublisher struct {
ctrl *gomock.Controller
recorder *MockChannelSenderMockRecorder
recorder *MockChannelLocalPublisherMockRecorder
}
// MockChannelSenderMockRecorder is the mock recorder for MockChannelSender.
type MockChannelSenderMockRecorder struct {
mock *MockChannelSender
// MockChannelLocalPublisherMockRecorder is the mock recorder for MockChannelLocalPublisher.
type MockChannelLocalPublisherMockRecorder struct {
mock *MockChannelLocalPublisher
}
// NewMockChannelSender creates a new mock instance.
func NewMockChannelSender(ctrl *gomock.Controller) *MockChannelSender {
mock := &MockChannelSender{ctrl: ctrl}
mock.recorder = &MockChannelSenderMockRecorder{mock}
// NewMockChannelLocalPublisher creates a new mock instance.
func NewMockChannelLocalPublisher(ctrl *gomock.Controller) *MockChannelLocalPublisher {
mock := &MockChannelLocalPublisher{ctrl: ctrl}
mock.recorder = &MockChannelLocalPublisherMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockChannelSender) EXPECT() *MockChannelSenderMockRecorder {
func (m *MockChannelLocalPublisher) EXPECT() *MockChannelLocalPublisherMockRecorder {
return m.recorder
}
// Send mocks base method.
func (m *MockChannelSender) Send(arg0 string, arg1 []byte) error {
// PublishLocal mocks base method.
func (m *MockChannelLocalPublisher) PublishLocal(arg0 string, arg1 []byte) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Send", arg0, arg1)
ret := m.ctrl.Call(m, "PublishLocal", arg0, arg1)
ret0, _ := ret[0].(error)
return ret0
}
// Send indicates an expected call of Send.
func (mr *MockChannelSenderMockRecorder) Send(arg0, arg1 interface{}) *gomock.Call {
// PublishLocal indicates an expected call of PublishLocal.
func (mr *MockChannelLocalPublisherMockRecorder) PublishLocal(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Send", reflect.TypeOf((*MockChannelSender)(nil).Send), arg0, arg1)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PublishLocal", reflect.TypeOf((*MockChannelLocalPublisher)(nil).PublishLocal), arg0, arg1)
}
// MockPresenceGetter is a mock of PresenceGetter interface.
type MockPresenceGetter struct {
// MockNumLocalSubscribersGetter is a mock of NumLocalSubscribersGetter interface.
type MockNumLocalSubscribersGetter struct {
ctrl *gomock.Controller
recorder *MockPresenceGetterMockRecorder
recorder *MockNumLocalSubscribersGetterMockRecorder
}
// MockPresenceGetterMockRecorder is the mock recorder for MockPresenceGetter.
type MockPresenceGetterMockRecorder struct {
mock *MockPresenceGetter
// MockNumLocalSubscribersGetterMockRecorder is the mock recorder for MockNumLocalSubscribersGetter.
type MockNumLocalSubscribersGetterMockRecorder struct {
mock *MockNumLocalSubscribersGetter
}
// NewMockPresenceGetter creates a new mock instance.
func NewMockPresenceGetter(ctrl *gomock.Controller) *MockPresenceGetter {
mock := &MockPresenceGetter{ctrl: ctrl}
mock.recorder = &MockPresenceGetterMockRecorder{mock}
// NewMockNumLocalSubscribersGetter creates a new mock instance.
func NewMockNumLocalSubscribersGetter(ctrl *gomock.Controller) *MockNumLocalSubscribersGetter {
mock := &MockNumLocalSubscribersGetter{ctrl: ctrl}
mock.recorder = &MockNumLocalSubscribersGetterMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockPresenceGetter) EXPECT() *MockPresenceGetterMockRecorder {
func (m *MockNumLocalSubscribersGetter) EXPECT() *MockNumLocalSubscribersGetterMockRecorder {
return m.recorder
}
// GetNumSubscribers mocks base method.
func (m *MockPresenceGetter) GetNumSubscribers(arg0 string) (int, error) {
// GetNumLocalSubscribers mocks base method.
func (m *MockNumLocalSubscribersGetter) GetNumLocalSubscribers(arg0 string) (int, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetNumSubscribers", arg0)
ret := m.ctrl.Call(m, "GetNumLocalSubscribers", arg0)
ret0, _ := ret[0].(int)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// GetNumSubscribers indicates an expected call of GetNumSubscribers.
func (mr *MockPresenceGetterMockRecorder) GetNumSubscribers(arg0 interface{}) *gomock.Call {
// GetNumLocalSubscribers indicates an expected call of GetNumLocalSubscribers.
func (mr *MockNumLocalSubscribersGetterMockRecorder) GetNumLocalSubscribers(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetNumSubscribers", reflect.TypeOf((*MockPresenceGetter)(nil).GetNumSubscribers), arg0)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetNumLocalSubscribers", reflect.TypeOf((*MockNumLocalSubscribersGetter)(nil).GetNumLocalSubscribers), arg0)
}
// MockStreamRunner is a mock of StreamRunner interface.

View File

@ -0,0 +1,117 @@
package survey
import (
"context"
"encoding/json"
"errors"
"fmt"
"time"
"github.com/centrifugal/centrifuge"
"github.com/grafana/grafana/pkg/services/live/managedstream"
)
type Caller struct {
managedStreamRunner *managedstream.Runner
node *centrifuge.Node
}
const (
managedStreamsCall = "managed_streams"
)
func NewCaller(managedStreamRunner *managedstream.Runner, node *centrifuge.Node) *Caller {
return &Caller{managedStreamRunner: managedStreamRunner, node: node}
}
func (c *Caller) SetupHandlers() error {
c.node.OnSurvey(c.handleSurvey)
return nil
}
type NodeManagedChannelsRequest struct {
OrgID int64 `json:"orgId"`
}
type NodeManagedChannelsResponse struct {
Channels []*managedstream.ManagedChannel `json:"channels"`
}
func (c *Caller) handleSurvey(e centrifuge.SurveyEvent, cb centrifuge.SurveyCallback) {
var (
resp interface{}
err error
)
switch e.Op {
case managedStreamsCall:
resp, err = c.handleManagedStreams(e.Data)
default:
err = errors.New("method not found")
}
if err != nil {
cb(centrifuge.SurveyReply{Code: 1})
return
}
jsonData, err := json.Marshal(resp)
if err != nil {
cb(centrifuge.SurveyReply{Code: 1})
return
}
cb(centrifuge.SurveyReply{
Code: 0,
Data: jsonData,
})
}
func (c *Caller) handleManagedStreams(data []byte) (interface{}, error) {
var req NodeManagedChannelsRequest
err := json.Unmarshal(data, &req)
if err != nil {
return nil, err
}
channels, err := c.managedStreamRunner.GetManagedChannels(req.OrgID)
if err != nil {
return nil, err
}
return NodeManagedChannelsResponse{
Channels: channels,
}, nil
}
func (c *Caller) CallManagedStreams(orgID int64) ([]*managedstream.ManagedChannel, error) {
req := NodeManagedChannelsRequest{OrgID: orgID}
jsonData, err := json.Marshal(req)
if err != nil {
return nil, err
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
resp, err := c.node.Survey(ctx, managedStreamsCall, jsonData)
if err != nil {
return nil, err
}
channels := make([]*managedstream.ManagedChannel, 0)
duplicatesCheck := map[string]struct{}{}
for _, result := range resp {
if result.Code != 0 {
return nil, fmt.Errorf("unexpected survey code: %d", result.Code)
}
var res NodeManagedChannelsResponse
err := json.Unmarshal(result.Data, &res)
if err != nil {
return nil, err
}
for _, ch := range res.Channels {
if _, ok := duplicatesCheck[ch.Channel]; ok {
continue
}
channels = append(channels, ch)
duplicatesCheck[ch.Channel] = struct{}{}
}
}
return channels, nil
}
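// For illustration, the survey payloads serialize roughly as follows (field names come from
// the json struct tags above; the channel value mirrors one of the hardcoded sample streams,
// frame data elided):
//
//	request: {"orgId": 1}
//	reply:   {"channels": [{"channel": "plugin/testdata/random-2s-stream", "data": {...}}]}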

View File

@ -386,6 +386,11 @@ type Cfg struct {
// Grafana Live ws endpoint (per Grafana server instance). 0 disables
// Live, -1 means unlimited connections.
LiveMaxConnections int
// LiveHAEngine is the type of engine to use to achieve HA with Grafana Live.
// A zero value means an in-memory single-node setup.
LiveHAEngine string
// LiveHAEngineAddress is the connection address for the Live HA engine.
LiveHAEngineAddress string
// Grafana.com URL
GrafanaComURL string
@ -1447,5 +1452,12 @@ func (cfg *Cfg) readLiveSettings(iniFile *ini.File) error {
if cfg.LiveMaxConnections < -1 {
return fmt.Errorf("unexpected value %d for [live] max_connections", cfg.LiveMaxConnections)
}
cfg.LiveHAEngine = section.Key("ha_engine").MustString("")
switch cfg.LiveHAEngine {
case "", "redis":
default:
return fmt.Errorf("unsupported live HA engine type: %s", cfg.LiveHAEngine)
}
cfg.LiveHAEngineAddress = section.Key("ha_engine_address").MustString("127.0.0.1:6379")
return nil
}