2020-07-27 02:26:16 -05:00
|
|
|
package live
|
|
|
|
|
|
|
|
import (
|
|
|
|
"encoding/json"
|
|
|
|
"fmt"
|
|
|
|
"strings"
|
2020-10-01 12:46:14 -05:00
|
|
|
"sync"
|
2020-07-27 02:26:16 -05:00
|
|
|
|
|
|
|
"github.com/centrifugal/centrifuge"
|
|
|
|
"github.com/grafana/grafana/pkg/infra/log"
|
|
|
|
"github.com/grafana/grafana/pkg/models"
|
2020-10-01 12:46:14 -05:00
|
|
|
"github.com/grafana/grafana/pkg/plugins"
|
|
|
|
"github.com/grafana/grafana/pkg/services/live/features"
|
2020-07-27 02:26:16 -05:00
|
|
|
)
|
|
|
|
|
|
|
|
var (
|
|
|
|
logger = log.New("live")
|
|
|
|
loggerCF = log.New("live.centrifuge")
|
|
|
|
)
|
|
|
|
|
2020-10-01 12:46:14 -05:00
|
|
|
// CoreGrafanaScope list of core features
|
|
|
|
type CoreGrafanaScope struct {
|
2020-10-22 02:10:26 -05:00
|
|
|
Features map[string]models.ChannelHandlerFactory
|
2020-10-01 12:46:14 -05:00
|
|
|
|
|
|
|
// The generic service to advertise dashboard changes
|
|
|
|
Dashboards models.DashboardActivityChannel
|
|
|
|
}
|
|
|
|
|
2020-07-27 02:26:16 -05:00
|
|
|
// GrafanaLive pretends to be the server
|
|
|
|
type GrafanaLive struct {
|
2020-10-01 12:46:14 -05:00
|
|
|
node *centrifuge.Node
|
|
|
|
|
|
|
|
// The websocket handler
|
2020-10-05 01:53:52 -05:00
|
|
|
WebsocketHandler interface{}
|
2020-10-01 12:46:14 -05:00
|
|
|
|
|
|
|
// Full channel handler
|
|
|
|
channels map[string]models.ChannelHandler
|
|
|
|
channelsMu sync.RWMutex
|
|
|
|
|
|
|
|
// The core internal features
|
|
|
|
GrafanaScope CoreGrafanaScope
|
2020-07-27 02:26:16 -05:00
|
|
|
}
|
|
|
|
|
2020-10-01 12:46:14 -05:00
|
|
|
// InitializeBroker initializes the broker and starts listening for requests.
|
|
|
|
func InitializeBroker() (*GrafanaLive, error) {
|
|
|
|
glive := &GrafanaLive{
|
|
|
|
channels: make(map[string]models.ChannelHandler),
|
|
|
|
channelsMu: sync.RWMutex{},
|
|
|
|
GrafanaScope: CoreGrafanaScope{
|
2020-10-22 02:10:26 -05:00
|
|
|
Features: make(map[string]models.ChannelHandlerFactory),
|
2020-10-01 12:46:14 -05:00
|
|
|
},
|
|
|
|
}
|
|
|
|
|
2020-07-27 02:26:16 -05:00
|
|
|
// We use default config here as starting point. Default config contains
|
|
|
|
// reasonable values for available options.
|
|
|
|
cfg := centrifuge.DefaultConfig
|
|
|
|
|
|
|
|
// cfg.LogLevel = centrifuge.LogLevelDebug
|
|
|
|
cfg.LogHandler = handleLog
|
|
|
|
|
2020-10-01 12:46:14 -05:00
|
|
|
// This function is called fast and often -- it must be sychronized
|
|
|
|
cfg.ChannelOptionsFunc = func(channel string) (centrifuge.ChannelOptions, bool, error) {
|
|
|
|
handler, err := glive.GetChannelHandler(channel)
|
|
|
|
if err != nil {
|
|
|
|
logger.Error("ChannelOptionsFunc", "channel", channel, "err", err)
|
|
|
|
if err.Error() == "404" { // ????
|
|
|
|
return centrifuge.ChannelOptions{}, false, nil
|
|
|
|
}
|
|
|
|
return centrifuge.ChannelOptions{}, true, err
|
|
|
|
}
|
|
|
|
opts := handler.GetChannelOptions(channel)
|
|
|
|
return opts, true, nil
|
|
|
|
}
|
|
|
|
|
2020-07-27 02:26:16 -05:00
|
|
|
// Node is the core object in Centrifuge library responsible for many useful
|
|
|
|
// things. For example Node allows to publish messages to channels from server
|
|
|
|
// side with its Publish method, but in this example we will publish messages
|
|
|
|
// only from client side.
|
|
|
|
node, err := centrifuge.New(cfg)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2020-10-01 12:46:14 -05:00
|
|
|
glive.node = node
|
2020-07-27 02:26:16 -05:00
|
|
|
|
2020-10-01 12:46:14 -05:00
|
|
|
// Initialize the main features
|
2020-10-22 02:10:26 -05:00
|
|
|
dash := &features.DashboardHandler{
|
|
|
|
Publisher: glive.Publish,
|
|
|
|
}
|
2020-10-01 12:46:14 -05:00
|
|
|
|
2020-10-22 02:10:26 -05:00
|
|
|
glive.GrafanaScope.Dashboards = dash
|
|
|
|
glive.GrafanaScope.Features["dashboard"] = dash
|
|
|
|
glive.GrafanaScope.Features["testdata"] = &features.TestDataSupplier{
|
|
|
|
Publisher: glive.Publish,
|
|
|
|
}
|
2020-10-01 12:46:14 -05:00
|
|
|
glive.GrafanaScope.Features["broadcast"] = &features.BroadcastRunner{}
|
2020-10-22 02:10:26 -05:00
|
|
|
glive.GrafanaScope.Features["measurements"] = &features.MeasurementsRunner{}
|
2020-07-27 02:26:16 -05:00
|
|
|
|
|
|
|
// Set ConnectHandler called when client successfully connected to Node. Your code
|
|
|
|
// inside handler must be synchronized since it will be called concurrently from
|
|
|
|
// different goroutines (belonging to different client connections). This is also
|
|
|
|
// true for other event handlers.
|
|
|
|
node.OnConnect(func(c *centrifuge.Client) {
|
|
|
|
// In our example transport will always be Websocket but it can also be SockJS.
|
|
|
|
transportName := c.Transport().Name()
|
|
|
|
|
|
|
|
// In our example clients connect with JSON protocol but it can also be Protobuf.
|
|
|
|
transportEncoding := c.Transport().Encoding()
|
|
|
|
logger.Debug("client connected", "transport", transportName, "encoding", transportEncoding)
|
|
|
|
})
|
|
|
|
|
2020-10-01 12:46:14 -05:00
|
|
|
// Set Disconnect handler to react on client disconnect events.
|
|
|
|
node.OnDisconnect(func(c *centrifuge.Client, e centrifuge.DisconnectEvent) {
|
|
|
|
logger.Info("client disconnected")
|
|
|
|
})
|
|
|
|
|
2020-07-27 02:26:16 -05:00
|
|
|
// Set SubscribeHandler to react on every channel subscription attempt
|
|
|
|
// initiated by client. Here you can theoretically return an error or
|
|
|
|
// disconnect client from server if needed. But now we just accept
|
|
|
|
// all subscriptions to all channels. In real life you may use a more
|
|
|
|
// complex permission check here.
|
|
|
|
node.OnSubscribe(func(c *centrifuge.Client, e centrifuge.SubscribeEvent) (centrifuge.SubscribeReply, error) {
|
2020-10-01 12:46:14 -05:00
|
|
|
reply := centrifuge.SubscribeReply{}
|
|
|
|
|
|
|
|
handler, err := glive.GetChannelHandler(e.Channel)
|
|
|
|
if err != nil {
|
|
|
|
return reply, err
|
2020-07-27 02:26:16 -05:00
|
|
|
}
|
2020-10-01 12:46:14 -05:00
|
|
|
|
|
|
|
err = handler.OnSubscribe(c, e)
|
2020-07-27 02:26:16 -05:00
|
|
|
if err != nil {
|
2020-10-01 12:46:14 -05:00
|
|
|
return reply, err
|
2020-07-27 02:26:16 -05:00
|
|
|
}
|
|
|
|
|
2020-10-01 12:46:14 -05:00
|
|
|
return reply, nil
|
2020-07-27 02:26:16 -05:00
|
|
|
})
|
|
|
|
|
|
|
|
node.OnUnsubscribe(func(c *centrifuge.Client, e centrifuge.UnsubscribeEvent) {
|
2020-10-01 12:46:14 -05:00
|
|
|
logger.Debug("unsubscribe from channel", "channel", e.Channel, "user", c.UserID())
|
2020-07-27 02:26:16 -05:00
|
|
|
})
|
|
|
|
|
2020-10-01 12:46:14 -05:00
|
|
|
// Called when something is written to the websocket
|
2020-07-27 02:26:16 -05:00
|
|
|
node.OnPublish(func(c *centrifuge.Client, e centrifuge.PublishEvent) (centrifuge.PublishReply, error) {
|
2020-10-01 12:46:14 -05:00
|
|
|
reply := centrifuge.PublishReply{}
|
|
|
|
handler, err := glive.GetChannelHandler(e.Channel)
|
|
|
|
if err != nil {
|
|
|
|
return reply, err
|
|
|
|
}
|
2020-07-27 02:26:16 -05:00
|
|
|
|
2020-10-01 12:46:14 -05:00
|
|
|
data, err := handler.OnPublish(c, e)
|
|
|
|
if err != nil {
|
|
|
|
return reply, err
|
|
|
|
}
|
|
|
|
if len(data) > 0 {
|
|
|
|
_, err = node.Publish(e.Channel, e.Data)
|
|
|
|
}
|
2020-07-27 02:26:16 -05:00
|
|
|
return centrifuge.PublishReply{}, err // returns an error if it could not publish
|
|
|
|
})
|
|
|
|
|
|
|
|
// Run node. This method does not block.
|
|
|
|
if err := node.Run(); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
// SockJS will find the best protocol possible for the browser
|
|
|
|
sockJsPrefix := "/live/sockjs"
|
|
|
|
sockjsHandler := centrifuge.NewSockjsHandler(node, centrifuge.SockjsConfig{
|
|
|
|
HandlerPrefix: sockJsPrefix,
|
|
|
|
WebsocketReadBufferSize: 1024,
|
|
|
|
WebsocketWriteBufferSize: 1024,
|
|
|
|
})
|
|
|
|
|
|
|
|
// Use a direct websocket from go clients
|
|
|
|
wsHandler := centrifuge.NewWebsocketHandler(node, centrifuge.WebsocketConfig{
|
|
|
|
ReadBufferSize: 1024,
|
|
|
|
WriteBufferSize: 1024,
|
|
|
|
})
|
|
|
|
|
2020-10-05 01:53:52 -05:00
|
|
|
glive.WebsocketHandler = func(ctx *models.ReqContext) {
|
2020-09-15 02:01:14 -05:00
|
|
|
user := ctx.SignedInUser
|
|
|
|
if user == nil {
|
|
|
|
ctx.Resp.WriteHeader(401)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
dto := models.UserProfileDTO{
|
|
|
|
Id: user.UserId,
|
|
|
|
Name: user.Name,
|
|
|
|
Email: user.Email,
|
|
|
|
Login: user.Login,
|
|
|
|
IsGrafanaAdmin: user.IsGrafanaAdmin,
|
|
|
|
OrgId: user.OrgId,
|
|
|
|
}
|
|
|
|
|
|
|
|
jsonData, err := json.Marshal(dto)
|
|
|
|
if err != nil {
|
|
|
|
logger.Debug("error reading user", "dto", dto)
|
|
|
|
ctx.Resp.WriteHeader(404)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
logger.Info("Logged in user", "user", user)
|
|
|
|
|
2020-07-27 02:26:16 -05:00
|
|
|
cred := ¢rifuge.Credentials{
|
2020-09-15 02:01:14 -05:00
|
|
|
UserID: fmt.Sprintf("%d", user.UserId),
|
|
|
|
Info: jsonData,
|
2020-07-27 02:26:16 -05:00
|
|
|
}
|
|
|
|
newCtx := centrifuge.SetCredentials(ctx.Req.Context(), cred)
|
|
|
|
|
|
|
|
r := ctx.Req.Request
|
|
|
|
r = r.WithContext(newCtx) // Set a user ID
|
|
|
|
|
|
|
|
// Check if this is a direct websocket connection
|
2020-09-15 02:01:14 -05:00
|
|
|
path := ctx.Req.URL.Path
|
2020-07-27 02:26:16 -05:00
|
|
|
if strings.Contains(path, "live/ws") {
|
|
|
|
wsHandler.ServeHTTP(ctx.Resp, r)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
if strings.Contains(path, sockJsPrefix) {
|
|
|
|
sockjsHandler.ServeHTTP(ctx.Resp, r)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// Unknown path
|
|
|
|
ctx.Resp.WriteHeader(404)
|
|
|
|
}
|
2020-10-01 12:46:14 -05:00
|
|
|
return glive, nil
|
2020-07-27 02:26:16 -05:00
|
|
|
}
|
|
|
|
|
2020-10-01 12:46:14 -05:00
|
|
|
// GetChannelHandler gives threadsafe access to the channel
|
|
|
|
func (g *GrafanaLive) GetChannelHandler(channel string) (models.ChannelHandler, error) {
|
|
|
|
g.channelsMu.RLock()
|
|
|
|
c, ok := g.channels[channel]
|
|
|
|
g.channelsMu.RUnlock() // defer? but then you can't lock further down
|
|
|
|
if ok {
|
|
|
|
return c, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Parse the identifier ${scope}/${namespace}/${path}
|
2020-10-22 02:10:26 -05:00
|
|
|
addr := ParseChannelAddress(channel)
|
|
|
|
if !addr.IsValid() {
|
|
|
|
return nil, fmt.Errorf("invalid channel: %q", channel)
|
2020-10-01 12:46:14 -05:00
|
|
|
}
|
2020-10-22 02:10:26 -05:00
|
|
|
logger.Info("initChannel", "channel", channel, "address", addr)
|
2020-10-01 12:46:14 -05:00
|
|
|
|
|
|
|
g.channelsMu.Lock()
|
|
|
|
defer g.channelsMu.Unlock()
|
|
|
|
c, ok = g.channels[channel] // may have filled in while locked
|
|
|
|
if ok {
|
|
|
|
return c, nil
|
2020-07-27 02:26:16 -05:00
|
|
|
}
|
2020-10-01 12:46:14 -05:00
|
|
|
|
2020-10-22 02:10:26 -05:00
|
|
|
getter, err := g.GetChannelHandlerFactory(addr.Scope, addr.Namespace)
|
2020-10-01 12:46:14 -05:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2020-10-22 02:10:26 -05:00
|
|
|
|
|
|
|
// First access will initialize
|
|
|
|
c, err = getter.GetHandlerForPath(addr.Path)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2020-10-01 12:46:14 -05:00
|
|
|
g.channels[channel] = c
|
|
|
|
return c, nil
|
|
|
|
}
|
|
|
|
|
2020-10-22 02:10:26 -05:00
|
|
|
// GetChannelHandlerFactory gets a ChannelHandlerFactory for a namespace.
|
|
|
|
// It gives threadsafe access to the channel.
|
|
|
|
func (g *GrafanaLive) GetChannelHandlerFactory(scope string, name string) (models.ChannelHandlerFactory, error) {
|
|
|
|
if scope == "grafana" {
|
|
|
|
p, ok := g.GrafanaScope.Features[name]
|
2020-10-01 12:46:14 -05:00
|
|
|
if ok {
|
2020-10-22 02:10:26 -05:00
|
|
|
return p, nil
|
2020-10-01 12:46:14 -05:00
|
|
|
}
|
2020-10-22 02:10:26 -05:00
|
|
|
return nil, fmt.Errorf("unknown feature: %q", name)
|
2020-10-01 12:46:14 -05:00
|
|
|
}
|
|
|
|
|
2020-10-22 02:10:26 -05:00
|
|
|
if scope == "ds" {
|
|
|
|
return nil, fmt.Errorf("todo... look up datasource: %q", name)
|
2020-10-01 12:46:14 -05:00
|
|
|
}
|
|
|
|
|
2020-10-22 02:10:26 -05:00
|
|
|
if scope == "plugin" {
|
|
|
|
p, ok := plugins.Plugins[name]
|
2020-10-01 12:46:14 -05:00
|
|
|
if ok {
|
|
|
|
h := &PluginHandler{
|
|
|
|
Plugin: p,
|
|
|
|
}
|
2020-10-22 02:10:26 -05:00
|
|
|
return h, nil
|
2020-10-01 12:46:14 -05:00
|
|
|
}
|
2020-10-22 02:10:26 -05:00
|
|
|
return nil, fmt.Errorf("unknown plugin: %q", name)
|
2020-10-01 12:46:14 -05:00
|
|
|
}
|
|
|
|
|
2020-10-22 02:10:26 -05:00
|
|
|
return nil, fmt.Errorf("invalid scope: %q", scope)
|
2020-10-01 12:46:14 -05:00
|
|
|
}
|
|
|
|
|
|
|
|
// Publish sends the data to the channel without checking permissions etc
|
|
|
|
func (g *GrafanaLive) Publish(channel string, data []byte) error {
|
|
|
|
_, err := g.node.Publish(channel, data)
|
|
|
|
return err
|
2020-07-27 02:26:16 -05:00
|
|
|
}
|
|
|
|
|
|
|
|
// Write to the standard log15 logger
|
|
|
|
func handleLog(msg centrifuge.LogEntry) {
|
|
|
|
arr := make([]interface{}, 0)
|
|
|
|
for k, v := range msg.Fields {
|
|
|
|
if v == nil {
|
|
|
|
v = "<nil>"
|
|
|
|
} else if v == "" {
|
|
|
|
v = "<empty>"
|
|
|
|
}
|
|
|
|
arr = append(arr, k, v)
|
|
|
|
}
|
|
|
|
|
|
|
|
switch msg.Level {
|
|
|
|
case centrifuge.LogLevelDebug:
|
|
|
|
loggerCF.Debug(msg.Message, arr...)
|
|
|
|
case centrifuge.LogLevelError:
|
|
|
|
loggerCF.Error(msg.Message, arr...)
|
|
|
|
case centrifuge.LogLevelInfo:
|
|
|
|
loggerCF.Info(msg.Message, arr...)
|
|
|
|
}
|
|
|
|
}
|