Live: include a streaming event manager (#26537)

This commit is contained in:
Ryan McKinley
2020-07-27 00:26:16 -07:00
committed by GitHub
parent 3defb4441e
commit 339138d61a
23 changed files with 895 additions and 432 deletions

View File

@@ -56,6 +56,7 @@ func (hs *HTTPServer) registerRoutes() {
r.Get("/admin/orgs", reqGrafanaAdmin, hs.Index)
r.Get("/admin/orgs/edit/:id", reqGrafanaAdmin, hs.Index)
r.Get("/admin/stats", reqGrafanaAdmin, hs.Index)
r.Get("/admin/live", reqGrafanaAdmin, hs.Index)
r.Get("/admin/ldap", reqGrafanaAdmin, hs.Index)
r.Get("/styleguide", reqSignedIn, hs.Index)
@@ -422,11 +423,10 @@ func (hs *HTTPServer) registerRoutes() {
avatarCacheServer := avatar.NewCacheServer()
r.Get("/avatar/:hash", avatarCacheServer.Handler)
// Websocket
r.Any("/ws", hs.streamManager.Serve)
// streams
//r.Post("/api/streams/push", reqSignedIn, bind(dtos.StreamMessage{}), liveConn.PushToStream)
// Live streaming
if hs.Live != nil {
r.Any("/live/*", hs.Live.Handler)
}
// Snapshots
r.Post("/api/snapshots/", reqSnapshotPublicModeOrSignedIn, bind(models.CreateDashboardSnapshotCommand{}), CreateDashboardSnapshot)

View File

@@ -10,11 +10,11 @@ import (
"path"
"sync"
"github.com/grafana/grafana/pkg/services/live"
"github.com/grafana/grafana/pkg/services/search"
"github.com/grafana/grafana/pkg/plugins/backendplugin"
"github.com/grafana/grafana/pkg/api/live"
"github.com/grafana/grafana/pkg/api/routing"
httpstatic "github.com/grafana/grafana/pkg/api/static"
"github.com/grafana/grafana/pkg/bus"
@@ -48,12 +48,11 @@ func init() {
}
type HTTPServer struct {
log log.Logger
macaron *macaron.Macaron
context context.Context
streamManager *live.StreamManager
httpSrv *http.Server
middlewares []macaron.Handler
log log.Logger
macaron *macaron.Macaron
context context.Context
httpSrv *http.Server
middlewares []macaron.Handler
RouteRegister routing.RouteRegister `inject:""`
Bus bus.Bus `inject:""`
@@ -71,12 +70,25 @@ type HTTPServer struct {
BackendPluginManager backendplugin.Manager `inject:""`
PluginManager *plugins.PluginManager `inject:""`
SearchService *search.SearchService `inject:""`
Live *live.GrafanaLive
}
func (hs *HTTPServer) Init() error {
hs.log = log.New("http.server")
hs.streamManager = live.NewStreamManager()
// Set up a websocket broker
if hs.Cfg.IsLiveEnabled() { // feature flag
node, err := live.InitalizeBroker()
if err != nil {
return err
}
hs.Live = node
// Spit random walk to example
go live.RunRandomCSV(hs.Live, "random-2s-stream", 2000, 0)
go live.RunRandomCSV(hs.Live, "random-flakey-stream", 400, .6)
}
hs.macaron = hs.newMacaron()
hs.registerRoutes()
@@ -91,7 +103,6 @@ func (hs *HTTPServer) Run(ctx context.Context) error {
hs.context = ctx
hs.applyRoutes()
hs.streamManager.Run(ctx)
hs.httpSrv = &http.Server{
Addr: fmt.Sprintf("%s:%s", setting.HttpAddr, setting.HttpPort),

View File

@@ -342,6 +342,12 @@ func (hs *HTTPServer) setIndexViewData(c *models.ReqContext) (*dtos.IndexViewDat
{Text: "Stats", Id: "server-stats", Url: setting.AppSubUrl + "/admin/stats", Icon: "graph-bar"},
}
if hs.Live != nil {
adminNavLinks = append(adminNavLinks, &dtos.NavLink{
Text: "Live", Id: "live", Url: setting.AppSubUrl + "/admin/live", Icon: "water",
})
}
if setting.LDAPEnabled {
adminNavLinks = append(adminNavLinks, &dtos.NavLink{
Text: "LDAP", Id: "ldap", Url: setting.AppSubUrl + "/admin/ldap", Icon: "book",

View File

@@ -1,130 +0,0 @@
package live
import (
"net/http"
"time"
"github.com/gorilla/websocket"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/infra/log"
)
const (
// Time allowed to write a message to the peer.
writeWait = 10 * time.Second
// Time allowed to read the next pong message from the peer.
pongWait = 60 * time.Second
// Send pings to peer with this period. Must be less than pongWait.
pingPeriod = (pongWait * 9) / 10
// Maximum message size allowed from peer.
maxMessageSize = 512
)
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
return true
},
}
type connection struct {
hub *hub
ws *websocket.Conn
send chan []byte
log log.Logger
}
func newConnection(ws *websocket.Conn, hub *hub, logger log.Logger) *connection {
return &connection{
hub: hub,
send: make(chan []byte, 256),
ws: ws,
log: logger,
}
}
func (c *connection) readPump() {
defer func() {
c.hub.unregister <- c
c.ws.Close()
}()
c.ws.SetReadLimit(maxMessageSize)
if err := c.ws.SetReadDeadline(time.Now().Add(pongWait)); err != nil {
c.log.Warn("Setting read deadline failed", "err", err)
}
c.ws.SetPongHandler(func(string) error {
return c.ws.SetReadDeadline(time.Now().Add(pongWait))
})
for {
_, message, err := c.ws.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) {
c.log.Info("error", "err", err)
}
break
}
c.handleMessage(message)
}
}
func (c *connection) handleMessage(message []byte) {
json, err := simplejson.NewJson(message)
if err != nil {
log.Errorf(3, "Unreadable message on websocket channel. error: %v", err)
}
msgType := json.Get("action").MustString()
streamName := json.Get("stream").MustString()
if len(streamName) == 0 {
log.Errorf(3, "Not allowed to subscribe to empty stream name")
return
}
switch msgType {
case "subscribe":
c.hub.subChannel <- &streamSubscription{name: streamName, conn: c}
case "unsubscribe":
c.hub.subChannel <- &streamSubscription{name: streamName, conn: c, remove: true}
}
}
func (c *connection) write(mt int, payload []byte) error {
if err := c.ws.SetWriteDeadline(time.Now().Add(writeWait)); err != nil {
return err
}
return c.ws.WriteMessage(mt, payload)
}
// writePump pumps messages from the hub to the websocket connection.
func (c *connection) writePump() {
ticker := time.NewTicker(pingPeriod)
defer func() {
ticker.Stop()
c.ws.Close()
}()
for {
select {
case message, ok := <-c.send:
if !ok {
if err := c.write(websocket.CloseMessage, []byte{}); err != nil {
c.log.Warn("Failed to write close message to connection", "err", err)
}
return
}
if err := c.write(websocket.TextMessage, message); err != nil {
return
}
case <-ticker.C:
if err := c.write(websocket.PingMessage, []byte{}); err != nil {
return
}
}
}
}

View File

@@ -1,99 +0,0 @@
package live
import (
"context"
"github.com/grafana/grafana/pkg/api/dtos"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/infra/log"
)
type hub struct {
log log.Logger
connections map[*connection]bool
streams map[string]map[*connection]bool
register chan *connection
unregister chan *connection
streamChannel chan *dtos.StreamMessage
subChannel chan *streamSubscription
}
type streamSubscription struct {
conn *connection
name string
remove bool
}
func newHub() *hub {
return &hub{
connections: make(map[*connection]bool),
streams: make(map[string]map[*connection]bool),
register: make(chan *connection),
unregister: make(chan *connection),
streamChannel: make(chan *dtos.StreamMessage),
subChannel: make(chan *streamSubscription),
log: log.New("stream.hub"),
}
}
func (h *hub) run(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case c := <-h.register:
h.connections[c] = true
h.log.Info("New connection", "total", len(h.connections))
case c := <-h.unregister:
if _, ok := h.connections[c]; ok {
h.log.Info("Closing connection", "total", len(h.connections))
delete(h.connections, c)
close(c.send)
}
// hand stream subscriptions
case sub := <-h.subChannel:
h.log.Info("Subscribing", "channel", sub.name, "remove", sub.remove)
subscribers, exists := h.streams[sub.name]
// handle unsubscribe
if exists && sub.remove {
delete(subscribers, sub.conn)
continue
}
if !exists {
subscribers = make(map[*connection]bool)
h.streams[sub.name] = subscribers
}
subscribers[sub.conn] = true
// handle stream messages
case message := <-h.streamChannel:
subscribers, exists := h.streams[message.Stream]
if !exists || len(subscribers) == 0 {
h.log.Info("Message to stream without subscribers", "stream", message.Stream)
continue
}
messageBytes, _ := simplejson.NewFromAny(message).Encode()
for sub := range subscribers {
// check if channel is open
if _, ok := h.connections[sub]; !ok {
delete(subscribers, sub)
continue
}
select {
case sub.send <- messageBytes:
default:
close(sub.send)
delete(h.connections, sub)
delete(subscribers, sub)
}
}
}
}
}

View File

@@ -1,102 +0,0 @@
package live
import (
"context"
"net/http"
"sync"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/models"
)
type StreamManager struct {
log log.Logger
streams map[string]*Stream
streamRWMutex *sync.RWMutex
hub *hub
}
func NewStreamManager() *StreamManager {
return &StreamManager{
hub: newHub(),
log: log.New("stream.manager"),
streams: make(map[string]*Stream),
streamRWMutex: &sync.RWMutex{},
}
}
func (sm *StreamManager) Run(context context.Context) {
log.Debugf("Initializing Stream Manager")
go func() {
sm.hub.run(context)
log.Infof("Stopped Stream Manager")
}()
}
func (sm *StreamManager) Serve(w http.ResponseWriter, r *http.Request) {
sm.log.Info("Upgrading to WebSocket")
ws, err := upgrader.Upgrade(w, r, nil)
if err != nil {
sm.log.Error("Failed to upgrade connection to WebSocket", "error", err)
return
}
c := newConnection(ws, sm.hub, sm.log)
sm.hub.register <- c
go c.writePump()
c.readPump()
}
func (s *StreamManager) GetStreamList() models.StreamList {
list := make(models.StreamList, 0)
for _, stream := range s.streams {
list = append(list, &models.StreamInfo{
Name: stream.name,
})
}
return list
}
func (s *StreamManager) Push(packet *models.StreamPacket) {
stream, exist := s.streams[packet.Stream]
if !exist {
s.log.Info("Creating metric stream", "name", packet.Stream)
stream = NewStream(packet.Stream)
s.streams[stream.name] = stream
}
stream.Push(packet)
}
type Stream struct {
subscribers []*connection
name string
}
func NewStream(name string) *Stream {
return &Stream{
subscribers: make([]*connection, 0),
name: name,
}
}
func (s *Stream) Push(packet *models.StreamPacket) {
messageBytes, _ := simplejson.NewFromAny(packet).Encode()
for _, sub := range s.subscribers {
// check if channel is open
// if _, ok := h.connections[sub]; !ok {
// delete(s.subscribers, sub)
// continue
// }
sub.send <- messageBytes
}
}

View File

@@ -1,29 +0,0 @@
package models
import (
"github.com/grafana/grafana/pkg/components/null"
)
type TimePoint [2]null.Float
type TimeSeriesPoints []TimePoint
type StreamPacket struct {
Stream string `json:"stream"`
Series []StreamSeries `json:"series"`
}
type StreamSeries struct {
Name string `json:"name"`
Points TimeSeriesPoints `json:"points"`
}
type StreamInfo struct {
Name string
}
type StreamList []*StreamInfo
type StreamManager interface {
GetStreamList() StreamList
Push(data *StreamPacket)
}

View File

@@ -0,0 +1,54 @@
package live
import (
"encoding/json"
"math/rand"
"time"
)
// channelInfo holds metadata about each channel and is returned on connection.
// Eventually each plugin should control exactly what is in this structure.
type channelInfo struct {
Description string
}
type randomWalkMessage struct {
Time int64
Value float64
Min float64
Max float64
}
// RunRandomCSV just for an example
func RunRandomCSV(broker *GrafanaLive, channel string, speedMillis int, dropPercent float64) {
spread := 50.0
walker := rand.Float64() * 100
ticker := time.NewTicker(time.Duration(speedMillis) * time.Millisecond)
line := randomWalkMessage{}
for t := range ticker.C {
if rand.Float64() <= dropPercent {
continue //
}
delta := rand.Float64() - 0.5
walker += delta
line.Time = t.UnixNano() / int64(time.Millisecond)
line.Value = walker
line.Min = walker - ((rand.Float64() * spread) + 0.01)
line.Max = walker + ((rand.Float64() * spread) + 0.01)
bytes, err := json.Marshal(&line)
if err != nil {
logger.Warn("unable to marshal line", "error", err)
continue
}
v := broker.Publish(channel, bytes)
if !v {
logger.Warn("write", "channel", channel, "line", line, "ok", v)
}
}
}

192
pkg/services/live/live.go Normal file
View File

@@ -0,0 +1,192 @@
package live
import (
"encoding/json"
"fmt"
"strings"
"github.com/centrifugal/centrifuge"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/models"
)
var (
logger = log.New("live")
loggerCF = log.New("live.centrifuge")
)
// GrafanaLive pretends to be the server
type GrafanaLive struct {
node *centrifuge.Node
Handler interface{} // handler func
}
// InitalizeBroker initializes the broker and starts listening for requests.
func InitalizeBroker() (*GrafanaLive, error) {
// We use default config here as starting point. Default config contains
// reasonable values for available options.
cfg := centrifuge.DefaultConfig
// cfg.LogLevel = centrifuge.LogLevelDebug
cfg.LogHandler = handleLog
// Node is the core object in Centrifuge library responsible for many useful
// things. For example Node allows to publish messages to channels from server
// side with its Publish method, but in this example we will publish messages
// only from client side.
node, err := centrifuge.New(cfg)
if err != nil {
return nil, err
}
b := &GrafanaLive{
node: node,
}
// Set ConnectHandler called when client successfully connected to Node. Your code
// inside handler must be synchronized since it will be called concurrently from
// different goroutines (belonging to different client connections). This is also
// true for other event handlers.
node.OnConnect(func(c *centrifuge.Client) {
// In our example transport will always be Websocket but it can also be SockJS.
transportName := c.Transport().Name()
// In our example clients connect with JSON protocol but it can also be Protobuf.
transportEncoding := c.Transport().Encoding()
logger.Debug("client connected", "transport", transportName, "encoding", transportEncoding)
})
// Set SubscribeHandler to react on every channel subscription attempt
// initiated by client. Here you can theoretically return an error or
// disconnect client from server if needed. But now we just accept
// all subscriptions to all channels. In real life you may use a more
// complex permission check here.
node.OnSubscribe(func(c *centrifuge.Client, e centrifuge.SubscribeEvent) (centrifuge.SubscribeReply, error) {
info := &channelInfo{
Description: fmt.Sprintf("channel: %s", e.Channel),
}
bytes, err := json.Marshal(&info)
if err != nil {
return centrifuge.SubscribeReply{}, err
}
logger.Debug("client subscribes on channel", "channel", e.Channel, "info", string(bytes))
return centrifuge.SubscribeReply{
ExpireAt: 0, // does not expire
ChannelInfo: bytes,
}, nil
})
node.OnUnsubscribe(func(c *centrifuge.Client, e centrifuge.UnsubscribeEvent) {
s, err := node.PresenceStats(e.Channel)
if err != nil {
logger.Warn("unable to get presence stats", "channel", e.Channel, "error", err)
}
logger.Debug("unsubscribe from channel", "channel", e.Channel, "clients", s.NumClients, "users", s.NumUsers)
})
// By default, clients can not publish messages into channels. By setting
// PublishHandler we tell Centrifuge that publish from client side is possible.
// Now each time client calls publish method this handler will be called and
// you have a possibility to validate publication request before message will
// be published into channel and reach active subscribers. In our simple chat
// app we allow everyone to publish into any channel.
node.OnPublish(func(c *centrifuge.Client, e centrifuge.PublishEvent) (centrifuge.PublishReply, error) {
// logger.Debug("client publishes into channel", "channel", e.Channel, "body", string(e.Data))
// For now, broadcast any messages to everyone
_, err := node.Publish(e.Channel, e.Data)
return centrifuge.PublishReply{}, err // returns an error if it could not publish
})
// Set Disconnect handler to react on client disconnect events.
node.OnDisconnect(func(c *centrifuge.Client, e centrifuge.DisconnectEvent) {
logger.Info("client disconnected")
})
// Run node. This method does not block.
if err := node.Run(); err != nil {
return nil, err
}
// SockJS will find the best protocol possible for the browser
sockJsPrefix := "/live/sockjs"
sockjsHandler := centrifuge.NewSockjsHandler(node, centrifuge.SockjsConfig{
HandlerPrefix: sockJsPrefix,
WebsocketReadBufferSize: 1024,
WebsocketWriteBufferSize: 1024,
})
// Use a direct websocket from go clients
wsHandler := centrifuge.NewWebsocketHandler(node, centrifuge.WebsocketConfig{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
})
b.Handler = func(ctx *models.ReqContext) {
// Put authentication Credentials into request Context. Since we don't
// have any session backend here we simply set user ID as empty string.
// Users with empty ID called anonymous users, in real app you should
// decide whether anonymous users allowed to connect to your server
// or not. There is also another way to set Credentials - returning them
// from ConnectingHandler which is called after client sent first command
// to server called Connect. See _examples folder in repo to find real-life
// auth samples (OAuth2, Gin sessions, JWT etc).
cred := &centrifuge.Credentials{
UserID: "",
}
newCtx := centrifuge.SetCredentials(ctx.Req.Context(), cred)
path := ctx.Req.URL.Path
logger.Debug("Handle", "path", path)
r := ctx.Req.Request
r = r.WithContext(newCtx) // Set a user ID
// Check if this is a direct websocket connection
if strings.Contains(path, "live/ws") {
wsHandler.ServeHTTP(ctx.Resp, r)
return
}
if strings.Contains(path, sockJsPrefix) {
sockjsHandler.ServeHTTP(ctx.Resp, r)
return
}
// Unknown path
ctx.Resp.WriteHeader(404)
}
return b, nil
}
// Publish sends the data to the channel
func (b *GrafanaLive) Publish(channel string, data []byte) bool {
_, err := b.node.Publish(channel, data)
if err != nil {
logger.Warn("error writing to channel", "channel", channel, "err", err)
}
return err == nil
}
// Write to the standard log15 logger
func handleLog(msg centrifuge.LogEntry) {
arr := make([]interface{}, 0)
for k, v := range msg.Fields {
if v == nil {
v = "<nil>"
} else if v == "" {
v = "<empty>"
}
arr = append(arr, k, v)
}
switch msg.Level {
case centrifuge.LogLevelDebug:
loggerCF.Debug(msg.Message, arr...)
case centrifuge.LogLevelError:
loggerCF.Error(msg.Message, arr...)
case centrifuge.LogLevelInfo:
loggerCF.Info(msg.Message, arr...)
}
}

View File

@@ -313,6 +313,11 @@ func (c Cfg) IsStandaloneAlertsEnabled() bool {
return c.FeatureToggles["standaloneAlerts"]
}
// IsLiveEnabled returns if grafana live should be enabled
func (c Cfg) IsLiveEnabled() bool {
return c.FeatureToggles["live"]
}
type CommandLineArgs struct {
Config string
HomePath string