Live: http publish, refactor live interfaces (#32317)

This commit is contained in:
Alexander Emelin 2021-03-30 13:23:29 +03:00 committed by GitHub
parent 84ea3a73c0
commit da05b7a07b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 191 additions and 95 deletions

View File

@ -397,6 +397,10 @@ func (hs *HTTPServer) registerRoutes() {
annotationsRoute.Post("/graphite", reqEditorRole, bind(dtos.PostGraphiteAnnotationsCmd{}), routing.Wrap(PostGraphiteAnnotation))
})
if hs.Live.IsEnabled() {
apiRoute.Post("/live/publish", bind(dtos.LivePublishCmd{}), routing.Wrap(hs.Live.HandleHTTPPublish))
}
// short urls
apiRoute.Post("/short-urls", bind(dtos.CreateShortURLCmd{}), routing.Wrap(hs.createShortURL))
}, reqSignedIn)

11
pkg/api/dtos/live.go Normal file
View File

@ -0,0 +1,11 @@
package dtos
import "encoding/json"
type LivePublishCmd struct {
Channel string `json:"channel"`
Data json.RawMessage `json:"data,omitempty"`
}
type LivePublishResponse struct {
}

View File

@ -1,17 +1,43 @@
package models
import "github.com/centrifugal/centrifuge"
import (
"context"
"encoding/json"
"time"
)
// ChannelPublisher writes data into a channel. Note that pemissions are not checked.
type ChannelPublisher func(channel string, data []byte) error
type SubscribeEvent struct {
Channel string
Path string
}
type SubscribeReply struct {
Presence bool
JoinLeave bool
Recover bool
}
type PublishEvent struct {
Channel string
Path string
Data json.RawMessage
}
type PublishReply struct {
HistorySize int
HistoryTTL time.Duration
}
// ChannelHandler defines the core channel behavior
type ChannelHandler interface {
// OnSubscribe is called when a client wants to subscribe to a channel
OnSubscribe(c *centrifuge.Client, e centrifuge.SubscribeEvent) (centrifuge.SubscribeReply, error)
OnSubscribe(ctx context.Context, user *SignedInUser, e SubscribeEvent) (SubscribeReply, bool, error)
// OnPublish is called when a client writes a message to the channel websocket.
OnPublish(c *centrifuge.Client, e centrifuge.PublishEvent) (centrifuge.PublishReply, error)
OnPublish(ctx context.Context, user *SignedInUser, e PublishEvent) (PublishReply, bool, error)
}
// ChannelHandlerFactory should be implemented by all core features.

View File

