Remove live.go from models (#61742)

* Remove live.go from models

* Change Id to ID

* Add xorm tags
This commit is contained in:
idafurjes 2023-01-19 18:10:40 +01:00 committed by GitHub
parent c104cc7020
commit 8cbcdf1c26
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 18 additions and 105 deletions

View File

@ -1,87 +0,0 @@
package models
import (
"context"
"encoding/json"
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana/pkg/services/user"
)
// ChannelPublisher writes data into a channel. Note that permissions are not checked.
type ChannelPublisher func(orgID int64, channel string, data []byte) error
// ChannelClientCount will return the number of clients for a channel
type ChannelClientCount func(orgID int64, channel string) (int, error)
// SubscribeEvent contains subscription data.
type SubscribeEvent struct {
Channel string
Path string
Data json.RawMessage
}
// SubscribeReply is a reaction to SubscribeEvent.
type SubscribeReply struct {
Presence bool
JoinLeave bool
Recover bool
Data json.RawMessage
}
// PublishEvent contains publication data.
type PublishEvent struct {
Channel string
Path string
Data json.RawMessage
}
// PublishReply is a reaction to PublishEvent.
type PublishReply struct {
// By default, it's a handler responsibility to publish data
// into a stream upon OnPublish but returning a data here
// will make Grafana Live publish data itself (i.e. stream handler
// just works as permission proxy in this case).
Data json.RawMessage
// HistorySize sets a stream history size.
HistorySize int
// HistoryTTL is a time that messages will live in stream history.
HistoryTTL time.Duration
}
// ChannelHandler defines the core channel behavior
type ChannelHandler interface {
// OnSubscribe is called when a client wants to subscribe to a channel
OnSubscribe(ctx context.Context, user *user.SignedInUser, e SubscribeEvent) (SubscribeReply, backend.SubscribeStreamStatus, error)
// OnPublish is called when a client writes a message to the channel websocket.
OnPublish(ctx context.Context, user *user.SignedInUser, e PublishEvent) (PublishReply, backend.PublishStreamStatus, error)
}
// ChannelHandlerFactory should be implemented by all core features.
type ChannelHandlerFactory interface {
// GetHandlerForPath gets a ChannelHandler for a path.
// This is called fast and often -- it must be synchronized
GetHandlerForPath(path string) (ChannelHandler, error)
}
type LiveMessage struct {
Id int64
OrgId int64
Channel string
Data json.RawMessage
Published time.Time
}
type SaveLiveMessageQuery struct {
OrgId int64
Channel string
Data json.RawMessage
}
type GetLiveMessageQuery struct {
OrgId int64
Channel string
}

View File

@ -34,9 +34,9 @@ func (s *Storage) SaveLiveMessage(query *model.SaveLiveMessageQuery) error {
// return err // return err
//}) //})
// return err // return err
s.cache.Set(getLiveMessageCacheKey(query.OrgId, query.Channel), model.LiveMessage{ s.cache.Set(getLiveMessageCacheKey(query.OrgID, query.Channel), model.LiveMessage{
Id: 0, // Not used actually. ID: 0, // Not used actually.
OrgId: query.OrgId, OrgID: query.OrgID,
Channel: query.Channel, Channel: query.Channel,
Data: query.Data, Data: query.Data,
Published: time.Now(), Published: time.Now(),
@ -54,7 +54,7 @@ func (s *Storage) GetLiveMessage(query *model.GetLiveMessageQuery) (model.LiveMe
// return err // return err
//}) //})
//return msg, exists, err //return msg, exists, err
m, ok := s.cache.Get(getLiveMessageCacheKey(query.OrgId, query.Channel)) m, ok := s.cache.Get(getLiveMessageCacheKey(query.OrgID, query.Channel))
if !ok { if !ok {
return model.LiveMessage{}, false, nil return model.LiveMessage{}, false, nil
} }

View File

@ -15,7 +15,7 @@ func TestIntegrationLiveMessage(t *testing.T) {
storage := SetupTestStorage(t) storage := SetupTestStorage(t)
getQuery := &model.GetLiveMessageQuery{ getQuery := &model.GetLiveMessageQuery{
OrgId: 1, OrgID: 1,
Channel: "test_channel", Channel: "test_channel",
} }
_, ok, err := storage.GetLiveMessage(getQuery) _, ok, err := storage.GetLiveMessage(getQuery)
@ -23,7 +23,7 @@ func TestIntegrationLiveMessage(t *testing.T) {
require.False(t, ok) require.False(t, ok)
saveQuery := &model.SaveLiveMessageQuery{ saveQuery := &model.SaveLiveMessageQuery{
OrgId: 1, OrgID: 1,
Channel: "test_channel", Channel: "test_channel",
Data: []byte(`{}`), Data: []byte(`{}`),
} }
@ -33,14 +33,14 @@ func TestIntegrationLiveMessage(t *testing.T) {
msg, ok, err := storage.GetLiveMessage(getQuery) msg, ok, err := storage.GetLiveMessage(getQuery)
require.NoError(t, err) require.NoError(t, err)
require.True(t, ok) require.True(t, ok)
require.Equal(t, int64(1), msg.OrgId) require.Equal(t, int64(1), msg.OrgID)
require.Equal(t, "test_channel", msg.Channel) require.Equal(t, "test_channel", msg.Channel)
require.Equal(t, json.RawMessage(`{}`), msg.Data) require.Equal(t, json.RawMessage(`{}`), msg.Data)
require.NotZero(t, msg.Published) require.NotZero(t, msg.Published)
// try saving again, should be replaced. // try saving again, should be replaced.
saveQuery2 := &model.SaveLiveMessageQuery{ saveQuery2 := &model.SaveLiveMessageQuery{
OrgId: 1, OrgID: 1,
Channel: "test_channel", Channel: "test_channel",
Data: []byte(`{"input": "hello"}`), Data: []byte(`{"input": "hello"}`),
} }
@ -48,13 +48,13 @@ func TestIntegrationLiveMessage(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
getQuery2 := &model.GetLiveMessageQuery{ getQuery2 := &model.GetLiveMessageQuery{
OrgId: 1, OrgID: 1,
Channel: "test_channel", Channel: "test_channel",
} }
msg2, ok, err := storage.GetLiveMessage(getQuery2) msg2, ok, err := storage.GetLiveMessage(getQuery2)
require.NoError(t, err) require.NoError(t, err)
require.True(t, ok) require.True(t, ok)
require.Equal(t, int64(1), msg2.OrgId) require.Equal(t, int64(1), msg2.OrgID)
require.Equal(t, "test_channel", msg2.Channel) require.Equal(t, "test_channel", msg2.Channel)
require.Equal(t, json.RawMessage(`{"input": "hello"}`), msg2.Data) require.Equal(t, json.RawMessage(`{"input": "hello"}`), msg2.Data)
require.NotZero(t, msg2.Published) require.NotZero(t, msg2.Published)

View File

@ -43,7 +43,7 @@ func (b *BroadcastRunner) OnSubscribe(_ context.Context, u *user.SignedInUser, e
JoinLeave: true, JoinLeave: true,
} }
query := &model.GetLiveMessageQuery{ query := &model.GetLiveMessageQuery{
OrgId: u.OrgID, OrgID: u.OrgID,
Channel: e.Channel, Channel: e.Channel,
} }
msg, ok, err := b.liveMessageStore.GetLiveMessage(query) msg, ok, err := b.liveMessageStore.GetLiveMessage(query)
@ -59,7 +59,7 @@ func (b *BroadcastRunner) OnSubscribe(_ context.Context, u *user.SignedInUser, e
// OnPublish is called when a client wants to broadcast on the websocket // OnPublish is called when a client wants to broadcast on the websocket
func (b *BroadcastRunner) OnPublish(_ context.Context, u *user.SignedInUser, e model.PublishEvent) (model.PublishReply, backend.PublishStreamStatus, error) { func (b *BroadcastRunner) OnPublish(_ context.Context, u *user.SignedInUser, e model.PublishEvent) (model.PublishReply, backend.PublishStreamStatus, error) {
query := &model.SaveLiveMessageQuery{ query := &model.SaveLiveMessageQuery{
OrgId: u.OrgID, OrgID: u.OrgID,
Channel: e.Channel, Channel: e.Channel,
Data: e.Data, Data: e.Data,
} }

View File

@ -29,7 +29,7 @@ func TestBroadcastRunner_OnSubscribe(t *testing.T) {
data := json.RawMessage(`{}`) data := json.RawMessage(`{}`)
mockDispatcher.EXPECT().GetLiveMessage(&model.GetLiveMessageQuery{ mockDispatcher.EXPECT().GetLiveMessage(&model.GetLiveMessageQuery{
OrgId: 1, OrgID: 1,
Channel: channel, Channel: channel,
}).DoAndReturn(func(query *model.GetLiveMessageQuery) (model.LiveMessage, bool, error) { }).DoAndReturn(func(query *model.GetLiveMessageQuery) (model.LiveMessage, bool, error) {
return model.LiveMessage{ return model.LiveMessage{
@ -64,7 +64,7 @@ func TestBroadcastRunner_OnPublish(t *testing.T) {
var orgID int64 = 1 var orgID int64 = 1
mockDispatcher.EXPECT().SaveLiveMessage(&model.SaveLiveMessageQuery{ mockDispatcher.EXPECT().SaveLiveMessage(&model.SaveLiveMessageQuery{
OrgId: orgID, OrgID: orgID,
Channel: channel, Channel: channel,
Data: data, Data: data,
}).DoAndReturn(func(query *model.SaveLiveMessageQuery) error { }).DoAndReturn(func(query *model.SaveLiveMessageQuery) error {

View File

@ -68,20 +68,20 @@ type ChannelHandlerFactory interface {
} }
type LiveMessage struct { type LiveMessage struct {
Id int64 ID int64 `xorm:"pk autoincr 'id'"`
OrgId int64 OrgID int64 `xorm:"org_id"`
Channel string Channel string
Data json.RawMessage Data json.RawMessage
Published time.Time Published time.Time
} }
type SaveLiveMessageQuery struct { type SaveLiveMessageQuery struct {
OrgId int64 OrgID int64 `xorm:"org_id"`
Channel string Channel string
Data json.RawMessage Data json.RawMessage
} }
type GetLiveMessageQuery struct { type GetLiveMessageQuery struct {
OrgId int64 OrgID int64 `xorm:"org_id"`
Channel string Channel string
} }