diff --git a/server/channels/api4/post_test.go b/server/channels/api4/post_test.go index 99b53532c1..a6385837ec 100644 --- a/server/channels/api4/post_test.go +++ b/server/channels/api4/post_test.go @@ -3163,7 +3163,9 @@ func TestWebHubMembership(t *testing.T) { func TestWebHubCloseConnOnDBFail(t *testing.T) { t.Skip("MM-61780") - th := Setup(t).InitBasic() + th := SetupConfig(t, func(cfg *model.Config) { + *cfg.ServiceSettings.EnableWebHubChannelIteration = true + }).InitBasic() defer func() { th.TearDown() // Asserting that the error message is present in the log diff --git a/server/channels/app/platform/web_conn.go b/server/channels/app/platform/web_conn.go index c92856eecd..9f6e7862e0 100644 --- a/server/channels/app/platform/web_conn.go +++ b/server/channels/app/platform/web_conn.go @@ -27,6 +27,7 @@ import ( "github.com/mattermost/mattermost/server/public/shared/i18n" "github.com/mattermost/mattermost/server/public/shared/mlog" "github.com/mattermost/mattermost/server/public/shared/request" + "github.com/mattermost/mattermost/server/v8/channels/store/sqlstore" ) const ( @@ -95,8 +96,10 @@ type WebConn struct { UserId string PostedAck bool - lastUserActivityAt int64 - send chan model.WebSocketMessage + allChannelMembers map[string]string + lastAllChannelMembersTime int64 + lastUserActivityAt int64 + send chan model.WebSocketMessage // deadQueue behaves like a queue of a finite size // which is used to store all messages that are sent via the websocket. // It basically acts as the user-space socket buffer, and is used @@ -735,6 +738,8 @@ func (wc *WebConn) drainDeadQueue(index int) error { // InvalidateCache resets all internal data of the WebConn. func (wc *WebConn) InvalidateCache() { + wc.allChannelMembers = nil + wc.lastAllChannelMembersTime = 0 wc.SetSession(nil) wc.SetSessionExpiresAt(0) } @@ -912,9 +917,36 @@ func (wc *WebConn) ShouldSendEvent(msg *model.WebSocketEvent) bool { return false } - // We don't need to do any further checks because this is already scoped - // to channel members from web_hub. - return true + if *wc.Platform.Config().ServiceSettings.EnableWebHubChannelIteration { + // We don't need to do any further checks because this is already scoped + // to channel members from web_hub. + return true + } + + if model.GetMillis()-wc.lastAllChannelMembersTime > webConnMemberCacheTime { + wc.allChannelMembers = nil + wc.lastAllChannelMembersTime = 0 + } + + if wc.allChannelMembers == nil { + result, err := wc.Platform.Store.Channel().GetAllChannelMembersForUser( + sqlstore.RequestContextWithMaster(request.EmptyContext(wc.Platform.logger)), + wc.UserId, + false, + false, + ) + if err != nil { + mlog.Error("webhub.shouldSendEvent.", mlog.Err(err)) + return false + } + wc.allChannelMembers = result + wc.lastAllChannelMembersTime = model.GetMillis() + } + + if _, ok := wc.allChannelMembers[chID]; ok { + return true + } + return false } // Only report events to users who are in the team for the event diff --git a/server/channels/app/platform/web_hub.go b/server/channels/app/platform/web_hub.go index a73f611067..56c38bc736 100644 --- a/server/channels/app/platform/web_hub.go +++ b/server/channels/app/platform/web_hub.go @@ -406,6 +406,7 @@ func (h *Hub) Start() { connIndex := newHubConnectionIndex(inactiveConnReaperInterval, h.platform.Store, h.platform.logger, + *h.platform.Config().ServiceSettings.EnableWebHubChannelIteration, ) for { @@ -519,6 +520,11 @@ func (h *Hub) Start() { for _, webConn := range connIndex.ForUser(userID) { webConn.InvalidateCache() } + + if !*h.platform.Config().ServiceSettings.EnableWebHubChannelIteration { + continue + } + err := connIndex.InvalidateCMCacheForUser(userID) if err != nil { h.platform.Log().Error("Error while invalidating channel member cache", mlog.String("user_id", userID), mlog.Err(err)) @@ -586,7 +592,7 @@ func (h *Hub) Start() { } } else if userID := msg.GetBroadcast().UserId; userID != "" { targetConns = connIndex.ForUser(userID) - } else if channelID := msg.GetBroadcast().ChannelId; channelID != "" { + } else if channelID := msg.GetBroadcast().ChannelId; channelID != "" && *h.platform.Config().ServiceSettings.EnableWebHubChannelIteration { targetConns = connIndex.ForChannel(channelID) } if targetConns != nil { @@ -653,6 +659,10 @@ func closeAndRemoveConn(connIndex *hubConnectionIndex, conn *WebConn) { connIndex.Remove(conn) } +type connMetadata struct { + channelIDs []string +} + // hubConnectionIndex provides fast addition, removal, and iteration of web connections. // It requires 4 functionalities which need to be very fast: // - check if a connection exists or not. @@ -660,90 +670,95 @@ func closeAndRemoveConn(connIndex *hubConnectionIndex, conn *WebConn) { // - get all connections for a given channelID. // - get all connections. type hubConnectionIndex struct { - // byUserId stores the list of connections for a given userID - byUserId map[string][]*WebConn - // byChannelID stores the list of connections for a given channelID. - byChannelID map[string][]*WebConn - // byConnection serves the dual purpose of storing the index of the webconn - // in the value of byUserId map, and also to get all connections. - byConnection map[*WebConn]int + // byUserId stores the set of connections for a given userID + byUserId map[string]map[*WebConn]struct{} + // byChannelID stores the set of connections for a given channelID + byChannelID map[string]map[*WebConn]struct{} + // byConnection serves the dual purpose of storing the channelIDs + // and also to get all connections + byConnection map[*WebConn]connMetadata byConnectionId map[string]*WebConn // staleThreshold is the limit beyond which inactive connections // will be deleted. staleThreshold time.Duration - store store.Store - logger mlog.LoggerIFace + fastIteration bool + store store.Store + logger mlog.LoggerIFace } func newHubConnectionIndex(interval time.Duration, store store.Store, logger mlog.LoggerIFace, + fastIteration bool, ) *hubConnectionIndex { return &hubConnectionIndex{ - byUserId: make(map[string][]*WebConn), - byChannelID: make(map[string][]*WebConn), - byConnection: make(map[*WebConn]int), + byUserId: make(map[string]map[*WebConn]struct{}), + byChannelID: make(map[string]map[*WebConn]struct{}), + byConnection: make(map[*WebConn]connMetadata), byConnectionId: make(map[string]*WebConn), staleThreshold: interval, store: store, logger: logger, + fastIteration: fastIteration, } } func (i *hubConnectionIndex) Add(wc *WebConn) error { - cm, err := i.store.Channel().GetAllChannelMembersForUser(request.EmptyContext(i.logger), wc.UserId, false, false) - if err != nil { - return fmt.Errorf("error getChannelMembersForUser: %v", err) - } - for chID := range cm { - i.byChannelID[chID] = append(i.byChannelID[chID], wc) + var channelIDs []string + if i.fastIteration { + cm, err := i.store.Channel().GetAllChannelMembersForUser(request.EmptyContext(i.logger), wc.UserId, false, false) + if err != nil { + return fmt.Errorf("error getChannelMembersForUser: %v", err) + } + + // Store channel IDs and add to byChannelID + channelIDs = make([]string, 0, len(cm)) + for chID := range cm { + channelIDs = append(channelIDs, chID) + + // Initialize the channel's map if it doesn't exist + if _, ok := i.byChannelID[chID]; !ok { + i.byChannelID[chID] = make(map[*WebConn]struct{}) + } + i.byChannelID[chID][wc] = struct{}{} + } } - i.byUserId[wc.UserId] = append(i.byUserId[wc.UserId], wc) - i.byConnection[wc] = len(i.byUserId[wc.UserId]) - 1 + // Initialize the user's map if it doesn't exist + if _, ok := i.byUserId[wc.UserId]; !ok { + i.byUserId[wc.UserId] = make(map[*WebConn]struct{}) + } + i.byUserId[wc.UserId][wc] = struct{}{} + i.byConnection[wc] = connMetadata{ + channelIDs: channelIDs, + } i.byConnectionId[wc.GetConnectionID()] = wc return nil } func (i *hubConnectionIndex) Remove(wc *WebConn) { - userConnIndex, ok := i.byConnection[wc] + connMeta, ok := i.byConnection[wc] if !ok { return } - // Remove the wc from i.byUserId - // get the conn slice. - userConnections := i.byUserId[wc.UserId] - // get the last connection. - last := userConnections[len(userConnections)-1] - // https://go.dev/wiki/SliceTricks#delete-without-preserving-order - userConnections[userConnIndex] = last - userConnections[len(userConnections)-1] = nil - i.byUserId[wc.UserId] = userConnections[:len(userConnections)-1] - // set the index of the connection that was moved to the new index. - i.byConnection[last] = userConnIndex + // Remove from byUserId + if userConns, ok := i.byUserId[wc.UserId]; ok { + delete(userConns, wc) + } - connectionID := wc.GetConnectionID() - // Remove webconns from i.byChannelID - // This has O(n) complexity. We are trading off speed while removing - // a connection, to improve broadcasting a message. - for chID, webConns := range i.byChannelID { - // https://go.dev/wiki/SliceTricks#filtering-without-allocating - filtered := webConns[:0] - for _, conn := range webConns { - if conn.GetConnectionID() != connectionID { - filtered = append(filtered, conn) + if i.fastIteration { + // Remove from byChannelID for each channel + for _, chID := range connMeta.channelIDs { + if channelConns, ok := i.byChannelID[chID]; ok { + delete(channelConns, wc) } } - for i := len(filtered); i < len(webConns); i++ { - webConns[i] = nil - } - i.byChannelID[chID] = filtered } delete(i.byConnection, wc) - delete(i.byConnectionId, connectionID) + delete(i.byConnectionId, wc.GetConnectionID()) } func (i *hubConnectionIndex) InvalidateCMCacheForUser(userID string) error { @@ -753,25 +768,40 @@ func (i *hubConnectionIndex) InvalidateCMCacheForUser(userID string) error { return err } - // Clear out all user entries which belong to channels. - for chID, webConns := range i.byChannelID { - // https://go.dev/wiki/SliceTricks#filtering-without-allocating - filtered := webConns[:0] - for _, conn := range webConns { - if conn.UserId != userID { - filtered = append(filtered, conn) + // Get all connections for this user + conns := i.ForUser(userID) + + // Remove all user connections from existing channels + for _, conn := range conns { + if meta, ok := i.byConnection[conn]; ok { + // Remove from old channels + for _, chID := range meta.channelIDs { + if channelConns, ok := i.byChannelID[chID]; ok { + delete(channelConns, conn) + } } } - for i := len(filtered); i < len(webConns); i++ { - webConns[i] = nil - } - i.byChannelID[chID] = filtered } - // re-populate the cache - for chID := range cm { - i.byChannelID[chID] = append(i.byChannelID[chID], i.ForUser(userID)...) + // Add connections to new channels + for _, conn := range conns { + newChannelIDs := make([]string, 0, len(cm)) + for chID := range cm { + newChannelIDs = append(newChannelIDs, chID) + // Initialize channel map if needed + if _, ok := i.byChannelID[chID]; !ok { + i.byChannelID[chID] = make(map[*WebConn]struct{}) + } + i.byChannelID[chID][conn] = struct{}{} + } + + // Update connection metadata + if meta, ok := i.byConnection[conn]; ok { + meta.channelIDs = newChannelIDs + i.byConnection[conn] = meta + } } + return nil } @@ -782,26 +812,31 @@ func (i *hubConnectionIndex) Has(wc *WebConn) bool { // ForUser returns all connections for a user ID. func (i *hubConnectionIndex) ForUser(id string) []*WebConn { - // Fast path if there is only one or fewer connection. - if len(i.byUserId[id]) <= 1 { - return i.byUserId[id] + userConns, ok := i.byUserId[id] + if !ok { + return nil + } + + // Move to using maps.Keys to use the iterator pattern with 1.23. + // This saves the additional slice copy. + conns := make([]*WebConn, 0, len(userConns)) + for conn := range userConns { + conns = append(conns, conn) } - // If there are multiple connections per user, - // then we have to return a clone of the slice - // to allow connIndex.Remove to be safely called while - // iterating the slice. - conns := make([]*WebConn, len(i.byUserId[id])) - copy(conns, i.byUserId[id]) return conns } // ForChannel returns all connections for a channelID. func (i *hubConnectionIndex) ForChannel(channelID string) []*WebConn { - // Note: this is expensive because usually there will be - // more than 1 member for a channel, and broadcasting - // is a hot path, but worth it. - conns := make([]*WebConn, len(i.byChannelID[channelID])) - copy(conns, i.byChannelID[channelID]) + channelConns, ok := i.byChannelID[channelID] + if !ok { + return nil + } + + conns := make([]*WebConn, 0, len(channelConns)) + for conn := range channelConns { + conns = append(conns, conn) + } return conns } @@ -822,7 +857,7 @@ func (i *hubConnectionIndex) ForConnection(id string) *WebConn { } // All returns the full webConn index. -func (i *hubConnectionIndex) All() map[*WebConn]int { +func (i *hubConnectionIndex) All() map[*WebConn]connMetadata { return i.byConnection } diff --git a/server/channels/app/platform/web_hub_test.go b/server/channels/app/platform/web_hub_test.go index 83eda9a465..74fea05dbf 100644 --- a/server/channels/app/platform/web_hub_test.go +++ b/server/channels/app/platform/web_hub_test.go @@ -6,6 +6,7 @@ package platform import ( "bytes" "encoding/json" + "fmt" "net" "net/http" "net/http/httptest" @@ -197,154 +198,158 @@ func TestHubConnIndex(t *testing.T) { }) require.NoError(t, err) - t.Run("Basic", func(t *testing.T) { - connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger) + for _, fastIterate := range []bool{true, false} { + t.Run(fmt.Sprintf("fastIterate=%t", fastIterate), func(t *testing.T) { + t.Run("Basic", func(t *testing.T) { + connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger, fastIterate) - // User1 - wc1 := &WebConn{ - Platform: th.Service, - Suite: th.Suite, - UserId: model.NewId(), - } - wc1.SetConnectionID(model.NewId()) - wc1.SetSession(&model.Session{}) + // User1 + wc1 := &WebConn{ + Platform: th.Service, + Suite: th.Suite, + UserId: model.NewId(), + } + wc1.SetConnectionID(model.NewId()) + wc1.SetSession(&model.Session{}) - // User2 - wc2 := &WebConn{ - Platform: th.Service, - Suite: th.Suite, - UserId: model.NewId(), - } - wc2.SetConnectionID(model.NewId()) - wc2.SetSession(&model.Session{}) + // User2 + wc2 := &WebConn{ + Platform: th.Service, + Suite: th.Suite, + UserId: model.NewId(), + } + wc2.SetConnectionID(model.NewId()) + wc2.SetSession(&model.Session{}) - wc3 := &WebConn{ - Platform: th.Service, - Suite: th.Suite, - UserId: wc2.UserId, - } - wc3.SetConnectionID(model.NewId()) - wc3.SetSession(&model.Session{}) + wc3 := &WebConn{ + Platform: th.Service, + Suite: th.Suite, + UserId: wc2.UserId, + } + wc3.SetConnectionID(model.NewId()) + wc3.SetSession(&model.Session{}) - wc4 := &WebConn{ - Platform: th.Service, - Suite: th.Suite, - UserId: wc2.UserId, - } - wc4.SetConnectionID(model.NewId()) - wc4.SetSession(&model.Session{}) + wc4 := &WebConn{ + Platform: th.Service, + Suite: th.Suite, + UserId: wc2.UserId, + } + wc4.SetConnectionID(model.NewId()) + wc4.SetSession(&model.Session{}) - connIndex.Add(wc1) - connIndex.Add(wc2) - connIndex.Add(wc3) - connIndex.Add(wc4) + connIndex.Add(wc1) + connIndex.Add(wc2) + connIndex.Add(wc3) + connIndex.Add(wc4) - t.Run("Basic", func(t *testing.T) { - assert.True(t, connIndex.Has(wc1)) - assert.True(t, connIndex.Has(wc2)) + t.Run("Basic", func(t *testing.T) { + assert.True(t, connIndex.Has(wc1)) + assert.True(t, connIndex.Has(wc2)) - assert.ElementsMatch(t, connIndex.ForUser(wc2.UserId), []*WebConn{wc2, wc3, wc4}) - assert.ElementsMatch(t, connIndex.ForUser(wc1.UserId), []*WebConn{wc1}) - assert.True(t, connIndex.Has(wc2)) - assert.True(t, connIndex.Has(wc1)) - assert.Len(t, connIndex.All(), 4) + assert.ElementsMatch(t, connIndex.ForUser(wc2.UserId), []*WebConn{wc2, wc3, wc4}) + assert.ElementsMatch(t, connIndex.ForUser(wc1.UserId), []*WebConn{wc1}) + assert.True(t, connIndex.Has(wc2)) + assert.True(t, connIndex.Has(wc1)) + assert.Len(t, connIndex.All(), 4) + }) + + t.Run("RemoveMiddleUser2", func(t *testing.T) { + connIndex.Remove(wc3) // Remove from middle from user2 + + assert.ElementsMatch(t, connIndex.ForUser(wc2.UserId), []*WebConn{wc2, wc4}) + assert.ElementsMatch(t, connIndex.ForUser(wc1.UserId), []*WebConn{wc1}) + assert.True(t, connIndex.Has(wc2)) + assert.False(t, connIndex.Has(wc3)) + assert.True(t, connIndex.Has(wc4)) + assert.Len(t, connIndex.All(), 3) + }) + + t.Run("RemoveUser1", func(t *testing.T) { + connIndex.Remove(wc1) // Remove sole connection from user1 + + assert.ElementsMatch(t, connIndex.ForUser(wc2.UserId), []*WebConn{wc2, wc4}) + assert.ElementsMatch(t, connIndex.ForUser(wc1.UserId), []*WebConn{}) + assert.Len(t, connIndex.ForUser(wc1.UserId), 0) + assert.Len(t, connIndex.All(), 2) + assert.False(t, connIndex.Has(wc1)) + assert.True(t, connIndex.Has(wc2)) + }) + + t.Run("RemoveEndUser2", func(t *testing.T) { + connIndex.Remove(wc4) // Remove from end from user2 + + assert.ElementsMatch(t, connIndex.ForUser(wc2.UserId), []*WebConn{wc2}) + assert.ElementsMatch(t, connIndex.ForUser(wc1.UserId), []*WebConn{}) + assert.True(t, connIndex.Has(wc2)) + assert.False(t, connIndex.Has(wc3)) + assert.False(t, connIndex.Has(wc4)) + assert.Len(t, connIndex.All(), 1) + }) + }) + + t.Run("ByConnectionId", func(t *testing.T) { + connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger, fastIterate) + + // User1 + wc1ID := model.NewId() + wc1 := &WebConn{ + Platform: th.Service, + Suite: th.Suite, + UserId: th.BasicUser.Id, + } + wc1.SetConnectionID(wc1ID) + wc1.SetSession(&model.Session{}) + + // User2 + wc2ID := model.NewId() + wc2 := &WebConn{ + Platform: th.Service, + Suite: th.Suite, + UserId: th.BasicUser2.Id, + } + wc2.SetConnectionID(wc2ID) + wc2.SetSession(&model.Session{}) + + wc3ID := model.NewId() + wc3 := &WebConn{ + Platform: th.Service, + Suite: th.Suite, + UserId: wc2.UserId, + } + wc3.SetConnectionID(wc3ID) + wc3.SetSession(&model.Session{}) + + t.Run("no connections", func(t *testing.T) { + assert.False(t, connIndex.Has(wc1)) + assert.False(t, connIndex.Has(wc2)) + assert.False(t, connIndex.Has(wc3)) + assert.Empty(t, connIndex.byConnectionId) + }) + + t.Run("adding", func(t *testing.T) { + connIndex.Add(wc1) + connIndex.Add(wc3) + + assert.Len(t, connIndex.byConnectionId, 2) + assert.Equal(t, wc1, connIndex.ForConnection(wc1ID)) + assert.Equal(t, wc3, connIndex.ForConnection(wc3ID)) + assert.Equal(t, (*WebConn)(nil), connIndex.ForConnection(wc2ID)) + }) + + t.Run("removing", func(t *testing.T) { + connIndex.Remove(wc3) + + assert.Len(t, connIndex.byConnectionId, 1) + assert.Equal(t, wc1, connIndex.ForConnection(wc1ID)) + assert.Equal(t, (*WebConn)(nil), connIndex.ForConnection(wc3ID)) + assert.Equal(t, (*WebConn)(nil), connIndex.ForConnection(wc2ID)) + }) + }) }) - - t.Run("RemoveMiddleUser2", func(t *testing.T) { - connIndex.Remove(wc3) // Remove from middle from user2 - - assert.ElementsMatch(t, connIndex.ForUser(wc2.UserId), []*WebConn{wc2, wc4}) - assert.ElementsMatch(t, connIndex.ForUser(wc1.UserId), []*WebConn{wc1}) - assert.True(t, connIndex.Has(wc2)) - assert.False(t, connIndex.Has(wc3)) - assert.True(t, connIndex.Has(wc4)) - assert.Len(t, connIndex.All(), 3) - }) - - t.Run("RemoveUser1", func(t *testing.T) { - connIndex.Remove(wc1) // Remove sole connection from user1 - - assert.ElementsMatch(t, connIndex.ForUser(wc2.UserId), []*WebConn{wc2, wc4}) - assert.ElementsMatch(t, connIndex.ForUser(wc1.UserId), []*WebConn{}) - assert.Len(t, connIndex.ForUser(wc1.UserId), 0) - assert.Len(t, connIndex.All(), 2) - assert.False(t, connIndex.Has(wc1)) - assert.True(t, connIndex.Has(wc2)) - }) - - t.Run("RemoveEndUser2", func(t *testing.T) { - connIndex.Remove(wc4) // Remove from end from user2 - - assert.ElementsMatch(t, connIndex.ForUser(wc2.UserId), []*WebConn{wc2}) - assert.ElementsMatch(t, connIndex.ForUser(wc1.UserId), []*WebConn{}) - assert.True(t, connIndex.Has(wc2)) - assert.False(t, connIndex.Has(wc3)) - assert.False(t, connIndex.Has(wc4)) - assert.Len(t, connIndex.All(), 1) - }) - }) - - t.Run("ByConnectionId", func(t *testing.T) { - connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger) - - // User1 - wc1ID := model.NewId() - wc1 := &WebConn{ - Platform: th.Service, - Suite: th.Suite, - UserId: th.BasicUser.Id, - } - wc1.SetConnectionID(wc1ID) - wc1.SetSession(&model.Session{}) - - // User2 - wc2ID := model.NewId() - wc2 := &WebConn{ - Platform: th.Service, - Suite: th.Suite, - UserId: th.BasicUser2.Id, - } - wc2.SetConnectionID(wc2ID) - wc2.SetSession(&model.Session{}) - - wc3ID := model.NewId() - wc3 := &WebConn{ - Platform: th.Service, - Suite: th.Suite, - UserId: wc2.UserId, - } - wc3.SetConnectionID(wc3ID) - wc3.SetSession(&model.Session{}) - - t.Run("no connections", func(t *testing.T) { - assert.False(t, connIndex.Has(wc1)) - assert.False(t, connIndex.Has(wc2)) - assert.False(t, connIndex.Has(wc3)) - assert.Empty(t, connIndex.byConnectionId) - }) - - t.Run("adding", func(t *testing.T) { - connIndex.Add(wc1) - connIndex.Add(wc3) - - assert.Len(t, connIndex.byConnectionId, 2) - assert.Equal(t, wc1, connIndex.ForConnection(wc1ID)) - assert.Equal(t, wc3, connIndex.ForConnection(wc3ID)) - assert.Equal(t, (*WebConn)(nil), connIndex.ForConnection(wc2ID)) - }) - - t.Run("removing", func(t *testing.T) { - connIndex.Remove(wc3) - - assert.Len(t, connIndex.byConnectionId, 1) - assert.Equal(t, wc1, connIndex.ForConnection(wc1ID)) - assert.Equal(t, (*WebConn)(nil), connIndex.ForConnection(wc3ID)) - assert.Equal(t, (*WebConn)(nil), connIndex.ForConnection(wc2ID)) - }) - }) + } t.Run("ByChannelId", func(t *testing.T) { - connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger) + connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger, true) // User1 wc1ID := model.NewId() @@ -414,7 +419,7 @@ func TestHubConnIndexIncorrectRemoval(t *testing.T) { th := Setup(t) defer th.TearDown() - connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger) + connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger, false) // User2 wc2 := &WebConn{ @@ -461,7 +466,7 @@ func TestHubConnIndexInactive(t *testing.T) { th := Setup(t) defer th.TearDown() - connIndex := newHubConnectionIndex(2*time.Second, th.Service.Store, th.Service.logger) + connIndex := newHubConnectionIndex(2*time.Second, th.Service.Store, th.Service.logger, false) // User1 wc1 := &WebConn{ @@ -621,7 +626,7 @@ func TestHubWebConnCount(t *testing.T) { func BenchmarkHubConnIndex(b *testing.B) { th := Setup(b).InitBasic() defer th.TearDown() - connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger) + connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger, false) // User1 wc1 := &WebConn{ @@ -666,7 +671,7 @@ func TestHubConnIndexRemoveMemLeak(t *testing.T) { th := Setup(t) defer th.TearDown() - connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger) + connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger, false) wc := &WebConn{ Platform: th.Service, diff --git a/server/public/model/config.go b/server/public/model/config.go index 9fd44cc6c7..4692d5b656 100644 --- a/server/public/model/config.go +++ b/server/public/model/config.go @@ -434,6 +434,7 @@ type ServiceSettings struct { MaximumPayloadSizeBytes *int64 `access:"environment_file_storage,write_restrictable,cloud_restrictable"` MaximumURLLength *int `access:"environment_file_storage,write_restrictable,cloud_restrictable"` ScheduledPosts *bool `access:"site_posts"` + EnableWebHubChannelIteration *bool `access:"write_restrictable,cloud_restrictable"` // telemetry: none } var MattermostGiphySdkKey string @@ -961,6 +962,10 @@ func (s *ServiceSettings) SetDefaults(isUpdate bool) { if s.ScheduledPosts == nil { s.ScheduledPosts = NewPointer(true) } + + if s.EnableWebHubChannelIteration == nil { + s.EnableWebHubChannelIteration = NewPointer(false) + } } type CacheSettings struct {