MM-62960: Improve webConn remove performance from hubConnectionIndex (#30178) (#30220)

Automatic Merge
This commit is contained in:
Agniva De Sarker 2025-02-14 15:20:08 +05:30 committed by GitHub
parent 457735b51a
commit d2e4eadb73
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 304 additions and 225 deletions

View File

@ -3163,7 +3163,9 @@ func TestWebHubMembership(t *testing.T) {
func TestWebHubCloseConnOnDBFail(t *testing.T) {
t.Skip("MM-61780")
th := Setup(t).InitBasic()
th := SetupConfig(t, func(cfg *model.Config) {
*cfg.ServiceSettings.EnableWebHubChannelIteration = true
}).InitBasic()
defer func() {
th.TearDown()
// Asserting that the error message is present in the log

View File

@ -27,6 +27,7 @@ import (
"github.com/mattermost/mattermost/server/public/shared/i18n"
"github.com/mattermost/mattermost/server/public/shared/mlog"
"github.com/mattermost/mattermost/server/public/shared/request"
"github.com/mattermost/mattermost/server/v8/channels/store/sqlstore"
)
const (
@ -95,8 +96,10 @@ type WebConn struct {
UserId string
PostedAck bool
lastUserActivityAt int64
send chan model.WebSocketMessage
allChannelMembers map[string]string
lastAllChannelMembersTime int64
lastUserActivityAt int64
send chan model.WebSocketMessage
// deadQueue behaves like a queue of a finite size
// which is used to store all messages that are sent via the websocket.
// It basically acts as the user-space socket buffer, and is used
@ -735,6 +738,8 @@ func (wc *WebConn) drainDeadQueue(index int) error {
// InvalidateCache resets all internal data of the WebConn.
func (wc *WebConn) InvalidateCache() {
wc.allChannelMembers = nil
wc.lastAllChannelMembersTime = 0
wc.SetSession(nil)
wc.SetSessionExpiresAt(0)
}
@ -912,9 +917,36 @@ func (wc *WebConn) ShouldSendEvent(msg *model.WebSocketEvent) bool {
return false
}
// We don't need to do any further checks because this is already scoped
// to channel members from web_hub.
return true
if *wc.Platform.Config().ServiceSettings.EnableWebHubChannelIteration {
// We don't need to do any further checks because this is already scoped
// to channel members from web_hub.
return true
}
if model.GetMillis()-wc.lastAllChannelMembersTime > webConnMemberCacheTime {
wc.allChannelMembers = nil
wc.lastAllChannelMembersTime = 0
}
if wc.allChannelMembers == nil {
result, err := wc.Platform.Store.Channel().GetAllChannelMembersForUser(
sqlstore.RequestContextWithMaster(request.EmptyContext(wc.Platform.logger)),
wc.UserId,
false,
false,
)
if err != nil {
mlog.Error("webhub.shouldSendEvent.", mlog.Err(err))
return false
}
wc.allChannelMembers = result
wc.lastAllChannelMembersTime = model.GetMillis()
}
if _, ok := wc.allChannelMembers[chID]; ok {
return true
}
return false
}
// Only report events to users who are in the team for the event

View File

@ -406,6 +406,7 @@ func (h *Hub) Start() {
connIndex := newHubConnectionIndex(inactiveConnReaperInterval,
h.platform.Store,
h.platform.logger,
*h.platform.Config().ServiceSettings.EnableWebHubChannelIteration,
)
for {
@ -519,6 +520,11 @@ func (h *Hub) Start() {
for _, webConn := range connIndex.ForUser(userID) {
webConn.InvalidateCache()
}
if !*h.platform.Config().ServiceSettings.EnableWebHubChannelIteration {
continue
}
err := connIndex.InvalidateCMCacheForUser(userID)
if err != nil {
h.platform.Log().Error("Error while invalidating channel member cache", mlog.String("user_id", userID), mlog.Err(err))
@ -586,7 +592,7 @@ func (h *Hub) Start() {
}
} else if userID := msg.GetBroadcast().UserId; userID != "" {
targetConns = connIndex.ForUser(userID)
} else if channelID := msg.GetBroadcast().ChannelId; channelID != "" {
} else if channelID := msg.GetBroadcast().ChannelId; channelID != "" && *h.platform.Config().ServiceSettings.EnableWebHubChannelIteration {
targetConns = connIndex.ForChannel(channelID)
}
if targetConns != nil {
@ -653,6 +659,10 @@ func closeAndRemoveConn(connIndex *hubConnectionIndex, conn *WebConn) {
connIndex.Remove(conn)
}
type connMetadata struct {
channelIDs []string
}
// hubConnectionIndex provides fast addition, removal, and iteration of web connections.
// It requires 4 functionalities which need to be very fast:
// - check if a connection exists or not.
@ -660,90 +670,95 @@ func closeAndRemoveConn(connIndex *hubConnectionIndex, conn *WebConn) {
// - get all connections for a given channelID.
// - get all connections.
type hubConnectionIndex struct {
// byUserId stores the list of connections for a given userID
byUserId map[string][]*WebConn
// byChannelID stores the list of connections for a given channelID.
byChannelID map[string][]*WebConn
// byConnection serves the dual purpose of storing the index of the webconn
// in the value of byUserId map, and also to get all connections.
byConnection map[*WebConn]int
// byUserId stores the set of connections for a given userID
byUserId map[string]map[*WebConn]struct{}
// byChannelID stores the set of connections for a given channelID
byChannelID map[string]map[*WebConn]struct{}
// byConnection serves the dual purpose of storing the channelIDs
// and also to get all connections
byConnection map[*WebConn]connMetadata
byConnectionId map[string]*WebConn
// staleThreshold is the limit beyond which inactive connections
// will be deleted.
staleThreshold time.Duration
store store.Store
logger mlog.LoggerIFace
fastIteration bool
store store.Store
logger mlog.LoggerIFace
}
func newHubConnectionIndex(interval time.Duration,
store store.Store,
logger mlog.LoggerIFace,
fastIteration bool,
) *hubConnectionIndex {
return &hubConnectionIndex{
byUserId: make(map[string][]*WebConn),
byChannelID: make(map[string][]*WebConn),
byConnection: make(map[*WebConn]int),
byUserId: make(map[string]map[*WebConn]struct{}),
byChannelID: make(map[string]map[*WebConn]struct{}),
byConnection: make(map[*WebConn]connMetadata),
byConnectionId: make(map[string]*WebConn),
staleThreshold: interval,
store: store,
logger: logger,
fastIteration: fastIteration,
}
}
func (i *hubConnectionIndex) Add(wc *WebConn) error {
cm, err := i.store.Channel().GetAllChannelMembersForUser(request.EmptyContext(i.logger), wc.UserId, false, false)
if err != nil {
return fmt.Errorf("error getChannelMembersForUser: %v", err)
}
for chID := range cm {
i.byChannelID[chID] = append(i.byChannelID[chID], wc)
var channelIDs []string
if i.fastIteration {
cm, err := i.store.Channel().GetAllChannelMembersForUser(request.EmptyContext(i.logger), wc.UserId, false, false)
if err != nil {
return fmt.Errorf("error getChannelMembersForUser: %v", err)
}
// Store channel IDs and add to byChannelID
channelIDs = make([]string, 0, len(cm))
for chID := range cm {
channelIDs = append(channelIDs, chID)
// Initialize the channel's map if it doesn't exist
if _, ok := i.byChannelID[chID]; !ok {
i.byChannelID[chID] = make(map[*WebConn]struct{})
}
i.byChannelID[chID][wc] = struct{}{}
}
}
i.byUserId[wc.UserId] = append(i.byUserId[wc.UserId], wc)
i.byConnection[wc] = len(i.byUserId[wc.UserId]) - 1
// Initialize the user's map if it doesn't exist
if _, ok := i.byUserId[wc.UserId]; !ok {
i.byUserId[wc.UserId] = make(map[*WebConn]struct{})
}
i.byUserId[wc.UserId][wc] = struct{}{}
i.byConnection[wc] = connMetadata{
channelIDs: channelIDs,
}
i.byConnectionId[wc.GetConnectionID()] = wc
return nil
}
func (i *hubConnectionIndex) Remove(wc *WebConn) {
userConnIndex, ok := i.byConnection[wc]
connMeta, ok := i.byConnection[wc]
if !ok {
return
}
// Remove the wc from i.byUserId
// get the conn slice.
userConnections := i.byUserId[wc.UserId]
// get the last connection.
last := userConnections[len(userConnections)-1]
// https://go.dev/wiki/SliceTricks#delete-without-preserving-order
userConnections[userConnIndex] = last
userConnections[len(userConnections)-1] = nil
i.byUserId[wc.UserId] = userConnections[:len(userConnections)-1]
// set the index of the connection that was moved to the new index.
i.byConnection[last] = userConnIndex
// Remove from byUserId
if userConns, ok := i.byUserId[wc.UserId]; ok {
delete(userConns, wc)
}
connectionID := wc.GetConnectionID()
// Remove webconns from i.byChannelID
// This has O(n) complexity. We are trading off speed while removing
// a connection, to improve broadcasting a message.
for chID, webConns := range i.byChannelID {
// https://go.dev/wiki/SliceTricks#filtering-without-allocating
filtered := webConns[:0]
for _, conn := range webConns {
if conn.GetConnectionID() != connectionID {
filtered = append(filtered, conn)
if i.fastIteration {
// Remove from byChannelID for each channel
for _, chID := range connMeta.channelIDs {
if channelConns, ok := i.byChannelID[chID]; ok {
delete(channelConns, wc)
}
}
for i := len(filtered); i < len(webConns); i++ {
webConns[i] = nil
}
i.byChannelID[chID] = filtered
}
delete(i.byConnection, wc)
delete(i.byConnectionId, connectionID)
delete(i.byConnectionId, wc.GetConnectionID())
}
func (i *hubConnectionIndex) InvalidateCMCacheForUser(userID string) error {
@ -753,25 +768,40 @@ func (i *hubConnectionIndex) InvalidateCMCacheForUser(userID string) error {
return err
}
// Clear out all user entries which belong to channels.
for chID, webConns := range i.byChannelID {
// https://go.dev/wiki/SliceTricks#filtering-without-allocating
filtered := webConns[:0]
for _, conn := range webConns {
if conn.UserId != userID {
filtered = append(filtered, conn)
// Get all connections for this user
conns := i.ForUser(userID)
// Remove all user connections from existing channels
for _, conn := range conns {
if meta, ok := i.byConnection[conn]; ok {
// Remove from old channels
for _, chID := range meta.channelIDs {
if channelConns, ok := i.byChannelID[chID]; ok {
delete(channelConns, conn)
}
}
}
for i := len(filtered); i < len(webConns); i++ {
webConns[i] = nil
}
i.byChannelID[chID] = filtered
}
// re-populate the cache
for chID := range cm {
i.byChannelID[chID] = append(i.byChannelID[chID], i.ForUser(userID)...)
// Add connections to new channels
for _, conn := range conns {
newChannelIDs := make([]string, 0, len(cm))
for chID := range cm {
newChannelIDs = append(newChannelIDs, chID)
// Initialize channel map if needed
if _, ok := i.byChannelID[chID]; !ok {
i.byChannelID[chID] = make(map[*WebConn]struct{})
}
i.byChannelID[chID][conn] = struct{}{}
}
// Update connection metadata
if meta, ok := i.byConnection[conn]; ok {
meta.channelIDs = newChannelIDs
i.byConnection[conn] = meta
}
}
return nil
}
@ -782,26 +812,31 @@ func (i *hubConnectionIndex) Has(wc *WebConn) bool {
// ForUser returns all connections for a user ID.
func (i *hubConnectionIndex) ForUser(id string) []*WebConn {
// Fast path if there is only one or fewer connection.
if len(i.byUserId[id]) <= 1 {
return i.byUserId[id]
userConns, ok := i.byUserId[id]
if !ok {
return nil
}
// Move to using maps.Keys to use the iterator pattern with 1.23.
// This saves the additional slice copy.
conns := make([]*WebConn, 0, len(userConns))
for conn := range userConns {
conns = append(conns, conn)
}
// If there are multiple connections per user,
// then we have to return a clone of the slice
// to allow connIndex.Remove to be safely called while
// iterating the slice.
conns := make([]*WebConn, len(i.byUserId[id]))
copy(conns, i.byUserId[id])
return conns
}
// ForChannel returns all connections for a channelID.
func (i *hubConnectionIndex) ForChannel(channelID string) []*WebConn {
// Note: this is expensive because usually there will be
// more than 1 member for a channel, and broadcasting
// is a hot path, but worth it.
conns := make([]*WebConn, len(i.byChannelID[channelID]))
copy(conns, i.byChannelID[channelID])
channelConns, ok := i.byChannelID[channelID]
if !ok {
return nil
}
conns := make([]*WebConn, 0, len(channelConns))
for conn := range channelConns {
conns = append(conns, conn)
}
return conns
}
@ -822,7 +857,7 @@ func (i *hubConnectionIndex) ForConnection(id string) *WebConn {
}
// All returns the full webConn index.
func (i *hubConnectionIndex) All() map[*WebConn]int {
func (i *hubConnectionIndex) All() map[*WebConn]connMetadata {
return i.byConnection
}

View File

@ -6,6 +6,7 @@ package platform
import (
"bytes"
"encoding/json"
"fmt"
"net"
"net/http"
"net/http/httptest"
@ -197,154 +198,158 @@ func TestHubConnIndex(t *testing.T) {
})
require.NoError(t, err)
t.Run("Basic", func(t *testing.T) {
connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger)
for _, fastIterate := range []bool{true, false} {
t.Run(fmt.Sprintf("fastIterate=%t", fastIterate), func(t *testing.T) {
t.Run("Basic", func(t *testing.T) {
connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger, fastIterate)
// User1
wc1 := &WebConn{
Platform: th.Service,
Suite: th.Suite,
UserId: model.NewId(),
}
wc1.SetConnectionID(model.NewId())
wc1.SetSession(&model.Session{})
// User1
wc1 := &WebConn{
Platform: th.Service,
Suite: th.Suite,
UserId: model.NewId(),
}
wc1.SetConnectionID(model.NewId())
wc1.SetSession(&model.Session{})
// User2
wc2 := &WebConn{
Platform: th.Service,
Suite: th.Suite,
UserId: model.NewId(),
}
wc2.SetConnectionID(model.NewId())
wc2.SetSession(&model.Session{})
// User2
wc2 := &WebConn{
Platform: th.Service,
Suite: th.Suite,
UserId: model.NewId(),
}
wc2.SetConnectionID(model.NewId())
wc2.SetSession(&model.Session{})
wc3 := &WebConn{
Platform: th.Service,
Suite: th.Suite,
UserId: wc2.UserId,
}
wc3.SetConnectionID(model.NewId())
wc3.SetSession(&model.Session{})
wc3 := &WebConn{
Platform: th.Service,
Suite: th.Suite,
UserId: wc2.UserId,
}
wc3.SetConnectionID(model.NewId())
wc3.SetSession(&model.Session{})
wc4 := &WebConn{
Platform: th.Service,
Suite: th.Suite,
UserId: wc2.UserId,
}
wc4.SetConnectionID(model.NewId())
wc4.SetSession(&model.Session{})
wc4 := &WebConn{
Platform: th.Service,
Suite: th.Suite,
UserId: wc2.UserId,
}
wc4.SetConnectionID(model.NewId())
wc4.SetSession(&model.Session{})
connIndex.Add(wc1)
connIndex.Add(wc2)
connIndex.Add(wc3)
connIndex.Add(wc4)
connIndex.Add(wc1)
connIndex.Add(wc2)
connIndex.Add(wc3)
connIndex.Add(wc4)
t.Run("Basic", func(t *testing.T) {
assert.True(t, connIndex.Has(wc1))
assert.True(t, connIndex.Has(wc2))
t.Run("Basic", func(t *testing.T) {
assert.True(t, connIndex.Has(wc1))
assert.True(t, connIndex.Has(wc2))
assert.ElementsMatch(t, connIndex.ForUser(wc2.UserId), []*WebConn{wc2, wc3, wc4})
assert.ElementsMatch(t, connIndex.ForUser(wc1.UserId), []*WebConn{wc1})
assert.True(t, connIndex.Has(wc2))
assert.True(t, connIndex.Has(wc1))
assert.Len(t, connIndex.All(), 4)
assert.ElementsMatch(t, connIndex.ForUser(wc2.UserId), []*WebConn{wc2, wc3, wc4})
assert.ElementsMatch(t, connIndex.ForUser(wc1.UserId), []*WebConn{wc1})
assert.True(t, connIndex.Has(wc2))
assert.True(t, connIndex.Has(wc1))
assert.Len(t, connIndex.All(), 4)
})
t.Run("RemoveMiddleUser2", func(t *testing.T) {
connIndex.Remove(wc3) // Remove from middle from user2
assert.ElementsMatch(t, connIndex.ForUser(wc2.UserId), []*WebConn{wc2, wc4})
assert.ElementsMatch(t, connIndex.ForUser(wc1.UserId), []*WebConn{wc1})
assert.True(t, connIndex.Has(wc2))
assert.False(t, connIndex.Has(wc3))
assert.True(t, connIndex.Has(wc4))
assert.Len(t, connIndex.All(), 3)
})
t.Run("RemoveUser1", func(t *testing.T) {
connIndex.Remove(wc1) // Remove sole connection from user1
assert.ElementsMatch(t, connIndex.ForUser(wc2.UserId), []*WebConn{wc2, wc4})
assert.ElementsMatch(t, connIndex.ForUser(wc1.UserId), []*WebConn{})
assert.Len(t, connIndex.ForUser(wc1.UserId), 0)
assert.Len(t, connIndex.All(), 2)
assert.False(t, connIndex.Has(wc1))
assert.True(t, connIndex.Has(wc2))
})
t.Run("RemoveEndUser2", func(t *testing.T) {
connIndex.Remove(wc4) // Remove from end from user2
assert.ElementsMatch(t, connIndex.ForUser(wc2.UserId), []*WebConn{wc2})
assert.ElementsMatch(t, connIndex.ForUser(wc1.UserId), []*WebConn{})
assert.True(t, connIndex.Has(wc2))
assert.False(t, connIndex.Has(wc3))
assert.False(t, connIndex.Has(wc4))
assert.Len(t, connIndex.All(), 1)
})
})
t.Run("ByConnectionId", func(t *testing.T) {
connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger, fastIterate)
// User1
wc1ID := model.NewId()
wc1 := &WebConn{
Platform: th.Service,
Suite: th.Suite,
UserId: th.BasicUser.Id,
}
wc1.SetConnectionID(wc1ID)
wc1.SetSession(&model.Session{})
// User2
wc2ID := model.NewId()
wc2 := &WebConn{
Platform: th.Service,
Suite: th.Suite,
UserId: th.BasicUser2.Id,
}
wc2.SetConnectionID(wc2ID)
wc2.SetSession(&model.Session{})
wc3ID := model.NewId()
wc3 := &WebConn{
Platform: th.Service,
Suite: th.Suite,
UserId: wc2.UserId,
}
wc3.SetConnectionID(wc3ID)
wc3.SetSession(&model.Session{})
t.Run("no connections", func(t *testing.T) {
assert.False(t, connIndex.Has(wc1))
assert.False(t, connIndex.Has(wc2))
assert.False(t, connIndex.Has(wc3))
assert.Empty(t, connIndex.byConnectionId)
})
t.Run("adding", func(t *testing.T) {
connIndex.Add(wc1)
connIndex.Add(wc3)
assert.Len(t, connIndex.byConnectionId, 2)
assert.Equal(t, wc1, connIndex.ForConnection(wc1ID))
assert.Equal(t, wc3, connIndex.ForConnection(wc3ID))
assert.Equal(t, (*WebConn)(nil), connIndex.ForConnection(wc2ID))
})
t.Run("removing", func(t *testing.T) {
connIndex.Remove(wc3)
assert.Len(t, connIndex.byConnectionId, 1)
assert.Equal(t, wc1, connIndex.ForConnection(wc1ID))
assert.Equal(t, (*WebConn)(nil), connIndex.ForConnection(wc3ID))
assert.Equal(t, (*WebConn)(nil), connIndex.ForConnection(wc2ID))
})
})
})
t.Run("RemoveMiddleUser2", func(t *testing.T) {
connIndex.Remove(wc3) // Remove from middle from user2
assert.ElementsMatch(t, connIndex.ForUser(wc2.UserId), []*WebConn{wc2, wc4})
assert.ElementsMatch(t, connIndex.ForUser(wc1.UserId), []*WebConn{wc1})
assert.True(t, connIndex.Has(wc2))
assert.False(t, connIndex.Has(wc3))
assert.True(t, connIndex.Has(wc4))
assert.Len(t, connIndex.All(), 3)
})
t.Run("RemoveUser1", func(t *testing.T) {
connIndex.Remove(wc1) // Remove sole connection from user1
assert.ElementsMatch(t, connIndex.ForUser(wc2.UserId), []*WebConn{wc2, wc4})
assert.ElementsMatch(t, connIndex.ForUser(wc1.UserId), []*WebConn{})
assert.Len(t, connIndex.ForUser(wc1.UserId), 0)
assert.Len(t, connIndex.All(), 2)
assert.False(t, connIndex.Has(wc1))
assert.True(t, connIndex.Has(wc2))
})
t.Run("RemoveEndUser2", func(t *testing.T) {
connIndex.Remove(wc4) // Remove from end from user2
assert.ElementsMatch(t, connIndex.ForUser(wc2.UserId), []*WebConn{wc2})
assert.ElementsMatch(t, connIndex.ForUser(wc1.UserId), []*WebConn{})
assert.True(t, connIndex.Has(wc2))
assert.False(t, connIndex.Has(wc3))
assert.False(t, connIndex.Has(wc4))
assert.Len(t, connIndex.All(), 1)
})
})
t.Run("ByConnectionId", func(t *testing.T) {
connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger)
// User1
wc1ID := model.NewId()
wc1 := &WebConn{
Platform: th.Service,
Suite: th.Suite,
UserId: th.BasicUser.Id,
}
wc1.SetConnectionID(wc1ID)
wc1.SetSession(&model.Session{})
// User2
wc2ID := model.NewId()
wc2 := &WebConn{
Platform: th.Service,
Suite: th.Suite,
UserId: th.BasicUser2.Id,
}
wc2.SetConnectionID(wc2ID)
wc2.SetSession(&model.Session{})
wc3ID := model.NewId()
wc3 := &WebConn{
Platform: th.Service,
Suite: th.Suite,
UserId: wc2.UserId,
}
wc3.SetConnectionID(wc3ID)
wc3.SetSession(&model.Session{})
t.Run("no connections", func(t *testing.T) {
assert.False(t, connIndex.Has(wc1))
assert.False(t, connIndex.Has(wc2))
assert.False(t, connIndex.Has(wc3))
assert.Empty(t, connIndex.byConnectionId)
})
t.Run("adding", func(t *testing.T) {
connIndex.Add(wc1)
connIndex.Add(wc3)
assert.Len(t, connIndex.byConnectionId, 2)
assert.Equal(t, wc1, connIndex.ForConnection(wc1ID))
assert.Equal(t, wc3, connIndex.ForConnection(wc3ID))
assert.Equal(t, (*WebConn)(nil), connIndex.ForConnection(wc2ID))
})
t.Run("removing", func(t *testing.T) {
connIndex.Remove(wc3)
assert.Len(t, connIndex.byConnectionId, 1)
assert.Equal(t, wc1, connIndex.ForConnection(wc1ID))
assert.Equal(t, (*WebConn)(nil), connIndex.ForConnection(wc3ID))
assert.Equal(t, (*WebConn)(nil), connIndex.ForConnection(wc2ID))
})
})
}
t.Run("ByChannelId", func(t *testing.T) {
connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger)
connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger, true)
// User1
wc1ID := model.NewId()
@ -414,7 +419,7 @@ func TestHubConnIndexIncorrectRemoval(t *testing.T) {
th := Setup(t)
defer th.TearDown()
connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger)
connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger, false)
// User2
wc2 := &WebConn{
@ -461,7 +466,7 @@ func TestHubConnIndexInactive(t *testing.T) {
th := Setup(t)
defer th.TearDown()
connIndex := newHubConnectionIndex(2*time.Second, th.Service.Store, th.Service.logger)
connIndex := newHubConnectionIndex(2*time.Second, th.Service.Store, th.Service.logger, false)
// User1
wc1 := &WebConn{
@ -621,7 +626,7 @@ func TestHubWebConnCount(t *testing.T) {
func BenchmarkHubConnIndex(b *testing.B) {
th := Setup(b).InitBasic()
defer th.TearDown()
connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger)
connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger, false)
// User1
wc1 := &WebConn{
@ -666,7 +671,7 @@ func TestHubConnIndexRemoveMemLeak(t *testing.T) {
th := Setup(t)
defer th.TearDown()
connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger)
connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger, false)
wc := &WebConn{
Platform: th.Service,

View File

@ -434,6 +434,7 @@ type ServiceSettings struct {
MaximumPayloadSizeBytes *int64 `access:"environment_file_storage,write_restrictable,cloud_restrictable"`
MaximumURLLength *int `access:"environment_file_storage,write_restrictable,cloud_restrictable"`
ScheduledPosts *bool `access:"site_posts"`
EnableWebHubChannelIteration *bool `access:"write_restrictable,cloud_restrictable"` // telemetry: none
}
var MattermostGiphySdkKey string
@ -961,6 +962,10 @@ func (s *ServiceSettings) SetDefaults(isUpdate bool) {
if s.ScheduledPosts == nil {
s.ScheduledPosts = NewPointer(true)
}
if s.EnableWebHubChannelIteration == nil {
s.EnableWebHubChannelIteration = NewPointer(false)
}
}
type CacheSettings struct {