MM-62960: Improve webConn remove performance from hubConnectionIndex (#30178) (#30212)

Automatic Merge
This commit is contained in:
Mattermost Build 2025-02-14 09:50:09 +02:00 committed by GitHub
parent 6bdb5504eb
commit e7fda95292
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 304 additions and 225 deletions

View File

@ -3437,7 +3437,9 @@ func TestWebHubMembership(t *testing.T) {
} }
func TestWebHubCloseConnOnDBFail(t *testing.T) { func TestWebHubCloseConnOnDBFail(t *testing.T) {
th := Setup(t).InitBasic() th := SetupConfig(t, func(cfg *model.Config) {
*cfg.ServiceSettings.EnableWebHubChannelIteration = true
}).InitBasic()
defer func() { defer func() {
th.TearDown() th.TearDown()
_, err := th.Server.Store().GetInternalMasterDB().Exec(`ALTER TABLE dummy RENAME to ChannelMembers`) _, err := th.Server.Store().GetInternalMasterDB().Exec(`ALTER TABLE dummy RENAME to ChannelMembers`)

View File

@ -27,6 +27,7 @@ import (
"github.com/mattermost/mattermost/server/public/shared/i18n" "github.com/mattermost/mattermost/server/public/shared/i18n"
"github.com/mattermost/mattermost/server/public/shared/mlog" "github.com/mattermost/mattermost/server/public/shared/mlog"
"github.com/mattermost/mattermost/server/public/shared/request" "github.com/mattermost/mattermost/server/public/shared/request"
"github.com/mattermost/mattermost/server/v8/channels/store/sqlstore"
) )
const ( const (
@ -95,8 +96,10 @@ type WebConn struct {
UserId string UserId string
PostedAck bool PostedAck bool
lastUserActivityAt int64 allChannelMembers map[string]string
send chan model.WebSocketMessage lastAllChannelMembersTime int64
lastUserActivityAt int64
send chan model.WebSocketMessage
// deadQueue behaves like a queue of a finite size // deadQueue behaves like a queue of a finite size
// which is used to store all messages that are sent via the websocket. // which is used to store all messages that are sent via the websocket.
// It basically acts as the user-space socket buffer, and is used // It basically acts as the user-space socket buffer, and is used
@ -743,6 +746,8 @@ func (wc *WebConn) drainDeadQueue(index int) error {
// InvalidateCache resets all internal data of the WebConn. // InvalidateCache resets all internal data of the WebConn.
func (wc *WebConn) InvalidateCache() { func (wc *WebConn) InvalidateCache() {
wc.allChannelMembers = nil
wc.lastAllChannelMembersTime = 0
wc.SetSession(nil) wc.SetSession(nil)
wc.SetSessionExpiresAt(0) wc.SetSessionExpiresAt(0)
} }
@ -923,9 +928,36 @@ func (wc *WebConn) ShouldSendEvent(msg *model.WebSocketEvent) bool {
return false return false
} }
// We don't need to do any further checks because this is already scoped if *wc.Platform.Config().ServiceSettings.EnableWebHubChannelIteration {
// to channel members from web_hub. // We don't need to do any further checks because this is already scoped
return true // to channel members from web_hub.
return true
}
if model.GetMillis()-wc.lastAllChannelMembersTime > webConnMemberCacheTime {
wc.allChannelMembers = nil
wc.lastAllChannelMembersTime = 0
}
if wc.allChannelMembers == nil {
result, err := wc.Platform.Store.Channel().GetAllChannelMembersForUser(
sqlstore.RequestContextWithMaster(request.EmptyContext(wc.Platform.logger)),
wc.UserId,
false,
false,
)
if err != nil {
mlog.Error("webhub.shouldSendEvent.", mlog.Err(err))
return false
}
wc.allChannelMembers = result
wc.lastAllChannelMembersTime = model.GetMillis()
}
if _, ok := wc.allChannelMembers[chID]; ok {
return true
}
return false
} }
// Only report events to users who are in the team for the event // Only report events to users who are in the team for the event

View File

@ -409,6 +409,7 @@ func (h *Hub) Start() {
connIndex := newHubConnectionIndex(inactiveConnReaperInterval, connIndex := newHubConnectionIndex(inactiveConnReaperInterval,
h.platform.Store, h.platform.Store,
h.platform.logger, h.platform.logger,
*h.platform.Config().ServiceSettings.EnableWebHubChannelIteration,
) )
for { for {
@ -522,6 +523,11 @@ func (h *Hub) Start() {
for _, webConn := range connIndex.ForUser(userID) { for _, webConn := range connIndex.ForUser(userID) {
webConn.InvalidateCache() webConn.InvalidateCache()
} }
if !*h.platform.Config().ServiceSettings.EnableWebHubChannelIteration {
continue
}
err := connIndex.InvalidateCMCacheForUser(userID) err := connIndex.InvalidateCMCacheForUser(userID)
if err != nil { if err != nil {
h.platform.Log().Error("Error while invalidating channel member cache", mlog.String("user_id", userID), mlog.Err(err)) h.platform.Log().Error("Error while invalidating channel member cache", mlog.String("user_id", userID), mlog.Err(err))
@ -589,7 +595,7 @@ func (h *Hub) Start() {
} }
} else if userID := msg.GetBroadcast().UserId; userID != "" { } else if userID := msg.GetBroadcast().UserId; userID != "" {
targetConns = connIndex.ForUser(userID) targetConns = connIndex.ForUser(userID)
} else if channelID := msg.GetBroadcast().ChannelId; channelID != "" { } else if channelID := msg.GetBroadcast().ChannelId; channelID != "" && *h.platform.Config().ServiceSettings.EnableWebHubChannelIteration {
targetConns = connIndex.ForChannel(channelID) targetConns = connIndex.ForChannel(channelID)
} }
if targetConns != nil { if targetConns != nil {
@ -656,6 +662,10 @@ func closeAndRemoveConn(connIndex *hubConnectionIndex, conn *WebConn) {
connIndex.Remove(conn) connIndex.Remove(conn)
} }
type connMetadata struct {
channelIDs []string
}
// hubConnectionIndex provides fast addition, removal, and iteration of web connections. // hubConnectionIndex provides fast addition, removal, and iteration of web connections.
// It requires 4 functionalities which need to be very fast: // It requires 4 functionalities which need to be very fast:
// - check if a connection exists or not. // - check if a connection exists or not.
@ -663,90 +673,95 @@ func closeAndRemoveConn(connIndex *hubConnectionIndex, conn *WebConn) {
// - get all connections for a given channelID. // - get all connections for a given channelID.
// - get all connections. // - get all connections.
type hubConnectionIndex struct { type hubConnectionIndex struct {
// byUserId stores the list of connections for a given userID // byUserId stores the set of connections for a given userID
byUserId map[string][]*WebConn byUserId map[string]map[*WebConn]struct{}
// byChannelID stores the list of connections for a given channelID. // byChannelID stores the set of connections for a given channelID
byChannelID map[string][]*WebConn byChannelID map[string]map[*WebConn]struct{}
// byConnection serves the dual purpose of storing the index of the webconn // byConnection serves the dual purpose of storing the channelIDs
// in the value of byUserId map, and also to get all connections. // and also to get all connections
byConnection map[*WebConn]int byConnection map[*WebConn]connMetadata
byConnectionId map[string]*WebConn byConnectionId map[string]*WebConn
// staleThreshold is the limit beyond which inactive connections // staleThreshold is the limit beyond which inactive connections
// will be deleted. // will be deleted.
staleThreshold time.Duration staleThreshold time.Duration
store store.Store fastIteration bool
logger mlog.LoggerIFace store store.Store
logger mlog.LoggerIFace
} }
func newHubConnectionIndex(interval time.Duration, func newHubConnectionIndex(interval time.Duration,
store store.Store, store store.Store,
logger mlog.LoggerIFace, logger mlog.LoggerIFace,
fastIteration bool,
) *hubConnectionIndex { ) *hubConnectionIndex {
return &hubConnectionIndex{ return &hubConnectionIndex{
byUserId: make(map[string][]*WebConn), byUserId: make(map[string]map[*WebConn]struct{}),
byChannelID: make(map[string][]*WebConn), byChannelID: make(map[string]map[*WebConn]struct{}),
byConnection: make(map[*WebConn]int), byConnection: make(map[*WebConn]connMetadata),
byConnectionId: make(map[string]*WebConn), byConnectionId: make(map[string]*WebConn),
staleThreshold: interval, staleThreshold: interval,
store: store, store: store,
logger: logger, logger: logger,
fastIteration: fastIteration,
} }
} }
func (i *hubConnectionIndex) Add(wc *WebConn) error { func (i *hubConnectionIndex) Add(wc *WebConn) error {
cm, err := i.store.Channel().GetAllChannelMembersForUser(request.EmptyContext(i.logger), wc.UserId, false, false) var channelIDs []string
if err != nil { if i.fastIteration {
return fmt.Errorf("error getChannelMembersForUser: %v", err) cm, err := i.store.Channel().GetAllChannelMembersForUser(request.EmptyContext(i.logger), wc.UserId, false, false)
} if err != nil {
for chID := range cm { return fmt.Errorf("error getChannelMembersForUser: %v", err)
i.byChannelID[chID] = append(i.byChannelID[chID], wc) }
// Store channel IDs and add to byChannelID
channelIDs = make([]string, 0, len(cm))
for chID := range cm {
channelIDs = append(channelIDs, chID)
// Initialize the channel's map if it doesn't exist
if _, ok := i.byChannelID[chID]; !ok {
i.byChannelID[chID] = make(map[*WebConn]struct{})
}
i.byChannelID[chID][wc] = struct{}{}
}
} }
i.byUserId[wc.UserId] = append(i.byUserId[wc.UserId], wc) // Initialize the user's map if it doesn't exist
i.byConnection[wc] = len(i.byUserId[wc.UserId]) - 1 if _, ok := i.byUserId[wc.UserId]; !ok {
i.byUserId[wc.UserId] = make(map[*WebConn]struct{})
}
i.byUserId[wc.UserId][wc] = struct{}{}
i.byConnection[wc] = connMetadata{
channelIDs: channelIDs,
}
i.byConnectionId[wc.GetConnectionID()] = wc i.byConnectionId[wc.GetConnectionID()] = wc
return nil return nil
} }
func (i *hubConnectionIndex) Remove(wc *WebConn) { func (i *hubConnectionIndex) Remove(wc *WebConn) {
userConnIndex, ok := i.byConnection[wc] connMeta, ok := i.byConnection[wc]
if !ok { if !ok {
return return
} }
// Remove the wc from i.byUserId // Remove from byUserId
// get the conn slice. if userConns, ok := i.byUserId[wc.UserId]; ok {
userConnections := i.byUserId[wc.UserId] delete(userConns, wc)
// get the last connection. }
last := userConnections[len(userConnections)-1]
// https://go.dev/wiki/SliceTricks#delete-without-preserving-order
userConnections[userConnIndex] = last
userConnections[len(userConnections)-1] = nil
i.byUserId[wc.UserId] = userConnections[:len(userConnections)-1]
// set the index of the connection that was moved to the new index.
i.byConnection[last] = userConnIndex
connectionID := wc.GetConnectionID() if i.fastIteration {
// Remove webconns from i.byChannelID // Remove from byChannelID for each channel
// This has O(n) complexity. We are trading off speed while removing for _, chID := range connMeta.channelIDs {
// a connection, to improve broadcasting a message. if channelConns, ok := i.byChannelID[chID]; ok {
for chID, webConns := range i.byChannelID { delete(channelConns, wc)
// https://go.dev/wiki/SliceTricks#filtering-without-allocating
filtered := webConns[:0]
for _, conn := range webConns {
if conn.GetConnectionID() != connectionID {
filtered = append(filtered, conn)
} }
} }
for i := len(filtered); i < len(webConns); i++ {
webConns[i] = nil
}
i.byChannelID[chID] = filtered
} }
delete(i.byConnection, wc) delete(i.byConnection, wc)
delete(i.byConnectionId, connectionID) delete(i.byConnectionId, wc.GetConnectionID())
} }
func (i *hubConnectionIndex) InvalidateCMCacheForUser(userID string) error { func (i *hubConnectionIndex) InvalidateCMCacheForUser(userID string) error {
@ -756,25 +771,40 @@ func (i *hubConnectionIndex) InvalidateCMCacheForUser(userID string) error {
return err return err
} }
// Clear out all user entries which belong to channels. // Get all connections for this user
for chID, webConns := range i.byChannelID { conns := i.ForUser(userID)
// https://go.dev/wiki/SliceTricks#filtering-without-allocating
filtered := webConns[:0] // Remove all user connections from existing channels
for _, conn := range webConns { for _, conn := range conns {
if conn.UserId != userID { if meta, ok := i.byConnection[conn]; ok {
filtered = append(filtered, conn) // Remove from old channels
for _, chID := range meta.channelIDs {
if channelConns, ok := i.byChannelID[chID]; ok {
delete(channelConns, conn)
}
} }
} }
for i := len(filtered); i < len(webConns); i++ {
webConns[i] = nil
}
i.byChannelID[chID] = filtered
} }
// re-populate the cache // Add connections to new channels
for chID := range cm { for _, conn := range conns {
i.byChannelID[chID] = append(i.byChannelID[chID], i.ForUser(userID)...) newChannelIDs := make([]string, 0, len(cm))
for chID := range cm {
newChannelIDs = append(newChannelIDs, chID)
// Initialize channel map if needed
if _, ok := i.byChannelID[chID]; !ok {
i.byChannelID[chID] = make(map[*WebConn]struct{})
}
i.byChannelID[chID][conn] = struct{}{}
}
// Update connection metadata
if meta, ok := i.byConnection[conn]; ok {
meta.channelIDs = newChannelIDs
i.byConnection[conn] = meta
}
} }
return nil return nil
} }
@ -785,26 +815,31 @@ func (i *hubConnectionIndex) Has(wc *WebConn) bool {
// ForUser returns all connections for a user ID. // ForUser returns all connections for a user ID.
func (i *hubConnectionIndex) ForUser(id string) []*WebConn { func (i *hubConnectionIndex) ForUser(id string) []*WebConn {
// Fast path if there is only one or fewer connection. userConns, ok := i.byUserId[id]
if len(i.byUserId[id]) <= 1 { if !ok {
return i.byUserId[id] return nil
}
// Move to using maps.Keys to use the iterator pattern with 1.23.
// This saves the additional slice copy.
conns := make([]*WebConn, 0, len(userConns))
for conn := range userConns {
conns = append(conns, conn)
} }
// If there are multiple connections per user,
// then we have to return a clone of the slice
// to allow connIndex.Remove to be safely called while
// iterating the slice.
conns := make([]*WebConn, len(i.byUserId[id]))
copy(conns, i.byUserId[id])
return conns return conns
} }
// ForChannel returns all connections for a channelID. // ForChannel returns all connections for a channelID.
func (i *hubConnectionIndex) ForChannel(channelID string) []*WebConn { func (i *hubConnectionIndex) ForChannel(channelID string) []*WebConn {
// Note: this is expensive because usually there will be channelConns, ok := i.byChannelID[channelID]
// more than 1 member for a channel, and broadcasting if !ok {
// is a hot path, but worth it. return nil
conns := make([]*WebConn, len(i.byChannelID[channelID])) }
copy(conns, i.byChannelID[channelID])
conns := make([]*WebConn, 0, len(channelConns))
for conn := range channelConns {
conns = append(conns, conn)
}
return conns return conns
} }
@ -825,7 +860,7 @@ func (i *hubConnectionIndex) ForConnection(id string) *WebConn {
} }
// All returns the full webConn index. // All returns the full webConn index.
func (i *hubConnectionIndex) All() map[*WebConn]int { func (i *hubConnectionIndex) All() map[*WebConn]connMetadata {
return i.byConnection return i.byConnection
} }

View File

@ -6,6 +6,7 @@ package platform
import ( import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"fmt"
"net" "net"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
@ -197,154 +198,158 @@ func TestHubConnIndex(t *testing.T) {
}) })
require.NoError(t, err) require.NoError(t, err)
t.Run("Basic", func(t *testing.T) { for _, fastIterate := range []bool{true, false} {
connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger) t.Run(fmt.Sprintf("fastIterate=%t", fastIterate), func(t *testing.T) {
t.Run("Basic", func(t *testing.T) {
connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger, fastIterate)
// User1 // User1
wc1 := &WebConn{ wc1 := &WebConn{
Platform: th.Service, Platform: th.Service,
Suite: th.Suite, Suite: th.Suite,
UserId: model.NewId(), UserId: model.NewId(),
} }
wc1.SetConnectionID(model.NewId()) wc1.SetConnectionID(model.NewId())
wc1.SetSession(&model.Session{}) wc1.SetSession(&model.Session{})
// User2 // User2
wc2 := &WebConn{ wc2 := &WebConn{
Platform: th.Service, Platform: th.Service,
Suite: th.Suite, Suite: th.Suite,
UserId: model.NewId(), UserId: model.NewId(),
} }
wc2.SetConnectionID(model.NewId()) wc2.SetConnectionID(model.NewId())
wc2.SetSession(&model.Session{}) wc2.SetSession(&model.Session{})
wc3 := &WebConn{ wc3 := &WebConn{
Platform: th.Service, Platform: th.Service,
Suite: th.Suite, Suite: th.Suite,
UserId: wc2.UserId, UserId: wc2.UserId,
} }
wc3.SetConnectionID(model.NewId()) wc3.SetConnectionID(model.NewId())
wc3.SetSession(&model.Session{}) wc3.SetSession(&model.Session{})
wc4 := &WebConn{ wc4 := &WebConn{
Platform: th.Service, Platform: th.Service,
Suite: th.Suite, Suite: th.Suite,
UserId: wc2.UserId, UserId: wc2.UserId,
} }
wc4.SetConnectionID(model.NewId()) wc4.SetConnectionID(model.NewId())
wc4.SetSession(&model.Session{}) wc4.SetSession(&model.Session{})
connIndex.Add(wc1) connIndex.Add(wc1)
connIndex.Add(wc2) connIndex.Add(wc2)
connIndex.Add(wc3) connIndex.Add(wc3)
connIndex.Add(wc4) connIndex.Add(wc4)
t.Run("Basic", func(t *testing.T) { t.Run("Basic", func(t *testing.T) {
assert.True(t, connIndex.Has(wc1)) assert.True(t, connIndex.Has(wc1))
assert.True(t, connIndex.Has(wc2)) assert.True(t, connIndex.Has(wc2))
assert.ElementsMatch(t, connIndex.ForUser(wc2.UserId), []*WebConn{wc2, wc3, wc4}) assert.ElementsMatch(t, connIndex.ForUser(wc2.UserId), []*WebConn{wc2, wc3, wc4})
assert.ElementsMatch(t, connIndex.ForUser(wc1.UserId), []*WebConn{wc1}) assert.ElementsMatch(t, connIndex.ForUser(wc1.UserId), []*WebConn{wc1})
assert.True(t, connIndex.Has(wc2)) assert.True(t, connIndex.Has(wc2))
assert.True(t, connIndex.Has(wc1)) assert.True(t, connIndex.Has(wc1))
assert.Len(t, connIndex.All(), 4) assert.Len(t, connIndex.All(), 4)
})
t.Run("RemoveMiddleUser2", func(t *testing.T) {
connIndex.Remove(wc3) // Remove from middle from user2
assert.ElementsMatch(t, connIndex.ForUser(wc2.UserId), []*WebConn{wc2, wc4})
assert.ElementsMatch(t, connIndex.ForUser(wc1.UserId), []*WebConn{wc1})
assert.True(t, connIndex.Has(wc2))
assert.False(t, connIndex.Has(wc3))
assert.True(t, connIndex.Has(wc4))
assert.Len(t, connIndex.All(), 3)
})
t.Run("RemoveUser1", func(t *testing.T) {
connIndex.Remove(wc1) // Remove sole connection from user1
assert.ElementsMatch(t, connIndex.ForUser(wc2.UserId), []*WebConn{wc2, wc4})
assert.ElementsMatch(t, connIndex.ForUser(wc1.UserId), []*WebConn{})
assert.Len(t, connIndex.ForUser(wc1.UserId), 0)
assert.Len(t, connIndex.All(), 2)
assert.False(t, connIndex.Has(wc1))
assert.True(t, connIndex.Has(wc2))
})
t.Run("RemoveEndUser2", func(t *testing.T) {
connIndex.Remove(wc4) // Remove from end from user2
assert.ElementsMatch(t, connIndex.ForUser(wc2.UserId), []*WebConn{wc2})
assert.ElementsMatch(t, connIndex.ForUser(wc1.UserId), []*WebConn{})
assert.True(t, connIndex.Has(wc2))
assert.False(t, connIndex.Has(wc3))
assert.False(t, connIndex.Has(wc4))
assert.Len(t, connIndex.All(), 1)
})
})
t.Run("ByConnectionId", func(t *testing.T) {
connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger, fastIterate)
// User1
wc1ID := model.NewId()
wc1 := &WebConn{
Platform: th.Service,
Suite: th.Suite,
UserId: th.BasicUser.Id,
}
wc1.SetConnectionID(wc1ID)
wc1.SetSession(&model.Session{})
// User2
wc2ID := model.NewId()
wc2 := &WebConn{
Platform: th.Service,
Suite: th.Suite,
UserId: th.BasicUser2.Id,
}
wc2.SetConnectionID(wc2ID)
wc2.SetSession(&model.Session{})
wc3ID := model.NewId()
wc3 := &WebConn{
Platform: th.Service,
Suite: th.Suite,
UserId: wc2.UserId,
}
wc3.SetConnectionID(wc3ID)
wc3.SetSession(&model.Session{})
t.Run("no connections", func(t *testing.T) {
assert.False(t, connIndex.Has(wc1))
assert.False(t, connIndex.Has(wc2))
assert.False(t, connIndex.Has(wc3))
assert.Empty(t, connIndex.byConnectionId)
})
t.Run("adding", func(t *testing.T) {
connIndex.Add(wc1)
connIndex.Add(wc3)
assert.Len(t, connIndex.byConnectionId, 2)
assert.Equal(t, wc1, connIndex.ForConnection(wc1ID))
assert.Equal(t, wc3, connIndex.ForConnection(wc3ID))
assert.Equal(t, (*WebConn)(nil), connIndex.ForConnection(wc2ID))
})
t.Run("removing", func(t *testing.T) {
connIndex.Remove(wc3)
assert.Len(t, connIndex.byConnectionId, 1)
assert.Equal(t, wc1, connIndex.ForConnection(wc1ID))
assert.Equal(t, (*WebConn)(nil), connIndex.ForConnection(wc3ID))
assert.Equal(t, (*WebConn)(nil), connIndex.ForConnection(wc2ID))
})
})
}) })
}
t.Run("RemoveMiddleUser2", func(t *testing.T) {
connIndex.Remove(wc3) // Remove from middle from user2
assert.ElementsMatch(t, connIndex.ForUser(wc2.UserId), []*WebConn{wc2, wc4})
assert.ElementsMatch(t, connIndex.ForUser(wc1.UserId), []*WebConn{wc1})
assert.True(t, connIndex.Has(wc2))
assert.False(t, connIndex.Has(wc3))
assert.True(t, connIndex.Has(wc4))
assert.Len(t, connIndex.All(), 3)
})
t.Run("RemoveUser1", func(t *testing.T) {
connIndex.Remove(wc1) // Remove sole connection from user1
assert.ElementsMatch(t, connIndex.ForUser(wc2.UserId), []*WebConn{wc2, wc4})
assert.ElementsMatch(t, connIndex.ForUser(wc1.UserId), []*WebConn{})
assert.Len(t, connIndex.ForUser(wc1.UserId), 0)
assert.Len(t, connIndex.All(), 2)
assert.False(t, connIndex.Has(wc1))
assert.True(t, connIndex.Has(wc2))
})
t.Run("RemoveEndUser2", func(t *testing.T) {
connIndex.Remove(wc4) // Remove from end from user2
assert.ElementsMatch(t, connIndex.ForUser(wc2.UserId), []*WebConn{wc2})
assert.ElementsMatch(t, connIndex.ForUser(wc1.UserId), []*WebConn{})
assert.True(t, connIndex.Has(wc2))
assert.False(t, connIndex.Has(wc3))
assert.False(t, connIndex.Has(wc4))
assert.Len(t, connIndex.All(), 1)
})
})
t.Run("ByConnectionId", func(t *testing.T) {
connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger)
// User1
wc1ID := model.NewId()
wc1 := &WebConn{
Platform: th.Service,
Suite: th.Suite,
UserId: th.BasicUser.Id,
}
wc1.SetConnectionID(wc1ID)
wc1.SetSession(&model.Session{})
// User2
wc2ID := model.NewId()
wc2 := &WebConn{
Platform: th.Service,
Suite: th.Suite,
UserId: th.BasicUser2.Id,
}
wc2.SetConnectionID(wc2ID)
wc2.SetSession(&model.Session{})
wc3ID := model.NewId()
wc3 := &WebConn{
Platform: th.Service,
Suite: th.Suite,
UserId: wc2.UserId,
}
wc3.SetConnectionID(wc3ID)
wc3.SetSession(&model.Session{})
t.Run("no connections", func(t *testing.T) {
assert.False(t, connIndex.Has(wc1))
assert.False(t, connIndex.Has(wc2))
assert.False(t, connIndex.Has(wc3))
assert.Empty(t, connIndex.byConnectionId)
})
t.Run("adding", func(t *testing.T) {
connIndex.Add(wc1)
connIndex.Add(wc3)
assert.Len(t, connIndex.byConnectionId, 2)
assert.Equal(t, wc1, connIndex.ForConnection(wc1ID))
assert.Equal(t, wc3, connIndex.ForConnection(wc3ID))
assert.Equal(t, (*WebConn)(nil), connIndex.ForConnection(wc2ID))
})
t.Run("removing", func(t *testing.T) {
connIndex.Remove(wc3)
assert.Len(t, connIndex.byConnectionId, 1)
assert.Equal(t, wc1, connIndex.ForConnection(wc1ID))
assert.Equal(t, (*WebConn)(nil), connIndex.ForConnection(wc3ID))
assert.Equal(t, (*WebConn)(nil), connIndex.ForConnection(wc2ID))
})
})
t.Run("ByChannelId", func(t *testing.T) { t.Run("ByChannelId", func(t *testing.T) {
connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger) connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger, true)
// User1 // User1
wc1ID := model.NewId() wc1ID := model.NewId()
@ -414,7 +419,7 @@ func TestHubConnIndexIncorrectRemoval(t *testing.T) {
th := Setup(t) th := Setup(t)
defer th.TearDown() defer th.TearDown()
connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger) connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger, false)
// User2 // User2
wc2 := &WebConn{ wc2 := &WebConn{
@ -461,7 +466,7 @@ func TestHubConnIndexInactive(t *testing.T) {
th := Setup(t) th := Setup(t)
defer th.TearDown() defer th.TearDown()
connIndex := newHubConnectionIndex(2*time.Second, th.Service.Store, th.Service.logger) connIndex := newHubConnectionIndex(2*time.Second, th.Service.Store, th.Service.logger, false)
// User1 // User1
wc1 := &WebConn{ wc1 := &WebConn{
@ -621,7 +626,7 @@ func TestHubWebConnCount(t *testing.T) {
func BenchmarkHubConnIndex(b *testing.B) { func BenchmarkHubConnIndex(b *testing.B) {
th := Setup(b).InitBasic() th := Setup(b).InitBasic()
defer th.TearDown() defer th.TearDown()
connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger) connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger, false)
// User1 // User1
wc1 := &WebConn{ wc1 := &WebConn{
@ -666,7 +671,7 @@ func TestHubConnIndexRemoveMemLeak(t *testing.T) {
th := Setup(t) th := Setup(t)
defer th.TearDown() defer th.TearDown()
connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger) connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger, false)
wc := &WebConn{ wc := &WebConn{
Platform: th.Service, Platform: th.Service,

View File

@ -440,6 +440,7 @@ type ServiceSettings struct {
MaximumPayloadSizeBytes *int64 `access:"environment_file_storage,write_restrictable,cloud_restrictable"` MaximumPayloadSizeBytes *int64 `access:"environment_file_storage,write_restrictable,cloud_restrictable"`
MaximumURLLength *int `access:"environment_file_storage,write_restrictable,cloud_restrictable"` MaximumURLLength *int `access:"environment_file_storage,write_restrictable,cloud_restrictable"`
ScheduledPosts *bool `access:"site_posts"` ScheduledPosts *bool `access:"site_posts"`
EnableWebHubChannelIteration *bool `access:"write_restrictable,cloud_restrictable"` // telemetry: none
} }
var MattermostGiphySdkKey string var MattermostGiphySdkKey string
@ -967,6 +968,10 @@ func (s *ServiceSettings) SetDefaults(isUpdate bool) {
if s.ScheduledPosts == nil { if s.ScheduledPosts == nil {
s.ScheduledPosts = NewPointer(true) s.ScheduledPosts = NewPointer(true)
} }
if s.EnableWebHubChannelIteration == nil {
s.EnableWebHubChannelIteration = NewPointer(false)
}
} }
type CacheSettings struct { type CacheSettings struct {