mirror of
https://github.com/mattermost/mattermost.git
synced 2025-02-25 18:55:24 -06:00
Automatic Merge
This commit is contained in:
parent
427c4b53b5
commit
8e147ace93
@ -3129,7 +3129,9 @@ func TestWebHubMembership(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestWebHubCloseConnOnDBFail(t *testing.T) {
|
||||
th := Setup(t).InitBasic()
|
||||
th := SetupConfig(t, func(cfg *model.Config) {
|
||||
*cfg.ServiceSettings.EnableWebHubChannelIteration = true
|
||||
}).InitBasic()
|
||||
defer func() {
|
||||
th.TearDown()
|
||||
// Asserting that the error message is present in the log
|
||||
|
@ -26,6 +26,7 @@ import (
|
||||
"github.com/mattermost/mattermost/server/public/shared/i18n"
|
||||
"github.com/mattermost/mattermost/server/public/shared/mlog"
|
||||
"github.com/mattermost/mattermost/server/public/shared/request"
|
||||
"github.com/mattermost/mattermost/server/v8/channels/store/sqlstore"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -94,8 +95,10 @@ type WebConn struct {
|
||||
UserId string
|
||||
PostedAck bool
|
||||
|
||||
lastUserActivityAt int64
|
||||
send chan model.WebSocketMessage
|
||||
allChannelMembers map[string]string
|
||||
lastAllChannelMembersTime int64
|
||||
lastUserActivityAt int64
|
||||
send chan model.WebSocketMessage
|
||||
// deadQueue behaves like a queue of a finite size
|
||||
// which is used to store all messages that are sent via the websocket.
|
||||
// It basically acts as the user-space socket buffer, and is used
|
||||
@ -734,6 +737,8 @@ func (wc *WebConn) drainDeadQueue(index int) error {
|
||||
|
||||
// InvalidateCache resets all internal data of the WebConn.
|
||||
func (wc *WebConn) InvalidateCache() {
|
||||
wc.allChannelMembers = nil
|
||||
wc.lastAllChannelMembersTime = 0
|
||||
wc.SetSession(nil)
|
||||
wc.SetSessionExpiresAt(0)
|
||||
}
|
||||
@ -902,9 +907,36 @@ func (wc *WebConn) ShouldSendEvent(msg *model.WebSocketEvent) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// We don't need to do any further checks because this is already scoped
|
||||
// to channel members from web_hub.
|
||||
return true
|
||||
if *wc.Platform.Config().ServiceSettings.EnableWebHubChannelIteration {
|
||||
// We don't need to do any further checks because this is already scoped
|
||||
// to channel members from web_hub.
|
||||
return true
|
||||
}
|
||||
|
||||
if model.GetMillis()-wc.lastAllChannelMembersTime > webConnMemberCacheTime {
|
||||
wc.allChannelMembers = nil
|
||||
wc.lastAllChannelMembersTime = 0
|
||||
}
|
||||
|
||||
if wc.allChannelMembers == nil {
|
||||
result, err := wc.Platform.Store.Channel().GetAllChannelMembersForUser(
|
||||
sqlstore.RequestContextWithMaster(request.EmptyContext(wc.Platform.logger)),
|
||||
wc.UserId,
|
||||
false,
|
||||
false,
|
||||
)
|
||||
if err != nil {
|
||||
mlog.Error("webhub.shouldSendEvent.", mlog.Err(err))
|
||||
return false
|
||||
}
|
||||
wc.allChannelMembers = result
|
||||
wc.lastAllChannelMembersTime = model.GetMillis()
|
||||
}
|
||||
|
||||
if _, ok := wc.allChannelMembers[chID]; ok {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// Only report events to users who are in the team for the event
|
||||
|
@ -406,6 +406,7 @@ func (h *Hub) Start() {
|
||||
connIndex := newHubConnectionIndex(inactiveConnReaperInterval,
|
||||
h.platform.Store,
|
||||
h.platform.logger,
|
||||
*h.platform.Config().ServiceSettings.EnableWebHubChannelIteration,
|
||||
)
|
||||
|
||||
for {
|
||||
@ -519,6 +520,11 @@ func (h *Hub) Start() {
|
||||
for _, webConn := range connIndex.ForUser(userID) {
|
||||
webConn.InvalidateCache()
|
||||
}
|
||||
|
||||
if !*h.platform.Config().ServiceSettings.EnableWebHubChannelIteration {
|
||||
continue
|
||||
}
|
||||
|
||||
err := connIndex.InvalidateCMCacheForUser(userID)
|
||||
if err != nil {
|
||||
h.platform.Log().Error("Error while invalidating channel member cache", mlog.String("user_id", userID), mlog.Err(err))
|
||||
@ -586,7 +592,7 @@ func (h *Hub) Start() {
|
||||
}
|
||||
} else if userID := msg.GetBroadcast().UserId; userID != "" {
|
||||
targetConns = connIndex.ForUser(userID)
|
||||
} else if channelID := msg.GetBroadcast().ChannelId; channelID != "" {
|
||||
} else if channelID := msg.GetBroadcast().ChannelId; channelID != "" && *h.platform.Config().ServiceSettings.EnableWebHubChannelIteration {
|
||||
targetConns = connIndex.ForChannel(channelID)
|
||||
}
|
||||
if targetConns != nil {
|
||||
@ -653,6 +659,10 @@ func closeAndRemoveConn(connIndex *hubConnectionIndex, conn *WebConn) {
|
||||
connIndex.Remove(conn)
|
||||
}
|
||||
|
||||
type connMetadata struct {
|
||||
channelIDs []string
|
||||
}
|
||||
|
||||
// hubConnectionIndex provides fast addition, removal, and iteration of web connections.
|
||||
// It requires 4 functionalities which need to be very fast:
|
||||
// - check if a connection exists or not.
|
||||
@ -660,90 +670,95 @@ func closeAndRemoveConn(connIndex *hubConnectionIndex, conn *WebConn) {
|
||||
// - get all connections for a given channelID.
|
||||
// - get all connections.
|
||||
type hubConnectionIndex struct {
|
||||
// byUserId stores the list of connections for a given userID
|
||||
byUserId map[string][]*WebConn
|
||||
// byChannelID stores the list of connections for a given channelID.
|
||||
byChannelID map[string][]*WebConn
|
||||
// byConnection serves the dual purpose of storing the index of the webconn
|
||||
// in the value of byUserId map, and also to get all connections.
|
||||
byConnection map[*WebConn]int
|
||||
// byUserId stores the set of connections for a given userID
|
||||
byUserId map[string]map[*WebConn]struct{}
|
||||
// byChannelID stores the set of connections for a given channelID
|
||||
byChannelID map[string]map[*WebConn]struct{}
|
||||
// byConnection serves the dual purpose of storing the channelIDs
|
||||
// and also to get all connections
|
||||
byConnection map[*WebConn]connMetadata
|
||||
byConnectionId map[string]*WebConn
|
||||
// staleThreshold is the limit beyond which inactive connections
|
||||
// will be deleted.
|
||||
staleThreshold time.Duration
|
||||
|
||||
store store.Store
|
||||
logger mlog.LoggerIFace
|
||||
fastIteration bool
|
||||
store store.Store
|
||||
logger mlog.LoggerIFace
|
||||
}
|
||||
|
||||
func newHubConnectionIndex(interval time.Duration,
|
||||
store store.Store,
|
||||
logger mlog.LoggerIFace,
|
||||
fastIteration bool,
|
||||
) *hubConnectionIndex {
|
||||
return &hubConnectionIndex{
|
||||
byUserId: make(map[string][]*WebConn),
|
||||
byChannelID: make(map[string][]*WebConn),
|
||||
byConnection: make(map[*WebConn]int),
|
||||
byUserId: make(map[string]map[*WebConn]struct{}),
|
||||
byChannelID: make(map[string]map[*WebConn]struct{}),
|
||||
byConnection: make(map[*WebConn]connMetadata),
|
||||
byConnectionId: make(map[string]*WebConn),
|
||||
staleThreshold: interval,
|
||||
store: store,
|
||||
logger: logger,
|
||||
fastIteration: fastIteration,
|
||||
}
|
||||
}
|
||||
|
||||
func (i *hubConnectionIndex) Add(wc *WebConn) error {
|
||||
cm, err := i.store.Channel().GetAllChannelMembersForUser(request.EmptyContext(i.logger), wc.UserId, false, false)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error getChannelMembersForUser: %v", err)
|
||||
}
|
||||
for chID := range cm {
|
||||
i.byChannelID[chID] = append(i.byChannelID[chID], wc)
|
||||
var channelIDs []string
|
||||
if i.fastIteration {
|
||||
cm, err := i.store.Channel().GetAllChannelMembersForUser(request.EmptyContext(i.logger), wc.UserId, false, false)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error getChannelMembersForUser: %v", err)
|
||||
}
|
||||
|
||||
// Store channel IDs and add to byChannelID
|
||||
channelIDs = make([]string, 0, len(cm))
|
||||
for chID := range cm {
|
||||
channelIDs = append(channelIDs, chID)
|
||||
|
||||
// Initialize the channel's map if it doesn't exist
|
||||
if _, ok := i.byChannelID[chID]; !ok {
|
||||
i.byChannelID[chID] = make(map[*WebConn]struct{})
|
||||
}
|
||||
i.byChannelID[chID][wc] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
i.byUserId[wc.UserId] = append(i.byUserId[wc.UserId], wc)
|
||||
i.byConnection[wc] = len(i.byUserId[wc.UserId]) - 1
|
||||
// Initialize the user's map if it doesn't exist
|
||||
if _, ok := i.byUserId[wc.UserId]; !ok {
|
||||
i.byUserId[wc.UserId] = make(map[*WebConn]struct{})
|
||||
}
|
||||
i.byUserId[wc.UserId][wc] = struct{}{}
|
||||
i.byConnection[wc] = connMetadata{
|
||||
channelIDs: channelIDs,
|
||||
}
|
||||
i.byConnectionId[wc.GetConnectionID()] = wc
|
||||
return nil
|
||||
}
|
||||
|
||||
func (i *hubConnectionIndex) Remove(wc *WebConn) {
|
||||
userConnIndex, ok := i.byConnection[wc]
|
||||
connMeta, ok := i.byConnection[wc]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
// Remove the wc from i.byUserId
|
||||
// get the conn slice.
|
||||
userConnections := i.byUserId[wc.UserId]
|
||||
// get the last connection.
|
||||
last := userConnections[len(userConnections)-1]
|
||||
// https://go.dev/wiki/SliceTricks#delete-without-preserving-order
|
||||
userConnections[userConnIndex] = last
|
||||
userConnections[len(userConnections)-1] = nil
|
||||
i.byUserId[wc.UserId] = userConnections[:len(userConnections)-1]
|
||||
// set the index of the connection that was moved to the new index.
|
||||
i.byConnection[last] = userConnIndex
|
||||
// Remove from byUserId
|
||||
if userConns, ok := i.byUserId[wc.UserId]; ok {
|
||||
delete(userConns, wc)
|
||||
}
|
||||
|
||||
connectionID := wc.GetConnectionID()
|
||||
// Remove webconns from i.byChannelID
|
||||
// This has O(n) complexity. We are trading off speed while removing
|
||||
// a connection, to improve broadcasting a message.
|
||||
for chID, webConns := range i.byChannelID {
|
||||
// https://go.dev/wiki/SliceTricks#filtering-without-allocating
|
||||
filtered := webConns[:0]
|
||||
for _, conn := range webConns {
|
||||
if conn.GetConnectionID() != connectionID {
|
||||
filtered = append(filtered, conn)
|
||||
if i.fastIteration {
|
||||
// Remove from byChannelID for each channel
|
||||
for _, chID := range connMeta.channelIDs {
|
||||
if channelConns, ok := i.byChannelID[chID]; ok {
|
||||
delete(channelConns, wc)
|
||||
}
|
||||
}
|
||||
for i := len(filtered); i < len(webConns); i++ {
|
||||
webConns[i] = nil
|
||||
}
|
||||
i.byChannelID[chID] = filtered
|
||||
}
|
||||
|
||||
delete(i.byConnection, wc)
|
||||
delete(i.byConnectionId, connectionID)
|
||||
delete(i.byConnectionId, wc.GetConnectionID())
|
||||
}
|
||||
|
||||
func (i *hubConnectionIndex) InvalidateCMCacheForUser(userID string) error {
|
||||
@ -753,25 +768,40 @@ func (i *hubConnectionIndex) InvalidateCMCacheForUser(userID string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// Clear out all user entries which belong to channels.
|
||||
for chID, webConns := range i.byChannelID {
|
||||
// https://go.dev/wiki/SliceTricks#filtering-without-allocating
|
||||
filtered := webConns[:0]
|
||||
for _, conn := range webConns {
|
||||
if conn.UserId != userID {
|
||||
filtered = append(filtered, conn)
|
||||
// Get all connections for this user
|
||||
conns := i.ForUser(userID)
|
||||
|
||||
// Remove all user connections from existing channels
|
||||
for _, conn := range conns {
|
||||
if meta, ok := i.byConnection[conn]; ok {
|
||||
// Remove from old channels
|
||||
for _, chID := range meta.channelIDs {
|
||||
if channelConns, ok := i.byChannelID[chID]; ok {
|
||||
delete(channelConns, conn)
|
||||
}
|
||||
}
|
||||
}
|
||||
for i := len(filtered); i < len(webConns); i++ {
|
||||
webConns[i] = nil
|
||||
}
|
||||
i.byChannelID[chID] = filtered
|
||||
}
|
||||
|
||||
// re-populate the cache
|
||||
for chID := range cm {
|
||||
i.byChannelID[chID] = append(i.byChannelID[chID], i.ForUser(userID)...)
|
||||
// Add connections to new channels
|
||||
for _, conn := range conns {
|
||||
newChannelIDs := make([]string, 0, len(cm))
|
||||
for chID := range cm {
|
||||
newChannelIDs = append(newChannelIDs, chID)
|
||||
// Initialize channel map if needed
|
||||
if _, ok := i.byChannelID[chID]; !ok {
|
||||
i.byChannelID[chID] = make(map[*WebConn]struct{})
|
||||
}
|
||||
i.byChannelID[chID][conn] = struct{}{}
|
||||
}
|
||||
|
||||
// Update connection metadata
|
||||
if meta, ok := i.byConnection[conn]; ok {
|
||||
meta.channelIDs = newChannelIDs
|
||||
i.byConnection[conn] = meta
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -782,26 +812,31 @@ func (i *hubConnectionIndex) Has(wc *WebConn) bool {
|
||||
|
||||
// ForUser returns all connections for a user ID.
|
||||
func (i *hubConnectionIndex) ForUser(id string) []*WebConn {
|
||||
// Fast path if there is only one or fewer connection.
|
||||
if len(i.byUserId[id]) <= 1 {
|
||||
return i.byUserId[id]
|
||||
userConns, ok := i.byUserId[id]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Move to using maps.Keys to use the iterator pattern with 1.23.
|
||||
// This saves the additional slice copy.
|
||||
conns := make([]*WebConn, 0, len(userConns))
|
||||
for conn := range userConns {
|
||||
conns = append(conns, conn)
|
||||
}
|
||||
// If there are multiple connections per user,
|
||||
// then we have to return a clone of the slice
|
||||
// to allow connIndex.Remove to be safely called while
|
||||
// iterating the slice.
|
||||
conns := make([]*WebConn, len(i.byUserId[id]))
|
||||
copy(conns, i.byUserId[id])
|
||||
return conns
|
||||
}
|
||||
|
||||
// ForChannel returns all connections for a channelID.
|
||||
func (i *hubConnectionIndex) ForChannel(channelID string) []*WebConn {
|
||||
// Note: this is expensive because usually there will be
|
||||
// more than 1 member for a channel, and broadcasting
|
||||
// is a hot path, but worth it.
|
||||
conns := make([]*WebConn, len(i.byChannelID[channelID]))
|
||||
copy(conns, i.byChannelID[channelID])
|
||||
channelConns, ok := i.byChannelID[channelID]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
conns := make([]*WebConn, 0, len(channelConns))
|
||||
for conn := range channelConns {
|
||||
conns = append(conns, conn)
|
||||
}
|
||||
return conns
|
||||
}
|
||||
|
||||
@ -822,7 +857,7 @@ func (i *hubConnectionIndex) ForConnection(id string) *WebConn {
|
||||
}
|
||||
|
||||
// All returns the full webConn index.
|
||||
func (i *hubConnectionIndex) All() map[*WebConn]int {
|
||||
func (i *hubConnectionIndex) All() map[*WebConn]connMetadata {
|
||||
return i.byConnection
|
||||
}
|
||||
|
||||
|
@ -6,6 +6,7 @@ package platform
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
@ -197,154 +198,158 @@ func TestHubConnIndex(t *testing.T) {
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
t.Run("Basic", func(t *testing.T) {
|
||||
connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger)
|
||||
for _, fastIterate := range []bool{true, false} {
|
||||
t.Run(fmt.Sprintf("fastIterate=%t", fastIterate), func(t *testing.T) {
|
||||
t.Run("Basic", func(t *testing.T) {
|
||||
connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger, fastIterate)
|
||||
|
||||
// User1
|
||||
wc1 := &WebConn{
|
||||
Platform: th.Service,
|
||||
Suite: th.Suite,
|
||||
UserId: model.NewId(),
|
||||
}
|
||||
wc1.SetConnectionID(model.NewId())
|
||||
wc1.SetSession(&model.Session{})
|
||||
// User1
|
||||
wc1 := &WebConn{
|
||||
Platform: th.Service,
|
||||
Suite: th.Suite,
|
||||
UserId: model.NewId(),
|
||||
}
|
||||
wc1.SetConnectionID(model.NewId())
|
||||
wc1.SetSession(&model.Session{})
|
||||
|
||||
// User2
|
||||
wc2 := &WebConn{
|
||||
Platform: th.Service,
|
||||
Suite: th.Suite,
|
||||
UserId: model.NewId(),
|
||||
}
|
||||
wc2.SetConnectionID(model.NewId())
|
||||
wc2.SetSession(&model.Session{})
|
||||
// User2
|
||||
wc2 := &WebConn{
|
||||
Platform: th.Service,
|
||||
Suite: th.Suite,
|
||||
UserId: model.NewId(),
|
||||
}
|
||||
wc2.SetConnectionID(model.NewId())
|
||||
wc2.SetSession(&model.Session{})
|
||||
|
||||
wc3 := &WebConn{
|
||||
Platform: th.Service,
|
||||
Suite: th.Suite,
|
||||
UserId: wc2.UserId,
|
||||
}
|
||||
wc3.SetConnectionID(model.NewId())
|
||||
wc3.SetSession(&model.Session{})
|
||||
wc3 := &WebConn{
|
||||
Platform: th.Service,
|
||||
Suite: th.Suite,
|
||||
UserId: wc2.UserId,
|
||||
}
|
||||
wc3.SetConnectionID(model.NewId())
|
||||
wc3.SetSession(&model.Session{})
|
||||
|
||||
wc4 := &WebConn{
|
||||
Platform: th.Service,
|
||||
Suite: th.Suite,
|
||||
UserId: wc2.UserId,
|
||||
}
|
||||
wc4.SetConnectionID(model.NewId())
|
||||
wc4.SetSession(&model.Session{})
|
||||
wc4 := &WebConn{
|
||||
Platform: th.Service,
|
||||
Suite: th.Suite,
|
||||
UserId: wc2.UserId,
|
||||
}
|
||||
wc4.SetConnectionID(model.NewId())
|
||||
wc4.SetSession(&model.Session{})
|
||||
|
||||
connIndex.Add(wc1)
|
||||
connIndex.Add(wc2)
|
||||
connIndex.Add(wc3)
|
||||
connIndex.Add(wc4)
|
||||
connIndex.Add(wc1)
|
||||
connIndex.Add(wc2)
|
||||
connIndex.Add(wc3)
|
||||
connIndex.Add(wc4)
|
||||
|
||||
t.Run("Basic", func(t *testing.T) {
|
||||
assert.True(t, connIndex.Has(wc1))
|
||||
assert.True(t, connIndex.Has(wc2))
|
||||
t.Run("Basic", func(t *testing.T) {
|
||||
assert.True(t, connIndex.Has(wc1))
|
||||
assert.True(t, connIndex.Has(wc2))
|
||||
|
||||
assert.ElementsMatch(t, connIndex.ForUser(wc2.UserId), []*WebConn{wc2, wc3, wc4})
|
||||
assert.ElementsMatch(t, connIndex.ForUser(wc1.UserId), []*WebConn{wc1})
|
||||
assert.True(t, connIndex.Has(wc2))
|
||||
assert.True(t, connIndex.Has(wc1))
|
||||
assert.Len(t, connIndex.All(), 4)
|
||||
assert.ElementsMatch(t, connIndex.ForUser(wc2.UserId), []*WebConn{wc2, wc3, wc4})
|
||||
assert.ElementsMatch(t, connIndex.ForUser(wc1.UserId), []*WebConn{wc1})
|
||||
assert.True(t, connIndex.Has(wc2))
|
||||
assert.True(t, connIndex.Has(wc1))
|
||||
assert.Len(t, connIndex.All(), 4)
|
||||
})
|
||||
|
||||
t.Run("RemoveMiddleUser2", func(t *testing.T) {
|
||||
connIndex.Remove(wc3) // Remove from middle from user2
|
||||
|
||||
assert.ElementsMatch(t, connIndex.ForUser(wc2.UserId), []*WebConn{wc2, wc4})
|
||||
assert.ElementsMatch(t, connIndex.ForUser(wc1.UserId), []*WebConn{wc1})
|
||||
assert.True(t, connIndex.Has(wc2))
|
||||
assert.False(t, connIndex.Has(wc3))
|
||||
assert.True(t, connIndex.Has(wc4))
|
||||
assert.Len(t, connIndex.All(), 3)
|
||||
})
|
||||
|
||||
t.Run("RemoveUser1", func(t *testing.T) {
|
||||
connIndex.Remove(wc1) // Remove sole connection from user1
|
||||
|
||||
assert.ElementsMatch(t, connIndex.ForUser(wc2.UserId), []*WebConn{wc2, wc4})
|
||||
assert.ElementsMatch(t, connIndex.ForUser(wc1.UserId), []*WebConn{})
|
||||
assert.Len(t, connIndex.ForUser(wc1.UserId), 0)
|
||||
assert.Len(t, connIndex.All(), 2)
|
||||
assert.False(t, connIndex.Has(wc1))
|
||||
assert.True(t, connIndex.Has(wc2))
|
||||
})
|
||||
|
||||
t.Run("RemoveEndUser2", func(t *testing.T) {
|
||||
connIndex.Remove(wc4) // Remove from end from user2
|
||||
|
||||
assert.ElementsMatch(t, connIndex.ForUser(wc2.UserId), []*WebConn{wc2})
|
||||
assert.ElementsMatch(t, connIndex.ForUser(wc1.UserId), []*WebConn{})
|
||||
assert.True(t, connIndex.Has(wc2))
|
||||
assert.False(t, connIndex.Has(wc3))
|
||||
assert.False(t, connIndex.Has(wc4))
|
||||
assert.Len(t, connIndex.All(), 1)
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("ByConnectionId", func(t *testing.T) {
|
||||
connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger, fastIterate)
|
||||
|
||||
// User1
|
||||
wc1ID := model.NewId()
|
||||
wc1 := &WebConn{
|
||||
Platform: th.Service,
|
||||
Suite: th.Suite,
|
||||
UserId: th.BasicUser.Id,
|
||||
}
|
||||
wc1.SetConnectionID(wc1ID)
|
||||
wc1.SetSession(&model.Session{})
|
||||
|
||||
// User2
|
||||
wc2ID := model.NewId()
|
||||
wc2 := &WebConn{
|
||||
Platform: th.Service,
|
||||
Suite: th.Suite,
|
||||
UserId: th.BasicUser2.Id,
|
||||
}
|
||||
wc2.SetConnectionID(wc2ID)
|
||||
wc2.SetSession(&model.Session{})
|
||||
|
||||
wc3ID := model.NewId()
|
||||
wc3 := &WebConn{
|
||||
Platform: th.Service,
|
||||
Suite: th.Suite,
|
||||
UserId: wc2.UserId,
|
||||
}
|
||||
wc3.SetConnectionID(wc3ID)
|
||||
wc3.SetSession(&model.Session{})
|
||||
|
||||
t.Run("no connections", func(t *testing.T) {
|
||||
assert.False(t, connIndex.Has(wc1))
|
||||
assert.False(t, connIndex.Has(wc2))
|
||||
assert.False(t, connIndex.Has(wc3))
|
||||
assert.Empty(t, connIndex.byConnectionId)
|
||||
})
|
||||
|
||||
t.Run("adding", func(t *testing.T) {
|
||||
connIndex.Add(wc1)
|
||||
connIndex.Add(wc3)
|
||||
|
||||
assert.Len(t, connIndex.byConnectionId, 2)
|
||||
assert.Equal(t, wc1, connIndex.ForConnection(wc1ID))
|
||||
assert.Equal(t, wc3, connIndex.ForConnection(wc3ID))
|
||||
assert.Equal(t, (*WebConn)(nil), connIndex.ForConnection(wc2ID))
|
||||
})
|
||||
|
||||
t.Run("removing", func(t *testing.T) {
|
||||
connIndex.Remove(wc3)
|
||||
|
||||
assert.Len(t, connIndex.byConnectionId, 1)
|
||||
assert.Equal(t, wc1, connIndex.ForConnection(wc1ID))
|
||||
assert.Equal(t, (*WebConn)(nil), connIndex.ForConnection(wc3ID))
|
||||
assert.Equal(t, (*WebConn)(nil), connIndex.ForConnection(wc2ID))
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("RemoveMiddleUser2", func(t *testing.T) {
|
||||
connIndex.Remove(wc3) // Remove from middle from user2
|
||||
|
||||
assert.ElementsMatch(t, connIndex.ForUser(wc2.UserId), []*WebConn{wc2, wc4})
|
||||
assert.ElementsMatch(t, connIndex.ForUser(wc1.UserId), []*WebConn{wc1})
|
||||
assert.True(t, connIndex.Has(wc2))
|
||||
assert.False(t, connIndex.Has(wc3))
|
||||
assert.True(t, connIndex.Has(wc4))
|
||||
assert.Len(t, connIndex.All(), 3)
|
||||
})
|
||||
|
||||
t.Run("RemoveUser1", func(t *testing.T) {
|
||||
connIndex.Remove(wc1) // Remove sole connection from user1
|
||||
|
||||
assert.ElementsMatch(t, connIndex.ForUser(wc2.UserId), []*WebConn{wc2, wc4})
|
||||
assert.ElementsMatch(t, connIndex.ForUser(wc1.UserId), []*WebConn{})
|
||||
assert.Len(t, connIndex.ForUser(wc1.UserId), 0)
|
||||
assert.Len(t, connIndex.All(), 2)
|
||||
assert.False(t, connIndex.Has(wc1))
|
||||
assert.True(t, connIndex.Has(wc2))
|
||||
})
|
||||
|
||||
t.Run("RemoveEndUser2", func(t *testing.T) {
|
||||
connIndex.Remove(wc4) // Remove from end from user2
|
||||
|
||||
assert.ElementsMatch(t, connIndex.ForUser(wc2.UserId), []*WebConn{wc2})
|
||||
assert.ElementsMatch(t, connIndex.ForUser(wc1.UserId), []*WebConn{})
|
||||
assert.True(t, connIndex.Has(wc2))
|
||||
assert.False(t, connIndex.Has(wc3))
|
||||
assert.False(t, connIndex.Has(wc4))
|
||||
assert.Len(t, connIndex.All(), 1)
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("ByConnectionId", func(t *testing.T) {
|
||||
connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger)
|
||||
|
||||
// User1
|
||||
wc1ID := model.NewId()
|
||||
wc1 := &WebConn{
|
||||
Platform: th.Service,
|
||||
Suite: th.Suite,
|
||||
UserId: th.BasicUser.Id,
|
||||
}
|
||||
wc1.SetConnectionID(wc1ID)
|
||||
wc1.SetSession(&model.Session{})
|
||||
|
||||
// User2
|
||||
wc2ID := model.NewId()
|
||||
wc2 := &WebConn{
|
||||
Platform: th.Service,
|
||||
Suite: th.Suite,
|
||||
UserId: th.BasicUser2.Id,
|
||||
}
|
||||
wc2.SetConnectionID(wc2ID)
|
||||
wc2.SetSession(&model.Session{})
|
||||
|
||||
wc3ID := model.NewId()
|
||||
wc3 := &WebConn{
|
||||
Platform: th.Service,
|
||||
Suite: th.Suite,
|
||||
UserId: wc2.UserId,
|
||||
}
|
||||
wc3.SetConnectionID(wc3ID)
|
||||
wc3.SetSession(&model.Session{})
|
||||
|
||||
t.Run("no connections", func(t *testing.T) {
|
||||
assert.False(t, connIndex.Has(wc1))
|
||||
assert.False(t, connIndex.Has(wc2))
|
||||
assert.False(t, connIndex.Has(wc3))
|
||||
assert.Empty(t, connIndex.byConnectionId)
|
||||
})
|
||||
|
||||
t.Run("adding", func(t *testing.T) {
|
||||
connIndex.Add(wc1)
|
||||
connIndex.Add(wc3)
|
||||
|
||||
assert.Len(t, connIndex.byConnectionId, 2)
|
||||
assert.Equal(t, wc1, connIndex.ForConnection(wc1ID))
|
||||
assert.Equal(t, wc3, connIndex.ForConnection(wc3ID))
|
||||
assert.Equal(t, (*WebConn)(nil), connIndex.ForConnection(wc2ID))
|
||||
})
|
||||
|
||||
t.Run("removing", func(t *testing.T) {
|
||||
connIndex.Remove(wc3)
|
||||
|
||||
assert.Len(t, connIndex.byConnectionId, 1)
|
||||
assert.Equal(t, wc1, connIndex.ForConnection(wc1ID))
|
||||
assert.Equal(t, (*WebConn)(nil), connIndex.ForConnection(wc3ID))
|
||||
assert.Equal(t, (*WebConn)(nil), connIndex.ForConnection(wc2ID))
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
t.Run("ByChannelId", func(t *testing.T) {
|
||||
connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger)
|
||||
connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger, true)
|
||||
|
||||
// User1
|
||||
wc1ID := model.NewId()
|
||||
@ -414,7 +419,7 @@ func TestHubConnIndexIncorrectRemoval(t *testing.T) {
|
||||
th := Setup(t)
|
||||
defer th.TearDown()
|
||||
|
||||
connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger)
|
||||
connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger, false)
|
||||
|
||||
// User2
|
||||
wc2 := &WebConn{
|
||||
@ -461,7 +466,7 @@ func TestHubConnIndexInactive(t *testing.T) {
|
||||
th := Setup(t)
|
||||
defer th.TearDown()
|
||||
|
||||
connIndex := newHubConnectionIndex(2*time.Second, th.Service.Store, th.Service.logger)
|
||||
connIndex := newHubConnectionIndex(2*time.Second, th.Service.Store, th.Service.logger, false)
|
||||
|
||||
// User1
|
||||
wc1 := &WebConn{
|
||||
@ -621,7 +626,7 @@ func TestHubWebConnCount(t *testing.T) {
|
||||
func BenchmarkHubConnIndex(b *testing.B) {
|
||||
th := Setup(b).InitBasic()
|
||||
defer th.TearDown()
|
||||
connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger)
|
||||
connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger, false)
|
||||
|
||||
// User1
|
||||
wc1 := &WebConn{
|
||||
@ -666,7 +671,7 @@ func TestHubConnIndexRemoveMemLeak(t *testing.T) {
|
||||
th := Setup(t)
|
||||
defer th.TearDown()
|
||||
|
||||
connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger)
|
||||
connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger, false)
|
||||
|
||||
wc := &WebConn{
|
||||
Platform: th.Service,
|
||||
|
@ -420,6 +420,7 @@ type ServiceSettings struct {
|
||||
MaximumPayloadSizeBytes *int64 `access:"environment_file_storage,write_restrictable,cloud_restrictable"`
|
||||
MaximumURLLength *int `access:"environment_file_storage,write_restrictable,cloud_restrictable"`
|
||||
ScheduledPosts *bool `access:"site_posts"`
|
||||
EnableWebHubChannelIteration *bool `access:"write_restrictable,cloud_restrictable"` // telemetry: none
|
||||
}
|
||||
|
||||
var MattermostGiphySdkKey string
|
||||
@ -947,6 +948,10 @@ func (s *ServiceSettings) SetDefaults(isUpdate bool) {
|
||||
if s.ScheduledPosts == nil {
|
||||
s.ScheduledPosts = NewPointer(true)
|
||||
}
|
||||
|
||||
if s.EnableWebHubChannelIteration == nil {
|
||||
s.EnableWebHubChannelIteration = NewPointer(false)
|
||||
}
|
||||
}
|
||||
|
||||
type CacheSettings struct {
|
||||
|
Loading…
Reference in New Issue
Block a user