diff --git a/pkg/models/live.go b/pkg/models/live.go deleted file mode 100644 index c8990fa2c27..00000000000 --- a/pkg/models/live.go +++ /dev/null @@ -1,87 +0,0 @@ -package models - -import ( - "context" - "encoding/json" - "time" - - "github.com/grafana/grafana-plugin-sdk-go/backend" - - "github.com/grafana/grafana/pkg/services/user" -) - -// ChannelPublisher writes data into a channel. Note that permissions are not checked. -type ChannelPublisher func(orgID int64, channel string, data []byte) error - -// ChannelClientCount will return the number of clients for a channel -type ChannelClientCount func(orgID int64, channel string) (int, error) - -// SubscribeEvent contains subscription data. -type SubscribeEvent struct { - Channel string - Path string - Data json.RawMessage -} - -// SubscribeReply is a reaction to SubscribeEvent. -type SubscribeReply struct { - Presence bool - JoinLeave bool - Recover bool - Data json.RawMessage -} - -// PublishEvent contains publication data. -type PublishEvent struct { - Channel string - Path string - Data json.RawMessage -} - -// PublishReply is a reaction to PublishEvent. -type PublishReply struct { - // By default, it's a handler responsibility to publish data - // into a stream upon OnPublish but returning a data here - // will make Grafana Live publish data itself (i.e. stream handler - // just works as permission proxy in this case). - Data json.RawMessage - // HistorySize sets a stream history size. - HistorySize int - // HistoryTTL is a time that messages will live in stream history. - HistoryTTL time.Duration -} - -// ChannelHandler defines the core channel behavior -type ChannelHandler interface { - // OnSubscribe is called when a client wants to subscribe to a channel - OnSubscribe(ctx context.Context, user *user.SignedInUser, e SubscribeEvent) (SubscribeReply, backend.SubscribeStreamStatus, error) - - // OnPublish is called when a client writes a message to the channel websocket. - OnPublish(ctx context.Context, user *user.SignedInUser, e PublishEvent) (PublishReply, backend.PublishStreamStatus, error) -} - -// ChannelHandlerFactory should be implemented by all core features. -type ChannelHandlerFactory interface { - // GetHandlerForPath gets a ChannelHandler for a path. - // This is called fast and often -- it must be synchronized - GetHandlerForPath(path string) (ChannelHandler, error) -} - -type LiveMessage struct { - Id int64 - OrgId int64 - Channel string - Data json.RawMessage - Published time.Time -} - -type SaveLiveMessageQuery struct { - OrgId int64 - Channel string - Data json.RawMessage -} - -type GetLiveMessageQuery struct { - OrgId int64 - Channel string -} diff --git a/pkg/services/live/database/storage.go b/pkg/services/live/database/storage.go index a88c0ff7424..9d829a5f9ad 100644 --- a/pkg/services/live/database/storage.go +++ b/pkg/services/live/database/storage.go @@ -34,9 +34,9 @@ func (s *Storage) SaveLiveMessage(query *model.SaveLiveMessageQuery) error { // return err //}) // return err - s.cache.Set(getLiveMessageCacheKey(query.OrgId, query.Channel), model.LiveMessage{ - Id: 0, // Not used actually. - OrgId: query.OrgId, + s.cache.Set(getLiveMessageCacheKey(query.OrgID, query.Channel), model.LiveMessage{ + ID: 0, // Not used actually. + OrgID: query.OrgID, Channel: query.Channel, Data: query.Data, Published: time.Now(), @@ -54,7 +54,7 @@ func (s *Storage) GetLiveMessage(query *model.GetLiveMessageQuery) (model.LiveMe // return err //}) //return msg, exists, err - m, ok := s.cache.Get(getLiveMessageCacheKey(query.OrgId, query.Channel)) + m, ok := s.cache.Get(getLiveMessageCacheKey(query.OrgID, query.Channel)) if !ok { return model.LiveMessage{}, false, nil } diff --git a/pkg/services/live/database/tests/storage_test.go b/pkg/services/live/database/tests/storage_test.go index e6727d99b60..0f107dcc098 100644 --- a/pkg/services/live/database/tests/storage_test.go +++ b/pkg/services/live/database/tests/storage_test.go @@ -15,7 +15,7 @@ func TestIntegrationLiveMessage(t *testing.T) { storage := SetupTestStorage(t) getQuery := &model.GetLiveMessageQuery{ - OrgId: 1, + OrgID: 1, Channel: "test_channel", } _, ok, err := storage.GetLiveMessage(getQuery) @@ -23,7 +23,7 @@ func TestIntegrationLiveMessage(t *testing.T) { require.False(t, ok) saveQuery := &model.SaveLiveMessageQuery{ - OrgId: 1, + OrgID: 1, Channel: "test_channel", Data: []byte(`{}`), } @@ -33,14 +33,14 @@ func TestIntegrationLiveMessage(t *testing.T) { msg, ok, err := storage.GetLiveMessage(getQuery) require.NoError(t, err) require.True(t, ok) - require.Equal(t, int64(1), msg.OrgId) + require.Equal(t, int64(1), msg.OrgID) require.Equal(t, "test_channel", msg.Channel) require.Equal(t, json.RawMessage(`{}`), msg.Data) require.NotZero(t, msg.Published) // try saving again, should be replaced. saveQuery2 := &model.SaveLiveMessageQuery{ - OrgId: 1, + OrgID: 1, Channel: "test_channel", Data: []byte(`{"input": "hello"}`), } @@ -48,13 +48,13 @@ func TestIntegrationLiveMessage(t *testing.T) { require.NoError(t, err) getQuery2 := &model.GetLiveMessageQuery{ - OrgId: 1, + OrgID: 1, Channel: "test_channel", } msg2, ok, err := storage.GetLiveMessage(getQuery2) require.NoError(t, err) require.True(t, ok) - require.Equal(t, int64(1), msg2.OrgId) + require.Equal(t, int64(1), msg2.OrgID) require.Equal(t, "test_channel", msg2.Channel) require.Equal(t, json.RawMessage(`{"input": "hello"}`), msg2.Data) require.NotZero(t, msg2.Published) diff --git a/pkg/services/live/features/broadcast.go b/pkg/services/live/features/broadcast.go index 2a2570a1e49..c479a82d505 100644 --- a/pkg/services/live/features/broadcast.go +++ b/pkg/services/live/features/broadcast.go @@ -43,7 +43,7 @@ func (b *BroadcastRunner) OnSubscribe(_ context.Context, u *user.SignedInUser, e JoinLeave: true, } query := &model.GetLiveMessageQuery{ - OrgId: u.OrgID, + OrgID: u.OrgID, Channel: e.Channel, } msg, ok, err := b.liveMessageStore.GetLiveMessage(query) @@ -59,7 +59,7 @@ func (b *BroadcastRunner) OnSubscribe(_ context.Context, u *user.SignedInUser, e // OnPublish is called when a client wants to broadcast on the websocket func (b *BroadcastRunner) OnPublish(_ context.Context, u *user.SignedInUser, e model.PublishEvent) (model.PublishReply, backend.PublishStreamStatus, error) { query := &model.SaveLiveMessageQuery{ - OrgId: u.OrgID, + OrgID: u.OrgID, Channel: e.Channel, Data: e.Data, } diff --git a/pkg/services/live/features/broadcast_test.go b/pkg/services/live/features/broadcast_test.go index e832dbceaf9..459f07e2b2f 100644 --- a/pkg/services/live/features/broadcast_test.go +++ b/pkg/services/live/features/broadcast_test.go @@ -29,7 +29,7 @@ func TestBroadcastRunner_OnSubscribe(t *testing.T) { data := json.RawMessage(`{}`) mockDispatcher.EXPECT().GetLiveMessage(&model.GetLiveMessageQuery{ - OrgId: 1, + OrgID: 1, Channel: channel, }).DoAndReturn(func(query *model.GetLiveMessageQuery) (model.LiveMessage, bool, error) { return model.LiveMessage{ @@ -64,7 +64,7 @@ func TestBroadcastRunner_OnPublish(t *testing.T) { var orgID int64 = 1 mockDispatcher.EXPECT().SaveLiveMessage(&model.SaveLiveMessageQuery{ - OrgId: orgID, + OrgID: orgID, Channel: channel, Data: data, }).DoAndReturn(func(query *model.SaveLiveMessageQuery) error { diff --git a/pkg/services/live/model/model.go b/pkg/services/live/model/model.go index 251c6fe0813..612650fae11 100644 --- a/pkg/services/live/model/model.go +++ b/pkg/services/live/model/model.go @@ -68,20 +68,20 @@ type ChannelHandlerFactory interface { } type LiveMessage struct { - Id int64 - OrgId int64 + ID int64 `xorm:"pk autoincr 'id'"` + OrgID int64 `xorm:"org_id"` Channel string Data json.RawMessage Published time.Time } type SaveLiveMessageQuery struct { - OrgId int64 + OrgID int64 `xorm:"org_id"` Channel string Data json.RawMessage } type GetLiveMessageQuery struct { - OrgId int64 + OrgID int64 `xorm:"org_id"` Channel string }