Live: pure websocket push endpoint (#33339)

This commit is contained in:
Alexander Emelin 2021-04-26 13:17:49 +03:00 committed by GitHub
parent 7f53dfad88
commit 7501a2deb6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 274 additions and 33 deletions

1
go.mod
View File

@ -44,6 +44,7 @@ require (
github.com/golang/mock v1.5.0
github.com/google/go-cmp v0.5.5
github.com/google/uuid v1.2.0
github.com/gorilla/websocket v1.4.2
github.com/gosimple/slug v1.9.0
github.com/grafana/grafana-aws-sdk v0.4.0
github.com/grafana/grafana-live-sdk v0.0.5

View File

@ -36,7 +36,7 @@ import (
"github.com/grafana/grafana/pkg/services/hooks"
"github.com/grafana/grafana/pkg/services/librarypanels"
"github.com/grafana/grafana/pkg/services/live"
"github.com/grafana/grafana/pkg/services/live/push"
"github.com/grafana/grafana/pkg/services/live/pushhttp"
"github.com/grafana/grafana/pkg/services/login"
"github.com/grafana/grafana/pkg/services/provisioning"
"github.com/grafana/grafana/pkg/services/quota"
@ -90,7 +90,7 @@ type HTTPServer struct {
SearchService *search.SearchService `inject:""`
ShortURLService *shorturls.ShortURLService `inject:""`
Live *live.GrafanaLive `inject:""`
LivePushGateway *push.Gateway `inject:""`
LivePushGateway *pushhttp.Gateway `inject:""`
ContextHandler *contexthandler.ContextHandler `inject:""`
SQLStore *sqlstore.SQLStore `inject:""`
LibraryPanelService *librarypanels.LibraryPanelService `inject:""`

View File

@ -1,23 +1,30 @@
package live
package demultiplexer
import (
"context"
"errors"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/services/live/convert"
"github.com/grafana/grafana/pkg/services/live/livecontext"
"github.com/grafana/grafana/pkg/services/live/managedstream"
"github.com/grafana/grafana/pkg/services/live/pushurl"
"github.com/grafana/grafana-plugin-sdk-go/backend"
)
var (
logger = log.New("live.push_ws")
)
type Demultiplexer struct {
streamID string
managedStreamRunner *ManagedStreamRunner
managedStreamRunner *managedstream.Runner
converter *convert.Converter
}
func NewDemultiplexer(streamID string, managedStreamRunner *ManagedStreamRunner) *Demultiplexer {
func New(streamID string, managedStreamRunner *managedstream.Runner) *Demultiplexer {
return &Demultiplexer{
streamID: streamID,
managedStreamRunner: managedStreamRunner,
@ -34,7 +41,7 @@ func (s *Demultiplexer) OnSubscribe(_ context.Context, _ *models.SignedInUser, _
}
func (s *Demultiplexer) OnPublish(ctx context.Context, _ *models.SignedInUser, evt models.PublishEvent) (models.PublishReply, backend.PublishStreamStatus, error) {
urlValues, ok := getContextValues(ctx)
urlValues, ok := livecontext.GetContextValues(ctx)
if !ok {
return models.PublishReply{}, 0, errors.New("error extracting context url values")
}

View File

@ -17,7 +17,11 @@ import (
"github.com/grafana/grafana/pkg/plugins/plugincontext"
"github.com/grafana/grafana/pkg/registry"
"github.com/grafana/grafana/pkg/services/datasources"
"github.com/grafana/grafana/pkg/services/live/demultiplexer"
"github.com/grafana/grafana/pkg/services/live/features"
"github.com/grafana/grafana/pkg/services/live/livecontext"
"github.com/grafana/grafana/pkg/services/live/managedstream"
"github.com/grafana/grafana/pkg/services/live/pushws"
"github.com/grafana/grafana/pkg/services/live/runstream"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/tsdb/cloudwatch"
@ -63,8 +67,9 @@ type GrafanaLive struct {
node *centrifuge.Node
// The websocket handler
WebsocketHandler interface{}
// Websocket handlers
websocketHandler interface{}
pushWebsocketHandler interface{}
// Full channel handler
channels map[string]models.ChannelHandler
@ -73,7 +78,7 @@ type GrafanaLive struct {
// The core internal features
GrafanaScope CoreGrafanaScope
ManagedStreamRunner *ManagedStreamRunner
ManagedStreamRunner *managedstream.Runner
contextGetter *pluginContextGetter
runStreamManager *runstream.Manager
@ -140,7 +145,7 @@ func (g *GrafanaLive) Init() error {
g.GrafanaScope.Features["dashboard"] = dash
g.GrafanaScope.Features["broadcast"] = &features.BroadcastRunner{}
g.ManagedStreamRunner = NewManagedStreamRunner(g.Publish)
g.ManagedStreamRunner = managedstream.NewRunner(g.Publish)
// Set ConnectHandler called when client successfully connected to Node. Your code
// inside handler must be synchronized since it will be called concurrently from
@ -152,7 +157,7 @@ func (g *GrafanaLive) Init() error {
client.OnSubscribe(func(e centrifuge.SubscribeEvent, cb centrifuge.SubscribeCallback) {
logger.Debug("Client wants to subscribe", "user", client.UserID(), "client", client.ID(), "channel", e.Channel)
user, ok := getContextSignedUser(client.Context())
user, ok := livecontext.GetContextSignedUser(client.Context())
if !ok {
logger.Error("Unauthenticated live connection")
cb(centrifuge.SubscribeReply{}, centrifuge.ErrorInternal)
@ -196,7 +201,7 @@ func (g *GrafanaLive) Init() error {
// allows some simple prototypes to work quickly.
client.OnPublish(func(e centrifuge.PublishEvent, cb centrifuge.PublishCallback) {
logger.Debug("Client wants to publish", "user", client.UserID(), "client", client.ID(), "channel", e.Channel)
user, ok := getContextSignedUser(client.Context())
user, ok := livecontext.GetContextSignedUser(client.Context())
if !ok {
logger.Error("Unauthenticated live connection")
cb(centrifuge.PublishReply{}, centrifuge.ErrorInternal)
@ -260,7 +265,12 @@ func (g *GrafanaLive) Init() error {
WriteBufferSize: 1024,
})
g.WebsocketHandler = func(ctx *models.ReqContext) {
pushWSHandler := pushws.NewHandler(g.ManagedStreamRunner, pushws.Config{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
})
g.websocketHandler = func(ctx *models.ReqContext) {
user := ctx.SignedInUser
if user == nil {
ctx.Resp.WriteHeader(401)
@ -272,8 +282,8 @@ func (g *GrafanaLive) Init() error {
UserID: fmt.Sprintf("%d", user.UserId),
}
newCtx := centrifuge.SetCredentials(ctx.Req.Context(), cred)
newCtx = setContextSignedUser(newCtx, user)
newCtx = setContextValues(newCtx, ctx.Req.URL.Query())
newCtx = livecontext.SetContextSignedUser(newCtx, user)
newCtx = livecontext.SetContextValues(newCtx, ctx.Req.URL.Query())
r := ctx.Req.Request
r = r.WithContext(newCtx) // Set a user ID.
@ -281,8 +291,29 @@ func (g *GrafanaLive) Init() error {
wsHandler.ServeHTTP(ctx.Resp, r)
}
g.RouteRegister.Get("/live/ws", g.WebsocketHandler)
g.pushWebsocketHandler = func(ctx *models.ReqContext) {
user := ctx.SignedInUser
if user == nil {
ctx.Resp.WriteHeader(401)
return
}
// Centrifuge expects Credentials in context with a current user ID.
cred := &centrifuge.Credentials{
UserID: fmt.Sprintf("%d", user.UserId),
}
newCtx := centrifuge.SetCredentials(ctx.Req.Context(), cred)
newCtx = livecontext.SetContextSignedUser(newCtx, user)
newCtx = livecontext.SetContextValues(newCtx, ctx.Req.URL.Query())
r := ctx.Req.Request
r = r.WithContext(newCtx) // Set a user ID.
pushWSHandler.ServeHTTP(ctx.Resp, r)
}
g.RouteRegister.Get("/live/ws", g.websocketHandler)
g.RouteRegister.Get("/live/push", g.pushWebsocketHandler)
return nil
}
@ -402,7 +433,7 @@ func (g *GrafanaLive) handleStreamScope(_ *models.SignedInUser, namespace string
}
func (g *GrafanaLive) handlePushScope(_ *models.SignedInUser, namespace string) (models.ChannelHandlerFactory, error) {
return NewDemultiplexer(namespace, g.ManagedStreamRunner), nil
return demultiplexer.New(namespace, g.ManagedStreamRunner), nil
}
func (g *GrafanaLive) handleDatasourceScope(user *models.SignedInUser, namespace string) (models.ChannelHandlerFactory, error) {

View File

@ -1,4 +1,4 @@
package live
package livecontext
import (
"context"
@ -11,12 +11,12 @@ type signedUserContextKeyType int
var signedUserContextKey signedUserContextKeyType
func setContextSignedUser(ctx context.Context, user *models.SignedInUser) context.Context {
func SetContextSignedUser(ctx context.Context, user *models.SignedInUser) context.Context {
ctx = context.WithValue(ctx, signedUserContextKey, user)
return ctx
}
func getContextSignedUser(ctx context.Context) (*models.SignedInUser, bool) {
func GetContextSignedUser(ctx context.Context) (*models.SignedInUser, bool) {
if val := ctx.Value(signedUserContextKey); val != nil {
user, ok := val.(*models.SignedInUser)
return user, ok
@ -26,12 +26,12 @@ func getContextSignedUser(ctx context.Context) (*models.SignedInUser, bool) {
type valuesContextKey struct{}
func setContextValues(ctx context.Context, values url.Values) context.Context {
func SetContextValues(ctx context.Context, values url.Values) context.Context {
ctx = context.WithValue(ctx, valuesContextKey{}, values)
return ctx
}
func getContextValues(ctx context.Context) (url.Values, bool) {
func GetContextValues(ctx context.Context) (url.Values, bool) {
if val := ctx.Value(valuesContextKey{}); val != nil {
values, ok := val.(url.Values)
return values, ok

View File

@ -1,4 +1,4 @@
package live
package managedstream
import (
"context"
@ -9,27 +9,32 @@ import (
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana-plugin-sdk-go/live"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/util"
)
// ManagedStreamRunner keeps ManagedStream per streamID.
type ManagedStreamRunner struct {
var (
logger = log.New("live.managed_stream")
)
// Runner keeps ManagedStream per streamID.
type Runner struct {
mu sync.RWMutex
streams map[string]*ManagedStream
publisher models.ChannelPublisher
}
// NewManagedStreamRunner creates new ManagedStreamRunner.
func NewManagedStreamRunner(publisher models.ChannelPublisher) *ManagedStreamRunner {
return &ManagedStreamRunner{
// NewRunner creates new Runner.
func NewRunner(publisher models.ChannelPublisher) *Runner {
return &Runner{
publisher: publisher,
streams: map[string]*ManagedStream{},
}
}
// Streams returns a map of active managed streams (per streamID).
func (r *ManagedStreamRunner) Streams() map[string]*ManagedStream {
func (r *Runner) Streams() map[string]*ManagedStream {
r.mu.RLock()
defer r.mu.RUnlock()
streams := make(map[string]*ManagedStream, len(r.streams))
@ -41,7 +46,7 @@ func (r *ManagedStreamRunner) Streams() map[string]*ManagedStream {
// GetOrCreateStream -- for now this will create new manager for each key.
// Eventually, the stream behavior will need to be configured explicitly
func (r *ManagedStreamRunner) GetOrCreateStream(streamID string) (*ManagedStream, error) {
func (r *Runner) GetOrCreateStream(streamID string) (*ManagedStream, error) {
r.mu.Lock()
defer r.mu.Unlock()
s, ok := r.streams[streamID]

View File

@ -1,4 +1,4 @@
package live
package managedstream
import (
"testing"

View File

@ -1,4 +1,4 @@
package push
package pushhttp
import (
"context"
@ -15,7 +15,7 @@ import (
)
var (
logger = log.New("live_push")
logger = log.New("live.push_http")
)
func init() {

View File

@ -0,0 +1,197 @@
package pushws
import (
"fmt"
"net/http"
"net/url"
"strings"
"time"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/live/convert"
"github.com/grafana/grafana/pkg/services/live/managedstream"
"github.com/grafana/grafana/pkg/services/live/pushurl"
"github.com/gorilla/websocket"
)
var (
logger = log.New("live.push_ws")
)
// Handler handles WebSocket client connections that push data to Live.
type Handler struct {
managedStreamRunner *managedstream.Runner
config Config
upgrade *websocket.Upgrader
converter *convert.Converter
}
// Config represents config for Handler.
type Config struct {
// ReadBufferSize is a parameter that is used for raw websocket Upgrader.
// If set to zero reasonable default value will be used.
ReadBufferSize int
// WriteBufferSize is a parameter that is used for raw websocket Upgrader.
// If set to zero reasonable default value will be used.
WriteBufferSize int
// MessageSizeLimit sets the maximum size in bytes of allowed message from client.
// By default DefaultWebsocketMessageSizeLimit will be used.
MessageSizeLimit int
// CheckOrigin func to provide custom origin check logic,
// zero value means same host check.
CheckOrigin func(r *http.Request) bool
// PingInterval sets interval server will send ping messages to clients.
// By default DefaultWebsocketPingInterval will be used.
PingInterval time.Duration
}
// NewHandler creates new Handler.
func NewHandler(managedStreamRunner *managedstream.Runner, c Config) *Handler {
if c.CheckOrigin == nil {
c.CheckOrigin = sameHostOriginCheck()
}
upgrade := &websocket.Upgrader{
ReadBufferSize: c.ReadBufferSize,
WriteBufferSize: c.WriteBufferSize,
CheckOrigin: c.CheckOrigin,
}
return &Handler{
managedStreamRunner: managedStreamRunner,
config: c,
upgrade: upgrade,
converter: convert.NewConverter(),
}
}
func sameHostOriginCheck() func(r *http.Request) bool {
return func(r *http.Request) bool {
err := checkSameHost(r)
if err != nil {
log.Warn("Origin check failure", "origin", r.Header.Get("origin"), "error", err)
return false
}
return true
}
}
func checkSameHost(r *http.Request) error {
origin := r.Header.Get("Origin")
if origin == "" {
return nil
}
u, err := url.Parse(origin)
if err != nil {
return fmt.Errorf("failed to parse Origin header %q: %w", origin, err)
}
if strings.EqualFold(r.Host, u.Host) {
return nil
}
return fmt.Errorf("request Origin %q is not authorized for Host %q", origin, r.Host)
}
// Defaults.
const (
DefaultWebsocketPingInterval = 25 * time.Second
DefaultWebsocketMessageSizeLimit = 1024 * 1024 // 1MB
)
func (s *Handler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
var streamID string
streamID = r.Header.Get("X-Grafana-Live-Stream")
if streamID == "" {
streamID = r.URL.Query().Get("gf_live_stream")
}
if streamID == "" {
logger.Warn("Push request without stream ID")
rw.WriteHeader(http.StatusBadRequest)
return
}
conn, err := s.upgrade.Upgrade(rw, r, nil)
if err != nil {
return
}
pingInterval := s.config.PingInterval
if pingInterval == 0 {
pingInterval = DefaultWebsocketPingInterval
}
messageSizeLimit := s.config.MessageSizeLimit
if messageSizeLimit == 0 {
messageSizeLimit = DefaultWebsocketMessageSizeLimit
}
if messageSizeLimit > 0 {
conn.SetReadLimit(int64(messageSizeLimit))
}
if pingInterval > 0 {
pongWait := pingInterval * 10 / 9
_ = conn.SetReadDeadline(time.Now().Add(pongWait))
conn.SetPongHandler(func(string) error {
_ = conn.SetReadDeadline(time.Now().Add(pongWait))
return nil
})
}
go func() {
ticker := time.NewTicker(25 * time.Second)
defer ticker.Stop()
for {
select {
case <-r.Context().Done():
return
case <-ticker.C:
deadline := time.Now().Add(pingInterval / 2)
err := conn.WriteControl(websocket.PingMessage, nil, deadline)
if err != nil {
return
}
}
}
}()
for {
_, body, err := conn.ReadMessage()
if err != nil {
break
}
stream, err := s.managedStreamRunner.GetOrCreateStream(streamID)
if err != nil {
logger.Error("Error getting stream", "error", err)
continue
}
// TODO Grafana 8: decide which formats to use or keep all.
urlValues := r.URL.Query()
frameFormat := pushurl.FrameFormatFromValues(urlValues)
stableSchema := pushurl.StableSchemaFromValues(urlValues)
logger.Debug("Live Push request",
"protocol", "http",
"streamId", streamID,
"bodyLength", len(body),
"stableSchema", stableSchema,
"frameFormat", frameFormat,
)
metricFrames, err := s.converter.Convert(body, frameFormat)
if err != nil {
logger.Error("Error converting metrics", "error", err, "frameFormat", frameFormat)
continue
}
for _, mf := range metricFrames {
err := stream.Push(mf.Key(), mf.Frame(), stableSchema)
if err != nil {
return
}
}
}
}