2020-10-01 12:46:14 -05:00
|
|
|
package features
|
|
|
|
|
|
|
|
import (
|
2021-03-30 05:23:29 -05:00
|
|
|
"context"
|
2020-11-05 12:37:04 -06:00
|
|
|
|
2021-04-05 11:04:46 -05:00
|
|
|
"github.com/grafana/grafana/pkg/infra/log"
|
2021-04-30 13:06:33 -05:00
|
|
|
"github.com/grafana/grafana/pkg/models"
|
2022-08-10 04:56:48 -05:00
|
|
|
"github.com/grafana/grafana/pkg/services/user"
|
2021-04-05 11:04:46 -05:00
|
|
|
|
2021-04-02 11:41:45 -05:00
|
|
|
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
2020-10-01 12:46:14 -05:00
|
|
|
)
|
|
|
|
|
2021-04-05 11:04:46 -05:00
|
|
|
var (
|
|
|
|
logger = log.New("live.features") // scoped to all features?
|
|
|
|
)
|
|
|
|
|
2021-04-30 13:06:33 -05:00
|
|
|
//go:generate mockgen -destination=broadcast_mock.go -package=features github.com/grafana/grafana/pkg/services/live/features LiveMessageStore
|
|
|
|
|
|
|
|
type LiveMessageStore interface {
|
|
|
|
SaveLiveMessage(query *models.SaveLiveMessageQuery) error
|
|
|
|
GetLiveMessage(query *models.GetLiveMessageQuery) (models.LiveMessage, bool, error)
|
|
|
|
}
|
|
|
|
|
2020-10-01 12:46:14 -05:00
|
|
|
// BroadcastRunner will simply broadcast all events to `grafana/broadcast/*` channels
|
2020-10-22 02:10:26 -05:00
|
|
|
// This assumes that data is a JSON object
|
2021-04-30 13:06:33 -05:00
|
|
|
type BroadcastRunner struct {
|
|
|
|
liveMessageStore LiveMessageStore
|
|
|
|
}
|
|
|
|
|
|
|
|
func NewBroadcastRunner(liveMessageStore LiveMessageStore) *BroadcastRunner {
|
|
|
|
return &BroadcastRunner{liveMessageStore: liveMessageStore}
|
|
|
|
}
|
2020-10-01 12:46:14 -05:00
|
|
|
|
|
|
|
// GetHandlerForPath called on init
|
2021-04-30 13:06:33 -05:00
|
|
|
func (b *BroadcastRunner) GetHandlerForPath(_ string) (models.ChannelHandler, error) {
|
2020-11-05 12:37:04 -06:00
|
|
|
return b, nil // all dashboards share the same handler
|
2020-10-01 12:46:14 -05:00
|
|
|
}
|
|
|
|
|
2020-11-05 12:37:04 -06:00
|
|
|
// OnSubscribe will let anyone connect to the path
|
2022-08-10 04:56:48 -05:00
|
|
|
func (b *BroadcastRunner) OnSubscribe(_ context.Context, u *user.SignedInUser, e models.SubscribeEvent) (models.SubscribeReply, backend.SubscribeStreamStatus, error) {
|
2021-04-30 13:06:33 -05:00
|
|
|
reply := models.SubscribeReply{
|
2021-03-30 05:23:29 -05:00
|
|
|
Presence: true,
|
|
|
|
JoinLeave: true,
|
2021-04-30 13:06:33 -05:00
|
|
|
}
|
|
|
|
query := &models.GetLiveMessageQuery{
|
2022-08-11 06:28:55 -05:00
|
|
|
OrgId: u.OrgID,
|
2021-04-30 13:06:33 -05:00
|
|
|
Channel: e.Channel,
|
|
|
|
}
|
|
|
|
msg, ok, err := b.liveMessageStore.GetLiveMessage(query)
|
|
|
|
if err != nil {
|
|
|
|
return models.SubscribeReply{}, 0, err
|
|
|
|
}
|
|
|
|
if ok {
|
|
|
|
reply.Data = msg.Data
|
|
|
|
}
|
|
|
|
return reply, backend.SubscribeStreamStatusOK, nil
|
2020-10-01 12:46:14 -05:00
|
|
|
}
|
|
|
|
|
2020-11-05 12:37:04 -06:00
|
|
|
// OnPublish is called when a client wants to broadcast on the websocket
|
2022-08-10 04:56:48 -05:00
|
|
|
func (b *BroadcastRunner) OnPublish(_ context.Context, u *user.SignedInUser, e models.PublishEvent) (models.PublishReply, backend.PublishStreamStatus, error) {
|
2021-04-30 13:06:33 -05:00
|
|
|
query := &models.SaveLiveMessageQuery{
|
2022-08-11 06:28:55 -05:00
|
|
|
OrgId: u.OrgID,
|
2021-04-30 13:06:33 -05:00
|
|
|
Channel: e.Channel,
|
|
|
|
Data: e.Data,
|
|
|
|
}
|
|
|
|
if err := b.liveMessageStore.SaveLiveMessage(query); err != nil {
|
|
|
|
return models.PublishReply{}, 0, err
|
|
|
|
}
|
2021-09-01 10:54:34 -05:00
|
|
|
return models.PublishReply{Data: e.Data}, backend.PublishStreamStatusOK, nil
|
2020-10-01 12:46:14 -05:00
|
|
|
}
|