Chore: Remove live from models (#61711)

This commit is contained in:
idafurjes 2023-01-19 10:03:14 +01:00 committed by GitHub
parent 0d42edddbf
commit cacc55ba06
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 216 additions and 129 deletions

View File

@ -6,7 +6,7 @@ import (
"github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/infra/localcache"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/services/live/model"
)
type Storage struct {
@ -22,7 +22,7 @@ func getLiveMessageCacheKey(orgID int64, channel string) string {
return fmt.Sprintf("live_message_%d_%s", orgID, channel)
}
func (s *Storage) SaveLiveMessage(query *models.SaveLiveMessageQuery) error {
func (s *Storage) SaveLiveMessage(query *model.SaveLiveMessageQuery) error {
// Come back to saving into database after evaluating database structure.
//err := s.store.WithDbSession(context.Background(), func(sess *db.Session) error {
// params := []interface{}{query.OrgId, query.Channel, query.Data, time.Now()}
@ -34,7 +34,7 @@ func (s *Storage) SaveLiveMessage(query *models.SaveLiveMessageQuery) error {
// return err
//})
// return err
s.cache.Set(getLiveMessageCacheKey(query.OrgId, query.Channel), models.LiveMessage{
s.cache.Set(getLiveMessageCacheKey(query.OrgId, query.Channel), model.LiveMessage{
Id: 0, // Not used actually.
OrgId: query.OrgId,
Channel: query.Channel,
@ -44,7 +44,7 @@ func (s *Storage) SaveLiveMessage(query *models.SaveLiveMessageQuery) error {
return nil
}
func (s *Storage) GetLiveMessage(query *models.GetLiveMessageQuery) (models.LiveMessage, bool, error) {
func (s *Storage) GetLiveMessage(query *model.GetLiveMessageQuery) (model.LiveMessage, bool, error) {
// Come back to saving into database after evaluating database structure.
//var msg models.LiveMessage
//var exists bool
@ -56,11 +56,11 @@ func (s *Storage) GetLiveMessage(query *models.GetLiveMessageQuery) (models.Live
//return msg, exists, err
m, ok := s.cache.Get(getLiveMessageCacheKey(query.OrgId, query.Channel))
if !ok {
return models.LiveMessage{}, false, nil
return model.LiveMessage{}, false, nil
}
msg, ok := m.(models.LiveMessage)
msg, ok := m.(model.LiveMessage)
if !ok {
return models.LiveMessage{}, false, fmt.Errorf("unexpected live message type in cache: %T", m)
return model.LiveMessage{}, false, fmt.Errorf("unexpected live message type in cache: %T", m)
}
return msg, true, nil
}

View File

@ -4,8 +4,7 @@ import (
"encoding/json"
"testing"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/services/live/model"
"github.com/stretchr/testify/require"
)
@ -15,7 +14,7 @@ func TestIntegrationLiveMessage(t *testing.T) {
}
storage := SetupTestStorage(t)
getQuery := &models.GetLiveMessageQuery{
getQuery := &model.GetLiveMessageQuery{
OrgId: 1,
Channel: "test_channel",
}
@ -23,7 +22,7 @@ func TestIntegrationLiveMessage(t *testing.T) {
require.NoError(t, err)
require.False(t, ok)
saveQuery := &models.SaveLiveMessageQuery{
saveQuery := &model.SaveLiveMessageQuery{
OrgId: 1,
Channel: "test_channel",
Data: []byte(`{}`),
@ -40,7 +39,7 @@ func TestIntegrationLiveMessage(t *testing.T) {
require.NotZero(t, msg.Published)
// try saving again, should be replaced.
saveQuery2 := &models.SaveLiveMessageQuery{
saveQuery2 := &model.SaveLiveMessageQuery{
OrgId: 1,
Channel: "test_channel",
Data: []byte(`{"input": "hello"}`),
@ -48,7 +47,7 @@ func TestIntegrationLiveMessage(t *testing.T) {
err = storage.SaveLiveMessage(saveQuery2)
require.NoError(t, err)
getQuery2 := &models.GetLiveMessageQuery{
getQuery2 := &model.GetLiveMessageQuery{
OrgId: 1,
Channel: "test_channel",
}

View File

@ -4,7 +4,7 @@ import (
"context"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/services/live/model"
"github.com/grafana/grafana/pkg/services/user"
"github.com/grafana/grafana-plugin-sdk-go/backend"
@ -17,8 +17,8 @@ var (
//go:generate mockgen -destination=broadcast_mock.go -package=features github.com/grafana/grafana/pkg/services/live/features LiveMessageStore
type LiveMessageStore interface {
SaveLiveMessage(query *models.SaveLiveMessageQuery) error
GetLiveMessage(query *models.GetLiveMessageQuery) (models.LiveMessage, bool, error)
SaveLiveMessage(query *model.SaveLiveMessageQuery) error
GetLiveMessage(query *model.GetLiveMessageQuery) (model.LiveMessage, bool, error)
}
// BroadcastRunner will simply broadcast all events to `grafana/broadcast/*` channels
@ -32,23 +32,23 @@ func NewBroadcastRunner(liveMessageStore LiveMessageStore) *BroadcastRunner {
}
// GetHandlerForPath called on init
func (b *BroadcastRunner) GetHandlerForPath(_ string) (models.ChannelHandler, error) {
func (b *BroadcastRunner) GetHandlerForPath(_ string) (model.ChannelHandler, error) {
return b, nil // all dashboards share the same handler
}
// OnSubscribe will let anyone connect to the path
func (b *BroadcastRunner) OnSubscribe(_ context.Context, u *user.SignedInUser, e models.SubscribeEvent) (models.SubscribeReply, backend.SubscribeStreamStatus, error) {
reply := models.SubscribeReply{
func (b *BroadcastRunner) OnSubscribe(_ context.Context, u *user.SignedInUser, e model.SubscribeEvent) (model.SubscribeReply, backend.SubscribeStreamStatus, error) {
reply := model.SubscribeReply{
Presence: true,
JoinLeave: true,
}
query := &models.GetLiveMessageQuery{
query := &model.GetLiveMessageQuery{
OrgId: u.OrgID,
Channel: e.Channel,
}
msg, ok, err := b.liveMessageStore.GetLiveMessage(query)
if err != nil {
return models.SubscribeReply{}, 0, err
return model.SubscribeReply{}, 0, err
}
if ok {
reply.Data = msg.Data
@ -57,14 +57,14 @@ func (b *BroadcastRunner) OnSubscribe(_ context.Context, u *user.SignedInUser, e
}
// OnPublish is called when a client wants to broadcast on the websocket
func (b *BroadcastRunner) OnPublish(_ context.Context, u *user.SignedInUser, e models.PublishEvent) (models.PublishReply, backend.PublishStreamStatus, error) {
query := &models.SaveLiveMessageQuery{
func (b *BroadcastRunner) OnPublish(_ context.Context, u *user.SignedInUser, e model.PublishEvent) (model.PublishReply, backend.PublishStreamStatus, error) {
query := &model.SaveLiveMessageQuery{
OrgId: u.OrgID,
Channel: e.Channel,
Data: e.Data,
}
if err := b.liveMessageStore.SaveLiveMessage(query); err != nil {
return models.PublishReply{}, 0, err
return model.PublishReply{}, 0, err
}
return models.PublishReply{Data: e.Data}, backend.PublishStreamStatusOK, nil
return model.PublishReply{Data: e.Data}, backend.PublishStreamStatusOK, nil
}

View File

@ -8,7 +8,7 @@ import (
reflect "reflect"
gomock "github.com/golang/mock/gomock"
models "github.com/grafana/grafana/pkg/models"
model "github.com/grafana/grafana/pkg/services/live/model"
)
// MockLiveMessageStore is a mock of LiveMessageStore interface.
@ -35,10 +35,10 @@ func (m *MockLiveMessageStore) EXPECT() *MockLiveMessageStoreMockRecorder {
}
// GetLiveMessage mocks base method.
func (m *MockLiveMessageStore) GetLiveMessage(arg0 *models.GetLiveMessageQuery) (models.LiveMessage, bool, error) {
func (m *MockLiveMessageStore) GetLiveMessage(arg0 *model.GetLiveMessageQuery) (model.LiveMessage, bool, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetLiveMessage", arg0)
ret0, _ := ret[0].(models.LiveMessage)
ret0, _ := ret[0].(model.LiveMessage)
ret1, _ := ret[1].(bool)
ret2, _ := ret[2].(error)
return ret0, ret1, ret2
@ -51,7 +51,7 @@ func (mr *MockLiveMessageStoreMockRecorder) GetLiveMessage(arg0 interface{}) *go
}
// SaveLiveMessage mocks base method.
func (m *MockLiveMessageStore) SaveLiveMessage(arg0 *models.SaveLiveMessageQuery) error {
func (m *MockLiveMessageStore) SaveLiveMessage(arg0 *model.SaveLiveMessageQuery) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "SaveLiveMessage", arg0)
ret0, _ := ret[0].(error)

View File

@ -7,7 +7,7 @@ import (
"github.com/golang/mock/gomock"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/services/live/model"
"github.com/grafana/grafana/pkg/services/user"
"github.com/stretchr/testify/require"
)
@ -28,11 +28,11 @@ func TestBroadcastRunner_OnSubscribe(t *testing.T) {
channel := "stream/channel/test"
data := json.RawMessage(`{}`)
mockDispatcher.EXPECT().GetLiveMessage(&models.GetLiveMessageQuery{
mockDispatcher.EXPECT().GetLiveMessage(&model.GetLiveMessageQuery{
OrgId: 1,
Channel: channel,
}).DoAndReturn(func(query *models.GetLiveMessageQuery) (models.LiveMessage, bool, error) {
return models.LiveMessage{
}).DoAndReturn(func(query *model.GetLiveMessageQuery) (model.LiveMessage, bool, error) {
return model.LiveMessage{
Data: data,
}, true, nil
}).Times(1)
@ -44,7 +44,7 @@ func TestBroadcastRunner_OnSubscribe(t *testing.T) {
reply, status, err := handler.OnSubscribe(
context.Background(),
&user.SignedInUser{OrgID: 1, UserID: 2},
models.SubscribeEvent{Channel: channel, Path: "test"},
model.SubscribeEvent{Channel: channel, Path: "test"},
)
require.NoError(t, err)
require.Equal(t, backend.SubscribeStreamStatusOK, status)
@ -63,11 +63,11 @@ func TestBroadcastRunner_OnPublish(t *testing.T) {
data := json.RawMessage(`{}`)
var orgID int64 = 1
mockDispatcher.EXPECT().SaveLiveMessage(&models.SaveLiveMessageQuery{
mockDispatcher.EXPECT().SaveLiveMessage(&model.SaveLiveMessageQuery{
OrgId: orgID,
Channel: channel,
Data: data,
}).DoAndReturn(func(query *models.SaveLiveMessageQuery) error {
}).DoAndReturn(func(query *model.SaveLiveMessageQuery) error {
return nil
}).Times(1)
@ -78,7 +78,7 @@ func TestBroadcastRunner_OnPublish(t *testing.T) {
reply, status, err := handler.OnPublish(
context.Background(),
&user.SignedInUser{OrgID: 1, UserID: 2},
models.PublishEvent{Channel: channel, Path: "test", Data: data},
model.PublishEvent{Channel: channel, Path: "test", Data: data},
)
require.NoError(t, err)
require.Equal(t, backend.PublishStreamStatusOK, status)

View File

@ -4,8 +4,8 @@ import (
"context"
"strings"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/services/comments/commentmodel"
"github.com/grafana/grafana/pkg/services/live/model"
"github.com/grafana/grafana/pkg/services/user"
"github.com/grafana/grafana-plugin-sdk-go/backend"
@ -21,29 +21,29 @@ func NewCommentHandler(permissionChecker *commentmodel.PermissionChecker) *Comme
}
// GetHandlerForPath called on init.
func (h *CommentHandler) GetHandlerForPath(_ string) (models.ChannelHandler, error) {
func (h *CommentHandler) GetHandlerForPath(_ string) (model.ChannelHandler, error) {
return h, nil // all chats share the same handler
}
// OnSubscribe handles subscription to comment group channel.
func (h *CommentHandler) OnSubscribe(ctx context.Context, user *user.SignedInUser, e models.SubscribeEvent) (models.SubscribeReply, backend.SubscribeStreamStatus, error) {
func (h *CommentHandler) OnSubscribe(ctx context.Context, user *user.SignedInUser, e model.SubscribeEvent) (model.SubscribeReply, backend.SubscribeStreamStatus, error) {
parts := strings.Split(e.Path, "/")
if len(parts) != 2 {
return models.SubscribeReply{}, backend.SubscribeStreamStatusNotFound, nil
return model.SubscribeReply{}, backend.SubscribeStreamStatusNotFound, nil
}
objectType := parts[0]
objectID := parts[1]
ok, err := h.permissionChecker.CheckReadPermissions(ctx, user.OrgID, user, objectType, objectID)
if err != nil {
return models.SubscribeReply{}, 0, err
return model.SubscribeReply{}, 0, err
}
if !ok {
return models.SubscribeReply{}, backend.SubscribeStreamStatusPermissionDenied, nil
return model.SubscribeReply{}, backend.SubscribeStreamStatusPermissionDenied, nil
}
return models.SubscribeReply{}, backend.SubscribeStreamStatusOK, nil
return model.SubscribeReply{}, backend.SubscribeStreamStatusOK, nil
}
// OnPublish is not used for comments.
func (h *CommentHandler) OnPublish(_ context.Context, _ *user.SignedInUser, _ models.PublishEvent) (models.PublishReply, backend.PublishStreamStatus, error) {
return models.PublishReply{}, backend.PublishStreamStatusPermissionDenied, nil
func (h *CommentHandler) OnPublish(_ context.Context, _ *user.SignedInUser, _ model.PublishEvent) (model.PublishReply, backend.PublishStreamStatus, error) {
return model.PublishReply{}, backend.PublishStreamStatusPermissionDenied, nil
}

View File

@ -9,9 +9,9 @@ import (
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/services/dashboards"
"github.com/grafana/grafana/pkg/services/guardian"
"github.com/grafana/grafana/pkg/services/live/model"
"github.com/grafana/grafana/pkg/services/org"
"github.com/grafana/grafana/pkg/services/user"
)
@ -40,26 +40,26 @@ type dashboardEvent struct {
// DashboardHandler manages all the `grafana/dashboard/*` channels
type DashboardHandler struct {
Publisher models.ChannelPublisher
ClientCount models.ChannelClientCount
Publisher model.ChannelPublisher
ClientCount model.ChannelClientCount
Store db.DB
DashboardService dashboards.DashboardService
}
// GetHandlerForPath called on init
func (h *DashboardHandler) GetHandlerForPath(_ string) (models.ChannelHandler, error) {
func (h *DashboardHandler) GetHandlerForPath(_ string) (model.ChannelHandler, error) {
return h, nil // all dashboards share the same handler
}
// OnSubscribe for now allows anyone to subscribe to any dashboard
func (h *DashboardHandler) OnSubscribe(ctx context.Context, user *user.SignedInUser, e models.SubscribeEvent) (models.SubscribeReply, backend.SubscribeStreamStatus, error) {
func (h *DashboardHandler) OnSubscribe(ctx context.Context, user *user.SignedInUser, e model.SubscribeEvent) (model.SubscribeReply, backend.SubscribeStreamStatus, error) {
parts := strings.Split(e.Path, "/")
if parts[0] == "gitops" {
// gitops gets all changes for everything, so lets make sure it is an admin user
if !user.HasRole(org.RoleAdmin) {
return models.SubscribeReply{}, backend.SubscribeStreamStatusPermissionDenied, nil
return model.SubscribeReply{}, backend.SubscribeStreamStatusPermissionDenied, nil
}
return models.SubscribeReply{
return model.SubscribeReply{
Presence: true,
}, backend.SubscribeStreamStatusOK, nil
}
@ -69,19 +69,19 @@ func (h *DashboardHandler) OnSubscribe(ctx context.Context, user *user.SignedInU
query := dashboards.GetDashboardQuery{UID: parts[1], OrgID: user.OrgID}
if err := h.DashboardService.GetDashboard(ctx, &query); err != nil {
logger.Error("Error getting dashboard", "query", query, "error", err)
return models.SubscribeReply{}, backend.SubscribeStreamStatusNotFound, nil
return model.SubscribeReply{}, backend.SubscribeStreamStatusNotFound, nil
}
dash := query.Result
guard, err := guardian.NewByDashboard(ctx, dash, user.OrgID, user)
if err != nil {
return models.SubscribeReply{}, backend.SubscribeStreamStatusPermissionDenied, err
return model.SubscribeReply{}, backend.SubscribeStreamStatusPermissionDenied, err
}
if canView, err := guard.CanView(); err != nil || !canView {
return models.SubscribeReply{}, backend.SubscribeStreamStatusPermissionDenied, nil
return model.SubscribeReply{}, backend.SubscribeStreamStatusPermissionDenied, nil
}
return models.SubscribeReply{
return model.SubscribeReply{
Presence: true,
JoinLeave: true,
}, backend.SubscribeStreamStatusOK, nil
@ -89,20 +89,20 @@ func (h *DashboardHandler) OnSubscribe(ctx context.Context, user *user.SignedInU
// Unknown path
logger.Error("Unknown dashboard channel", "path", e.Path)
return models.SubscribeReply{}, backend.SubscribeStreamStatusNotFound, nil
return model.SubscribeReply{}, backend.SubscribeStreamStatusNotFound, nil
}
// OnPublish is called when someone begins to edit a dashboard
func (h *DashboardHandler) OnPublish(ctx context.Context, user *user.SignedInUser, e models.PublishEvent) (models.PublishReply, backend.PublishStreamStatus, error) {
func (h *DashboardHandler) OnPublish(ctx context.Context, user *user.SignedInUser, e model.PublishEvent) (model.PublishReply, backend.PublishStreamStatus, error) {
parts := strings.Split(e.Path, "/")
if parts[0] == "gitops" {
// gitops gets all changes for everything, so lets make sure it is an admin user
if !user.HasRole(org.RoleAdmin) {
return models.PublishReply{}, backend.PublishStreamStatusPermissionDenied, nil
return model.PublishReply{}, backend.PublishStreamStatusPermissionDenied, nil
}
// Eventually this could broadcast a message back to the dashboard saying a pull request exists
return models.PublishReply{}, backend.PublishStreamStatusNotFound, fmt.Errorf("not implemented yet")
return model.PublishReply{}, backend.PublishStreamStatusNotFound, fmt.Errorf("not implemented yet")
}
// make sure can view this dashboard
@ -110,32 +110,32 @@ func (h *DashboardHandler) OnPublish(ctx context.Context, user *user.SignedInUse
event := dashboardEvent{}
err := json.Unmarshal(e.Data, &event)
if err != nil || event.UID != parts[1] {
return models.PublishReply{}, backend.PublishStreamStatusNotFound, fmt.Errorf("bad request")
return model.PublishReply{}, backend.PublishStreamStatusNotFound, fmt.Errorf("bad request")
}
if event.Action != EditingStarted {
// just ignore the event
return models.PublishReply{}, backend.PublishStreamStatusNotFound, fmt.Errorf("ignore???")
return model.PublishReply{}, backend.PublishStreamStatusNotFound, fmt.Errorf("ignore???")
}
query := dashboards.GetDashboardQuery{UID: parts[1], OrgID: user.OrgID}
if err := h.DashboardService.GetDashboard(ctx, &query); err != nil {
logger.Error("Unknown dashboard", "query", query)
return models.PublishReply{}, backend.PublishStreamStatusNotFound, nil
return model.PublishReply{}, backend.PublishStreamStatusNotFound, nil
}
guard, err := guardian.NewByDashboard(ctx, query.Result, user.OrgID, user)
if err != nil {
logger.Error("Failed to create guardian", "err", err)
return models.PublishReply{}, backend.PublishStreamStatusNotFound, fmt.Errorf("internal error")
return model.PublishReply{}, backend.PublishStreamStatusNotFound, fmt.Errorf("internal error")
}
canEdit, err := guard.CanEdit()
if err != nil {
return models.PublishReply{}, backend.PublishStreamStatusNotFound, fmt.Errorf("internal error")
return model.PublishReply{}, backend.PublishStreamStatusNotFound, fmt.Errorf("internal error")
}
// Ignore edit events if the user can not edit
if !canEdit {
return models.PublishReply{}, backend.PublishStreamStatusNotFound, nil // NOOP
return model.PublishReply{}, backend.PublishStreamStatusNotFound, nil // NOOP
}
// Tell everyone who is editing
@ -143,12 +143,12 @@ func (h *DashboardHandler) OnPublish(ctx context.Context, user *user.SignedInUse
msg, err := json.Marshal(event)
if err != nil {
return models.PublishReply{}, backend.PublishStreamStatusNotFound, fmt.Errorf("internal error")
return model.PublishReply{}, backend.PublishStreamStatusNotFound, fmt.Errorf("internal error")
}
return models.PublishReply{Data: msg}, backend.PublishStreamStatusOK, nil
return model.PublishReply{Data: msg}, backend.PublishStreamStatusOK, nil
}
return models.PublishReply{}, backend.PublishStreamStatusNotFound, nil
return model.PublishReply{}, backend.PublishStreamStatusNotFound, nil
}
// DashboardSaved should broadcast to the appropriate stream

View File

@ -3,7 +3,7 @@ package features
import (
"context"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/services/live/model"
"github.com/grafana/grafana/pkg/services/live/orgchannel"
"github.com/grafana/grafana/pkg/services/live/runstream"
"github.com/grafana/grafana/pkg/services/user"
@ -39,7 +39,7 @@ func NewPluginRunner(pluginID string, datasourceUID string, runStreamManager *ru
}
// GetHandlerForPath gets the handler for a path.
func (m *PluginRunner) GetHandlerForPath(path string) (models.ChannelHandler, error) {
func (m *PluginRunner) GetHandlerForPath(path string) (model.ChannelHandler, error) {
return &PluginPathRunner{
path: path,
pluginID: m.pluginID,
@ -61,15 +61,15 @@ type PluginPathRunner struct {
}
// OnSubscribe passes control to a plugin.
func (r *PluginPathRunner) OnSubscribe(ctx context.Context, user *user.SignedInUser, e models.SubscribeEvent) (models.SubscribeReply, backend.SubscribeStreamStatus, error) {
func (r *PluginPathRunner) OnSubscribe(ctx context.Context, user *user.SignedInUser, e model.SubscribeEvent) (model.SubscribeReply, backend.SubscribeStreamStatus, error) {
pCtx, found, err := r.pluginContextGetter.GetPluginContext(ctx, user, r.pluginID, r.datasourceUID, false)
if err != nil {
logger.Error("Get plugin context error", "error", err, "path", r.path)
return models.SubscribeReply{}, 0, err
return model.SubscribeReply{}, 0, err
}
if !found {
logger.Error("Plugin context not found", "path", r.path)
return models.SubscribeReply{}, 0, centrifuge.ErrorInternal
return model.SubscribeReply{}, 0, centrifuge.ErrorInternal
}
resp, err := r.handler.SubscribeStream(ctx, &backend.SubscribeStreamRequest{
PluginContext: pCtx,
@ -78,16 +78,16 @@ func (r *PluginPathRunner) OnSubscribe(ctx context.Context, user *user.SignedInU
})
if err != nil {
logger.Error("Plugin OnSubscribe call error", "error", err, "path", r.path)
return models.SubscribeReply{}, 0, err
return model.SubscribeReply{}, 0, err
}
if resp.Status != backend.SubscribeStreamStatusOK {
return models.SubscribeReply{}, resp.Status, nil
return model.SubscribeReply{}, resp.Status, nil
}
submitResult, err := r.runStreamManager.SubmitStream(ctx, user, orgchannel.PrependOrgID(user.OrgID, e.Channel), r.path, e.Data, pCtx, r.handler, false)
if err != nil {
logger.Error("Error submitting stream to manager", "error", err, "path", r.path)
return models.SubscribeReply{}, 0, centrifuge.ErrorInternal
return model.SubscribeReply{}, 0, centrifuge.ErrorInternal
}
if submitResult.StreamExists {
logger.Debug("Skip running new stream (already exists)", "path", r.path)
@ -95,7 +95,7 @@ func (r *PluginPathRunner) OnSubscribe(ctx context.Context, user *user.SignedInU
logger.Debug("Running a new unidirectional stream", "path", r.path)
}
reply := models.SubscribeReply{
reply := model.SubscribeReply{
Presence: true,
}
if resp.InitialData != nil {
@ -105,15 +105,15 @@ func (r *PluginPathRunner) OnSubscribe(ctx context.Context, user *user.SignedInU
}
// OnPublish passes control to a plugin.
func (r *PluginPathRunner) OnPublish(ctx context.Context, user *user.SignedInUser, e models.PublishEvent) (models.PublishReply, backend.PublishStreamStatus, error) {
func (r *PluginPathRunner) OnPublish(ctx context.Context, user *user.SignedInUser, e model.PublishEvent) (model.PublishReply, backend.PublishStreamStatus, error) {
pCtx, found, err := r.pluginContextGetter.GetPluginContext(ctx, user, r.pluginID, r.datasourceUID, false)
if err != nil {
logger.Error("Get plugin context error", "error", err, "path", r.path)
return models.PublishReply{}, 0, err
return model.PublishReply{}, 0, err
}
if !found {
logger.Error("Plugin context not found", "path", r.path)
return models.PublishReply{}, 0, centrifuge.ErrorInternal
return model.PublishReply{}, 0, centrifuge.ErrorInternal
}
resp, err := r.handler.PublishStream(ctx, &backend.PublishStreamRequest{
PluginContext: pCtx,
@ -122,10 +122,10 @@ func (r *PluginPathRunner) OnPublish(ctx context.Context, user *user.SignedInUse
})
if err != nil {
logger.Error("Plugin OnPublish call error", "error", err, "path", r.path)
return models.PublishReply{}, 0, err
return model.PublishReply{}, 0, err
}
if resp.Status != backend.PublishStreamStatusOK {
return models.PublishReply{}, resp.Status, nil
return model.PublishReply{}, resp.Status, nil
}
return models.PublishReply{Data: resp.Data}, backend.PublishStreamStatusOK, nil
return model.PublishReply{Data: resp.Data}, backend.PublishStreamStatusOK, nil
}

View File

@ -36,6 +36,7 @@ import (
"github.com/grafana/grafana/pkg/services/live/livecontext"
"github.com/grafana/grafana/pkg/services/live/liveplugin"
"github.com/grafana/grafana/pkg/services/live/managedstream"
"github.com/grafana/grafana/pkg/services/live/model"
"github.com/grafana/grafana/pkg/services/live/orgchannel"
"github.com/grafana/grafana/pkg/services/live/pipeline"
"github.com/grafana/grafana/pkg/services/live/pushws"
@ -66,7 +67,7 @@ var (
// CoreGrafanaScope list of core features
type CoreGrafanaScope struct {
Features map[string]models.ChannelHandlerFactory
Features map[string]model.ChannelHandlerFactory
// The generic service to advertise dashboard changes
Dashboards DashboardActivityChannel
@ -89,9 +90,9 @@ func ProvideService(plugCtxProvider *plugincontext.Provider, cfg *setting.Cfg, r
SQLStore: sqlStore,
SecretsService: secretsService,
queryDataService: queryDataService,
channels: make(map[string]models.ChannelHandler),
channels: make(map[string]model.ChannelHandler),
GrafanaScope: CoreGrafanaScope{
Features: make(map[string]models.ChannelHandlerFactory),
Features: make(map[string]model.ChannelHandlerFactory),
},
usageStatsService: usageStatsService,
orgService: orgService,
@ -419,7 +420,7 @@ type GrafanaLive struct {
pushPipelineWebsocketHandler interface{}
// Full channel handler
channels map[string]models.ChannelHandler
channels map[string]model.ChannelHandler
channelsMu sync.RWMutex
// The core internal features
@ -649,7 +650,7 @@ func (g *GrafanaLive) handleOnSubscribe(ctx context.Context, client *centrifuge.
return centrifuge.SubscribeReply{}, centrifuge.ErrorPermissionDenied
}
var reply models.SubscribeReply
var reply model.SubscribeReply
var status backend.SubscribeStreamStatus
var ruleFound bool
@ -701,7 +702,7 @@ func (g *GrafanaLive) handleOnSubscribe(ctx context.Context, client *centrifuge.
logger.Error("Error getting channel handler", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err)
return centrifuge.SubscribeReply{}, centrifuge.ErrorInternal
}
reply, status, err = handler.OnSubscribe(client.Context(), user, models.SubscribeEvent{
reply, status, err = handler.OnSubscribe(client.Context(), user, model.SubscribeEvent{
Channel: channel,
Path: addr.Path,
Data: e.Data,
@ -795,7 +796,7 @@ func (g *GrafanaLive) handleOnPublish(ctx context.Context, client *centrifuge.Cl
logger.Error("Error getting channel handler", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err)
return centrifuge.PublishReply{}, centrifuge.ErrorInternal
}
reply, status, err := handler.OnPublish(client.Context(), user, models.PublishEvent{
reply, status, err := handler.OnPublish(client.Context(), user, model.PublishEvent{
Channel: channel,
Path: addr.Path,
Data: e.Data,
@ -856,7 +857,7 @@ func publishStatusToHTTPError(status backend.PublishStreamStatus) (int, string)
}
// GetChannelHandler gives thread-safe access to the channel.
func (g *GrafanaLive) GetChannelHandler(ctx context.Context, user *user.SignedInUser, channel string) (models.ChannelHandler, live.Channel, error) {
func (g *GrafanaLive) GetChannelHandler(ctx context.Context, user *user.SignedInUser, channel string) (model.ChannelHandler, live.Channel, error) {
// Parse the identifier ${scope}/${namespace}/${path}
addr, err := live.ParseChannel(channel)
if err != nil {
@ -897,7 +898,7 @@ func (g *GrafanaLive) GetChannelHandler(ctx context.Context, user *user.SignedIn
// GetChannelHandlerFactory gets a ChannelHandlerFactory for a namespace.
// It gives thread-safe access to the channel.
func (g *GrafanaLive) GetChannelHandlerFactory(ctx context.Context, user *user.SignedInUser, scope string, namespace string) (models.ChannelHandlerFactory, error) {
func (g *GrafanaLive) GetChannelHandlerFactory(ctx context.Context, user *user.SignedInUser, scope string, namespace string) (model.ChannelHandlerFactory, error) {
switch scope {
case live.ScopeGrafana:
return g.handleGrafanaScope(user, namespace)
@ -912,14 +913,14 @@ func (g *GrafanaLive) GetChannelHandlerFactory(ctx context.Context, user *user.S
}
}
func (g *GrafanaLive) handleGrafanaScope(_ *user.SignedInUser, namespace string) (models.ChannelHandlerFactory, error) {
func (g *GrafanaLive) handleGrafanaScope(_ *user.SignedInUser, namespace string) (model.ChannelHandlerFactory, error) {
if p, ok := g.GrafanaScope.Features[namespace]; ok {
return p, nil
}
return nil, fmt.Errorf("unknown feature: %q", namespace)
}
func (g *GrafanaLive) handlePluginScope(ctx context.Context, _ *user.SignedInUser, namespace string) (models.ChannelHandlerFactory, error) {
func (g *GrafanaLive) handlePluginScope(ctx context.Context, _ *user.SignedInUser, namespace string) (model.ChannelHandlerFactory, error) {
streamHandler, err := g.getStreamPlugin(ctx, namespace)
if err != nil {
return nil, fmt.Errorf("can't find stream plugin: %s", namespace)
@ -933,11 +934,11 @@ func (g *GrafanaLive) handlePluginScope(ctx context.Context, _ *user.SignedInUse
), nil
}
func (g *GrafanaLive) handleStreamScope(u *user.SignedInUser, namespace string) (models.ChannelHandlerFactory, error) {
func (g *GrafanaLive) handleStreamScope(u *user.SignedInUser, namespace string) (model.ChannelHandlerFactory, error) {
return g.ManagedStreamRunner.GetOrCreateStream(u.OrgID, live.ScopeStream, namespace)
}
func (g *GrafanaLive) handleDatasourceScope(ctx context.Context, user *user.SignedInUser, namespace string) (models.ChannelHandlerFactory, error) {
func (g *GrafanaLive) handleDatasourceScope(ctx context.Context, user *user.SignedInUser, namespace string) (model.ChannelHandlerFactory, error) {
ds, err := g.DataSourceCache.GetDatasourceByUID(ctx, namespace, user, false)
if err != nil {
return nil, fmt.Errorf("error getting datasource: %w", err)
@ -1020,7 +1021,7 @@ func (g *GrafanaLive) HandleHTTPPublish(ctx *models.ReqContext) response.Respons
return response.Error(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError), nil)
}
reply, status, err := channelHandler.OnPublish(ctx.Req.Context(), ctx.SignedInUser, models.PublishEvent{Channel: cmd.Channel, Path: addr.Path, Data: cmd.Data})
reply, status, err := channelHandler.OnPublish(ctx.Req.Context(), ctx.SignedInUser, model.PublishEvent{Channel: cmd.Channel, Path: addr.Path, Data: cmd.Data})
if err != nil {
logger.Error("Error calling OnPublish", "error", err, "channel", cmd.Channel)
return response.Error(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError), nil)

View File

@ -8,6 +8,7 @@ import (
"sync"
"time"
"github.com/grafana/grafana/pkg/services/live/model"
"github.com/grafana/grafana/pkg/services/live/orgchannel"
"github.com/grafana/grafana/pkg/services/user"
@ -15,7 +16,6 @@ import (
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana-plugin-sdk-go/live"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/models"
)
var (
@ -39,7 +39,7 @@ var (
type Runner struct {
mu sync.RWMutex
streams map[int64]map[string]*NamespaceStream
publisher models.ChannelPublisher
publisher model.ChannelPublisher
localPublisher LocalPublisher
frameCache FrameCache
}
@ -49,7 +49,7 @@ type LocalPublisher interface {
}
// NewRunner creates new Runner.
func NewRunner(publisher models.ChannelPublisher, localPublisher LocalPublisher, frameCache FrameCache) *Runner {
func NewRunner(publisher model.ChannelPublisher, localPublisher LocalPublisher, frameCache FrameCache) *Runner {
return &Runner{
publisher: publisher,
localPublisher: localPublisher,
@ -136,7 +136,7 @@ type NamespaceStream struct {
orgID int64
scope string
namespace string
publisher models.ChannelPublisher
publisher model.ChannelPublisher
localPublisher LocalPublisher
frameCache FrameCache
rateMu sync.RWMutex
@ -156,7 +156,7 @@ type ManagedChannel struct {
}
// NewNamespaceStream creates new NamespaceStream.
func NewNamespaceStream(orgID int64, scope string, namespace string, publisher models.ChannelPublisher, localPublisher LocalPublisher, schemaUpdater FrameCache) *NamespaceStream {
func NewNamespaceStream(orgID int64, scope string, namespace string, publisher model.ChannelPublisher, localPublisher LocalPublisher, schemaUpdater FrameCache) *NamespaceStream {
return &NamespaceStream{
orgID: orgID,
scope: scope,
@ -235,12 +235,12 @@ func (s *NamespaceStream) minuteRate(path string) int64 {
return total
}
func (s *NamespaceStream) GetHandlerForPath(_ string) (models.ChannelHandler, error) {
func (s *NamespaceStream) GetHandlerForPath(_ string) (model.ChannelHandler, error) {
return s, nil
}
func (s *NamespaceStream) OnSubscribe(ctx context.Context, u *user.SignedInUser, e models.SubscribeEvent) (models.SubscribeReply, backend.SubscribeStreamStatus, error) {
reply := models.SubscribeReply{}
func (s *NamespaceStream) OnSubscribe(ctx context.Context, u *user.SignedInUser, e model.SubscribeEvent) (model.SubscribeReply, backend.SubscribeStreamStatus, error) {
reply := model.SubscribeReply{}
frameJSON, ok, err := s.frameCache.GetFrame(ctx, u.OrgID, e.Channel)
if err != nil {
return reply, 0, err
@ -251,6 +251,6 @@ func (s *NamespaceStream) OnSubscribe(ctx context.Context, u *user.SignedInUser,
return reply, backend.SubscribeStreamStatusOK, nil
}
func (s *NamespaceStream) OnPublish(_ context.Context, _ *user.SignedInUser, _ models.PublishEvent) (models.PublishReply, backend.PublishStreamStatus, error) {
return models.PublishReply{}, backend.PublishStreamStatusPermissionDenied, nil
func (s *NamespaceStream) OnPublish(_ context.Context, _ *user.SignedInUser, _ model.PublishEvent) (model.PublishReply, backend.PublishStreamStatus, error) {
return model.PublishReply{}, backend.PublishStreamStatusPermissionDenied, nil
}

View File

@ -0,0 +1,87 @@
package model
import (
"context"
"encoding/json"
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana/pkg/services/user"
)
// ChannelPublisher writes data into a channel. Note that permissions are not checked.
type ChannelPublisher func(orgID int64, channel string, data []byte) error
// ChannelClientCount will return the number of clients for a channel
type ChannelClientCount func(orgID int64, channel string) (int, error)
// SubscribeEvent contains subscription data.
type SubscribeEvent struct {
Channel string
Path string
Data json.RawMessage
}
// SubscribeReply is a reaction to SubscribeEvent.
type SubscribeReply struct {
Presence bool
JoinLeave bool
Recover bool
Data json.RawMessage
}
// PublishEvent contains publication data.
type PublishEvent struct {
Channel string
Path string
Data json.RawMessage
}
// PublishReply is a reaction to PublishEvent.
type PublishReply struct {
// By default, it's a handler responsibility to publish data
// into a stream upon OnPublish but returning a data here
// will make Grafana Live publish data itself (i.e. stream handler
// just works as permission proxy in this case).
Data json.RawMessage
// HistorySize sets a stream history size.
HistorySize int
// HistoryTTL is a time that messages will live in stream history.
HistoryTTL time.Duration
}
// ChannelHandler defines the core channel behavior
type ChannelHandler interface {
// OnSubscribe is called when a client wants to subscribe to a channel
OnSubscribe(ctx context.Context, user *user.SignedInUser, e SubscribeEvent) (SubscribeReply, backend.SubscribeStreamStatus, error)
// OnPublish is called when a client writes a message to the channel websocket.
OnPublish(ctx context.Context, user *user.SignedInUser, e PublishEvent) (PublishReply, backend.PublishStreamStatus, error)
}
// ChannelHandlerFactory should be implemented by all core features.
type ChannelHandlerFactory interface {
// GetHandlerForPath gets a ChannelHandler for a path.
// This is called fast and often -- it must be synchronized
GetHandlerForPath(path string) (ChannelHandler, error)
}
type LiveMessage struct {
Id int64
OrgId int64
Channel string
Data json.RawMessage
Published time.Time
}
type SaveLiveMessageQuery struct {
OrgId int64
Channel string
Data json.RawMessage
}
type GetLiveMessageQuery struct {
OrgId int64
Channel string
}

View File

@ -4,8 +4,8 @@ import (
"context"
"errors"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/services/live/livecontext"
"github.com/grafana/grafana/pkg/services/live/model"
"github.com/grafana/grafana-plugin-sdk-go/backend"
)
@ -33,7 +33,7 @@ func (s *BuiltinDataOutput) OutputData(ctx context.Context, vars Vars, data []by
if err != nil {
return nil, err
}
_, status, err := handler.OnPublish(ctx, u, models.PublishEvent{
_, status, err := handler.OnPublish(ctx, u, model.PublishEvent{
Channel: vars.Channel,
Data: data,
})

View File

@ -6,7 +6,7 @@ import (
"fmt"
"os"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/services/live/model"
"github.com/grafana/grafana/pkg/services/user"
"github.com/grafana/grafana-plugin-sdk-go/backend"
@ -108,7 +108,7 @@ type FrameOutputter interface {
// Subscriber can handle channel subscribe events.
type Subscriber interface {
Type() string
Subscribe(ctx context.Context, vars Vars, data []byte) (models.SubscribeReply, backend.SubscribeStreamStatus, error)
Subscribe(ctx context.Context, vars Vars, data []byte) (model.SubscribeReply, backend.SubscribeStreamStatus, error)
}
// PublishAuthChecker checks whether current user can publish to a channel.

View File

@ -3,8 +3,8 @@ package pipeline
import (
"context"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/services/live/livecontext"
"github.com/grafana/grafana/pkg/services/live/model"
"github.com/grafana/grafana/pkg/services/user"
"github.com/grafana/grafana-plugin-sdk-go/backend"
@ -16,7 +16,7 @@ type BuiltinSubscriber struct {
}
type ChannelHandlerGetter interface {
GetChannelHandler(ctx context.Context, user *user.SignedInUser, channel string) (models.ChannelHandler, live.Channel, error)
GetChannelHandler(ctx context.Context, user *user.SignedInUser, channel string) (model.ChannelHandler, live.Channel, error)
}
const SubscriberTypeBuiltin = "builtin"
@ -29,16 +29,16 @@ func (s *BuiltinSubscriber) Type() string {
return SubscriberTypeBuiltin
}
func (s *BuiltinSubscriber) Subscribe(ctx context.Context, vars Vars, data []byte) (models.SubscribeReply, backend.SubscribeStreamStatus, error) {
func (s *BuiltinSubscriber) Subscribe(ctx context.Context, vars Vars, data []byte) (model.SubscribeReply, backend.SubscribeStreamStatus, error) {
u, ok := livecontext.GetContextSignedUser(ctx)
if !ok {
return models.SubscribeReply{}, backend.SubscribeStreamStatusPermissionDenied, nil
return model.SubscribeReply{}, backend.SubscribeStreamStatusPermissionDenied, nil
}
handler, _, err := s.channelHandlerGetter.GetChannelHandler(ctx, u, vars.Channel)
if err != nil {
return models.SubscribeReply{}, 0, err
return model.SubscribeReply{}, 0, err
}
return handler.OnSubscribe(ctx, u, models.SubscribeEvent{
return handler.OnSubscribe(ctx, u, model.SubscribeEvent{
Channel: vars.Channel,
Path: vars.Path,
Data: data,

View File

@ -5,9 +5,9 @@ import (
"github.com/grafana/grafana/pkg/services/live/livecontext"
"github.com/grafana/grafana/pkg/services/live/managedstream"
"github.com/grafana/grafana/pkg/services/live/model"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana/pkg/models"
)
type ManagedStreamSubscriber struct {
@ -24,17 +24,17 @@ func (s *ManagedStreamSubscriber) Type() string {
return SubscriberTypeManagedStream
}
func (s *ManagedStreamSubscriber) Subscribe(ctx context.Context, vars Vars, _ []byte) (models.SubscribeReply, backend.SubscribeStreamStatus, error) {
func (s *ManagedStreamSubscriber) Subscribe(ctx context.Context, vars Vars, _ []byte) (model.SubscribeReply, backend.SubscribeStreamStatus, error) {
stream, err := s.managedStream.GetOrCreateStream(vars.OrgID, vars.Scope, vars.Namespace)
if err != nil {
logger.Error("Error getting managed stream", "error", err)
return models.SubscribeReply{}, 0, err
return model.SubscribeReply{}, 0, err
}
u, ok := livecontext.GetContextSignedUser(ctx)
if !ok {
return models.SubscribeReply{}, backend.SubscribeStreamStatusPermissionDenied, nil
return model.SubscribeReply{}, backend.SubscribeStreamStatusPermissionDenied, nil
}
return stream.OnSubscribe(ctx, u, models.SubscribeEvent{
return stream.OnSubscribe(ctx, u, model.SubscribeEvent{
Channel: vars.Channel,
Path: vars.Path,
})

View File

@ -4,7 +4,7 @@ import (
"context"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/services/live/model"
)
type MultipleSubscriber struct {
@ -21,16 +21,16 @@ func (s *MultipleSubscriber) Type() string {
return SubscriberTypeMultiple
}
func (s *MultipleSubscriber) Subscribe(ctx context.Context, vars Vars, data []byte) (models.SubscribeReply, backend.SubscribeStreamStatus, error) {
finalReply := models.SubscribeReply{}
func (s *MultipleSubscriber) Subscribe(ctx context.Context, vars Vars, data []byte) (model.SubscribeReply, backend.SubscribeStreamStatus, error) {
finalReply := model.SubscribeReply{}
for _, s := range s.Subscribers {
reply, status, err := s.Subscribe(ctx, vars, data)
if err != nil {
return models.SubscribeReply{}, 0, err
return model.SubscribeReply{}, 0, err
}
if status != backend.SubscribeStreamStatusOK {
return models.SubscribeReply{}, status, nil
return model.SubscribeReply{}, status, nil
}
if finalReply.Data == nil {
finalReply.Data = reply.Data