@ -1,9 +1,9 @@
package features
import (
"context"
"time"
"github.com/centrifugal/centrifuge"
"github.com/grafana/grafana/pkg/models"
)
@ -17,22 +17,18 @@ func (b *BroadcastRunner) GetHandlerForPath(path string) (models.ChannelHandler,
}
// OnSubscribe will let anyone connect to the path
func (b *BroadcastRunner) OnSubscribe(c *centrifuge.Client, e centrifuge.SubscribeEvent) (centrifuge.SubscribeReply, error) {
return centrifuge.SubscribeReply{
Options: centrifuge.SubscribeOptions{
Presence: true,
JoinLeave: true,
Recover: true, // loads the saved value from history
},
}, nil
func (b *BroadcastRunner) OnSubscribe(ctx context.Context, _ *models.SignedInUser, e models.SubscribeEvent) (models.SubscribeReply, bool, error) {
return models.SubscribeReply{
Presence: true,
JoinLeave: true,
Recover: true, // loads the saved value from history
}, true, nil
}
// OnPublish is called when a client wants to broadcast on the websocket
func (b *BroadcastRunner) OnPublish(c *centrifuge.Client, e centrifuge.PublishEvent) (centrifuge.PublishReply, error) {
return centrifuge.PublishReply{
Options: centrifuge.PublishOptions{
HistorySize: 1, // The last message is saved for 10 mins
HistoryTTL: 10 * time.Minute,
},
}, nil
func (b *BroadcastRunner) OnPublish(ctx context.Context, _ *models.SignedInUser, e models.PublishEvent) (models.PublishReply, bool, error) {
return models.PublishReply{
HistorySize: 1, // The last message is saved for 10 min.
HistoryTTL: 10 * time.Minute,
}, true, nil
}

View File

@ -1,9 +1,9 @@
package features
import (
"context"
"encoding/json"
"github.com/centrifugal/centrifuge"
"github.com/grafana/grafana/pkg/models"
)
@ -26,20 +26,16 @@ func (h *DashboardHandler) GetHandlerForPath(path string) (models.ChannelHandler
}
// OnSubscribe for now allows anyone to subscribe to any dashboard
func (h *DashboardHandler) OnSubscribe(c *centrifuge.Client, e centrifuge.SubscribeEvent) (centrifuge.SubscribeReply, error) {
return centrifuge.SubscribeReply{
Options: centrifuge.SubscribeOptions{
Presence: true,
JoinLeave: true,
},
}, nil
func (h *DashboardHandler) OnSubscribe(ctx context.Context, _ *models.SignedInUser, e models.SubscribeEvent) (models.SubscribeReply, bool, error) {
return models.SubscribeReply{
Presence: true,
JoinLeave: true,
}, true, nil
}
// OnPublish is called when someone begins to edit a dashoard
func (h *DashboardHandler) OnPublish(c *centrifuge.Client, e centrifuge.PublishEvent) (centrifuge.PublishReply, error) {
return centrifuge.PublishReply{
Options: centrifuge.PublishOptions{},
}, nil
func (h *DashboardHandler) OnPublish(ctx context.Context, _ *models.SignedInUser, e models.PublishEvent) (models.PublishReply, bool, error) {
return models.PublishReply{}, true, nil
}
// DashboardSaved should broadcast to the appropriate stream

View File

@ -1,7 +1,8 @@
package features
import (
"github.com/centrifugal/centrifuge"
"context"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/models"
)
@ -22,14 +23,12 @@ func (m *MeasurementsRunner) GetHandlerForPath(path string) (models.ChannelHandl
}
// OnSubscribe will let anyone connect to the path
func (m *MeasurementsRunner) OnSubscribe(c *centrifuge.Client, e centrifuge.SubscribeEvent) (centrifuge.SubscribeReply, error) {
return centrifuge.SubscribeReply{}, nil
func (m *MeasurementsRunner) OnSubscribe(ctx context.Context, _ *models.SignedInUser, e models.SubscribeEvent) (models.SubscribeReply, bool, error) {
return models.SubscribeReply{}, true, nil
}
// OnPublish is called when a client wants to broadcast on the websocket
// Currently this sends measurements over websocket -- should be replaced with the HTTP interface
func (m *MeasurementsRunner) OnPublish(c *centrifuge.Client, e centrifuge.PublishEvent) (centrifuge.PublishReply, error) {
return centrifuge.PublishReply{
Options: centrifuge.PublishOptions{},
}, nil
func (m *MeasurementsRunner) OnPublish(ctx context.Context, _ *models.SignedInUser, e models.PublishEvent) (models.PublishReply, bool, error) {
return models.PublishReply{}, true, nil
}

View File

@ -20,7 +20,7 @@ type PresenceGetter interface {
}
type PluginContextGetter interface {
GetPluginContext(ctx context.Context, pluginID string, datasourceUID string) (backend.PluginContext, bool, error)
GetPluginContext(user *models.SignedInUser, pluginID string, datasourceUID string) (backend.PluginContext, bool, error)
}
type StreamRunner interface {
@ -83,40 +83,39 @@ type PluginPathRunner struct {
}
// OnSubscribe passes control to a plugin.
func (r *PluginPathRunner) OnSubscribe(client *centrifuge.Client, e centrifuge.SubscribeEvent) (centrifuge.SubscribeReply, error) {
pCtx, found, err := r.pluginContextGetter.GetPluginContext(client.Context(), r.pluginID, r.datasourceUID)
func (r *PluginPathRunner) OnSubscribe(ctx context.Context, user *models.SignedInUser, e models.SubscribeEvent) (models.SubscribeReply, bool, error) {
pCtx, found, err := r.pluginContextGetter.GetPluginContext(user, r.pluginID, r.datasourceUID)
if err != nil {
logger.Error("Get plugin context error", "error", err, "path", r.path)
return centrifuge.SubscribeReply{}, err
return models.SubscribeReply{}, false, err
}
if !found {
logger.Error("Plugin context not found", "path", r.path)
return centrifuge.SubscribeReply{}, centrifuge.ErrorInternal
return models.SubscribeReply{}, false, centrifuge.ErrorInternal
}
resp, err := r.handler.CanSubscribeToStream(client.Context(), &backend.SubscribeToStreamRequest{
resp, err := r.handler.CanSubscribeToStream(ctx, &backend.SubscribeToStreamRequest{
PluginContext: pCtx,
Path: r.path,
})
if err != nil {
logger.Error("Plugin CanSubscribeToStream call error", "error", err, "path", r.path)
return centrifuge.SubscribeReply{}, err
return models.SubscribeReply{}, false, err
}
if !resp.OK {
return centrifuge.SubscribeReply{}, centrifuge.ErrorPermissionDenied
return models.SubscribeReply{}, false, nil
}
err = r.streamManager.SubmitStream(e.Channel, r.path, pCtx, r.handler)
if err != nil {
logger.Error("Error submitting stream to manager", "error", err, "path", r.path)
return centrifuge.SubscribeReply{}, centrifuge.ErrorInternal
return models.SubscribeReply{}, false, centrifuge.ErrorInternal
}
return centrifuge.SubscribeReply{
Options: centrifuge.SubscribeOptions{
Presence: true,
},
}, nil
return models.SubscribeReply{
Presence: true,
}, true, nil
}
// OnPublish passes control to a plugin.
func (r *PluginPathRunner) OnPublish(_ *centrifuge.Client, _ centrifuge.PublishEvent) (centrifuge.PublishReply, error) {
return centrifuge.PublishReply{}, fmt.Errorf("not implemented yet")
func (r *PluginPathRunner) OnPublish(_ context.Context, _ *models.SignedInUser, _ models.PublishEvent) (models.PublishReply, bool, error) {
// TODO: pass control to a plugin.
return models.PublishReply{}, false, fmt.Errorf("not implemented yet")
}

View File

@ -3,12 +3,15 @@ package live
import (
"context"
"fmt"
"net/http"
"sync"
"time"
"github.com/centrifugal/centrifuge"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana/pkg/api/dtos"
"github.com/grafana/grafana/pkg/api/response"
"github.com/grafana/grafana/pkg/api/routing"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/models"
@ -140,12 +143,36 @@ func (g *GrafanaLive) Init() error {
client.OnSubscribe(func(e centrifuge.SubscribeEvent, cb centrifuge.SubscribeCallback) {
logger.Debug("Client wants to subscribe", "user", client.UserID(), "client", client.ID(), "channel", e.Channel)
handler, err := g.GetChannelHandler(client.Context(), e.Channel)
user, ok := getContextSignedUser(client.Context())
if !ok {
logger.Error("Unauthenticated live connection")
cb(centrifuge.SubscribeReply{}, centrifuge.ErrorInternal)
return
}
handler, addr, err := g.GetChannelHandler(user, e.Channel)
if err != nil {
logger.Error("Error getting channel handler", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err)
cb(centrifuge.SubscribeReply{}, err)
} else {
cb(handler.OnSubscribe(client, e))
reply, allowed, err := handler.OnSubscribe(client.Context(), user, models.SubscribeEvent{
Channel: e.Channel,
Path: addr.Path,
})
if err != nil {
cb(centrifuge.SubscribeReply{}, centrifuge.ErrorInternal)
return
}
if !allowed {
cb(centrifuge.SubscribeReply{}, centrifuge.ErrorPermissionDenied)
return
}
cb(centrifuge.SubscribeReply{
Options: centrifuge.SubscribeOptions{
Presence: reply.Presence,
JoinLeave: reply.JoinLeave,
Recover: reply.Recover,
},
}, nil)
}
})
@ -154,12 +181,35 @@ func (g *GrafanaLive) Init() error {
// allows some simple prototypes to work quickly.
client.OnPublish(func(e centrifuge.PublishEvent, cb centrifuge.PublishCallback) {
logger.Debug("Client wants to publish", "user", client.UserID(), "client", client.ID(), "channel", e.Channel)
handler, err := g.GetChannelHandler(client.Context(), e.Channel)
user, ok := getContextSignedUser(client.Context())
if !ok {
logger.Error("Unauthenticated live connection")
cb(centrifuge.PublishReply{}, centrifuge.ErrorInternal)
return
}
handler, addr, err := g.GetChannelHandler(user, e.Channel)
if err != nil {
logger.Error("Error getting channel handler", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err)
cb(centrifuge.PublishReply{}, err)
} else {
cb(handler.OnPublish(client, e))
reply, allowed, err := handler.OnPublish(client.Context(), user, models.PublishEvent{
Channel: e.Channel,
Path: addr.Path,
})
if err != nil {
cb(centrifuge.PublishReply{}, centrifuge.ErrorInternal)
return
}
if !allowed {
cb(centrifuge.PublishReply{}, centrifuge.ErrorPermissionDenied)
return
}
cb(centrifuge.PublishReply{
Options: centrifuge.PublishOptions{
HistorySize: reply.HistorySize,
HistoryTTL: reply.HistoryTTL,
},
}, nil)
}
})
@ -205,19 +255,19 @@ func (g *GrafanaLive) Init() error {
}
// GetChannelHandler gives thread-safe access to the channel.
func (g *GrafanaLive) GetChannelHandler(ctx context.Context, channel string) (models.ChannelHandler, error) {
func (g *GrafanaLive) GetChannelHandler(user *models.SignedInUser, channel string) (models.ChannelHandler, ChannelAddress, error) {
// Parse the identifier ${scope}/${namespace}/${path}
addr := ParseChannelAddress(channel)
if !addr.IsValid() {
return nil, ChannelAddress{}, fmt.Errorf("invalid channel: %q", channel)
}
g.channelsMu.RLock()
c, ok := g.channels[channel]
g.channelsMu.RUnlock() // defer? but then you can't lock further down
if ok {
logger.Debug("Found cached channel handler", "channel", channel)
return c, nil
}
// Parse the identifier ${scope}/${namespace}/${path}
addr := ParseChannelAddress(channel)
if !addr.IsValid() {
return nil, fmt.Errorf("invalid channel: %q", channel)
return c, addr, nil
}
g.channelsMu.Lock()
@ -225,48 +275,48 @@ func (g *GrafanaLive) GetChannelHandler(ctx context.Context, channel string) (mo
c, ok = g.channels[channel] // may have filled in while locked
if ok {
logger.Debug("Found cached channel handler", "channel", channel)
return c, nil
return c, addr, nil
}
getter, err := g.GetChannelHandlerFactory(ctx, addr.Scope, addr.Namespace)
getter, err := g.GetChannelHandlerFactory(user, addr.Scope, addr.Namespace)
if err != nil {
return nil, fmt.Errorf("error getting channel handler factory: %w", err)
return nil, addr, fmt.Errorf("error getting channel handler factory: %w", err)
}
// First access will initialize.
c, err = getter.GetHandlerForPath(addr.Path)
if err != nil {
return nil, fmt.Errorf("error getting handler for path: %w", err)
return nil, addr, fmt.Errorf("error getting handler for path: %w", err)
}
logger.Info("Initialized channel handler", "channel", channel, "address", addr)
g.channels[channel] = c
return c, nil
return c, addr, nil
}
// GetChannelHandlerFactory gets a ChannelHandlerFactory for a namespace.
// It gives thread-safe access to the channel.
func (g *GrafanaLive) GetChannelHandlerFactory(ctx context.Context, scope string, namespace string) (models.ChannelHandlerFactory, error) {
func (g *GrafanaLive) GetChannelHandlerFactory(user *models.SignedInUser, scope string, namespace string) (models.ChannelHandlerFactory, error) {
switch scope {
case ScopeGrafana:
return g.handleGrafanaScope(ctx, namespace)
return g.handleGrafanaScope(user, namespace)
case ScopePlugin:
return g.handlePluginScope(ctx, namespace)
return g.handlePluginScope(user, namespace)
case ScopeDatasource:
return g.handleDatasourceScope(ctx, namespace)
return g.handleDatasourceScope(user, namespace)
default:
return nil, fmt.Errorf("invalid scope: %q", scope)
}
}
func (g *GrafanaLive) handleGrafanaScope(_ context.Context, namespace string) (models.ChannelHandlerFactory, error) {
func (g *GrafanaLive) handleGrafanaScope(_ *models.SignedInUser, namespace string) (models.ChannelHandlerFactory, error) {
if p, ok := g.GrafanaScope.Features[namespace]; ok {
return p, nil
}
return nil, fmt.Errorf("unknown feature: %q", namespace)
}
func (g *GrafanaLive) handlePluginScope(_ context.Context, namespace string) (models.ChannelHandlerFactory, error) {
func (g *GrafanaLive) handlePluginScope(_ *models.SignedInUser, namespace string) (models.ChannelHandlerFactory, error) {
// Temporary hack until we have a more generic solution later on
if namespace == "cloudwatch" {
return &cloudwatch.LogQueryRunnerSupplier{
@ -287,11 +337,7 @@ func (g *GrafanaLive) handlePluginScope(_ context.Context, namespace string) (mo
), nil
}
func (g *GrafanaLive) handleDatasourceScope(ctx context.Context, namespace string) (models.ChannelHandlerFactory, error) {
user, ok := getContextSignedUser(ctx)
if !ok {
return nil, fmt.Errorf("no signed user found in context")
}
func (g *GrafanaLive) handleDatasourceScope(user *models.SignedInUser, namespace string) (models.ChannelHandlerFactory, error) {
ds, err := g.DatasourceCache.GetDatasourceByUID(namespace, user, false)
if err != nil {
return nil, fmt.Errorf("error getting datasource: %w", err)
@ -320,6 +366,31 @@ func (g *GrafanaLive) IsEnabled() bool {
return g.Cfg.IsLiveEnabled()
}
func (g *GrafanaLive) HandleHTTPPublish(ctx *models.ReqContext, cmd dtos.LivePublishCmd) response.Response {
addr := ParseChannelAddress(cmd.Channel)
if !addr.IsValid() {
return response.Error(http.StatusBadRequest, "Bad channel address", nil)
}
logger.Debug("Publish API cmd", "cmd", cmd)
channelHandler, addr, err := g.GetChannelHandler(ctx.SignedInUser, cmd.Channel)
if err != nil {
logger.Error("Error getting channels handler", "error", err, "channel", cmd.Channel)
return response.Error(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError), nil)
}
_, allowed, err := channelHandler.OnPublish(ctx.Req.Context(), ctx.SignedInUser, models.PublishEvent{Channel: cmd.Channel, Path: addr.Path, Data: cmd.Data})
if err != nil {
logger.Error("Error calling OnPublish", "error", err, "channel", cmd.Channel)
return response.Error(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError), nil)
}
if !allowed {
return response.Error(http.StatusForbidden, http.StatusText(http.StatusForbidden), nil)
}
return response.JSON(200, dtos.LivePublishResponse{})
}
// Write to the standard log15 logger
func handleLog(msg centrifuge.LogEntry) {
arr := make([]interface{}, 0)

View File

@ -1,8 +1,7 @@
package live
import (
"context"
"fmt"
"github.com/grafana/grafana/pkg/models"
"github.com/centrifugal/centrifuge"
"github.com/grafana/grafana-plugin-sdk-go/backend"
@ -48,10 +47,6 @@ func newPluginContextGetter(pluginContextProvider *plugincontext.Provider) *plug
}
}
func (g *pluginContextGetter) GetPluginContext(ctx context.Context, pluginID string, datasourceUID string) (backend.PluginContext, bool, error) {
user, ok := getContextSignedUser(ctx)
if !ok {
return backend.PluginContext{}, false, fmt.Errorf("no signed user found in context")
}
func (g *pluginContextGetter) GetPluginContext(user *models.SignedInUser, pluginID string, datasourceUID string) (backend.PluginContext, bool, error) {
return g.PluginContextProvider.Get(pluginID, datasourceUID, user)
}

View File

@ -13,7 +13,6 @@ import (
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
"github.com/aws/aws-sdk-go/service/servicequotas"
"github.com/aws/aws-sdk-go/service/servicequotas/servicequotasiface"
"github.com/centrifugal/centrifuge"
"github.com/google/uuid"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
@ -57,12 +56,12 @@ func (s *LogQueryRunnerSupplier) GetHandlerForPath(path string) (models.ChannelH
}
// OnSubscribe publishes results from the corresponding CloudWatch Logs query to the provided channel
func (r *logQueryRunner) OnSubscribe(c *centrifuge.Client, e centrifuge.SubscribeEvent) (centrifuge.SubscribeReply, error) {
func (r *logQueryRunner) OnSubscribe(ctx context.Context, user *models.SignedInUser, e models.SubscribeEvent) (models.SubscribeReply, bool, error) {
r.runningMu.Lock()
defer r.runningMu.Unlock()
if _, ok := r.running[e.Channel]; ok {
return centrifuge.SubscribeReply{}, nil
return models.SubscribeReply{}, true, nil
}
r.running[e.Channel] = true
@ -72,12 +71,12 @@ func (r *logQueryRunner) OnSubscribe(c *centrifuge.Client, e centrifuge.Subscrib
}
}()
return centrifuge.SubscribeReply{}, nil
return models.SubscribeReply{}, true, nil
}
// OnPublish checks if a message from the websocket can be broadcast on this channel
func (r *logQueryRunner) OnPublish(c *centrifuge.Client, e centrifuge.PublishEvent) (centrifuge.PublishReply, error) {
return centrifuge.PublishReply{}, fmt.Errorf("can not publish")
func (r *logQueryRunner) OnPublish(ctx context.Context, user *models.SignedInUser, e models.PublishEvent) (models.PublishReply, bool, error) {
return models.PublishReply{}, false, nil
}
func (r *logQueryRunner) publishResults(channelName string) error {