mirror of
https://github.com/mattermost/mattermost.git
synced 2025-02-25 18:55:24 -06:00
MM-62960: Improve webConn remove performance from hubConnectionIndex (#30178)
When we added iteration by channelID, this was a known tradeoff during that. However, it has been observed that the regular connection removal function creates considerable blocking of the processing loop, leading to high CPU usage and API latencies. To fix this, we add a reverse mapping of channelIDs to connections and their positions in the slice. This helps us to remove the connection from the slice without iteration. Unfortunately, this still needs to iterate through all channelIDs during invalidation of the channel member cache. However, the user cache invalidation is not a regular activity. So it should be an acceptable tradeoff to make. https://mattermost.atlassian.net/browse/MM-62960 ```release-note A new config knob ServiceSettings.EnableWebHubChannelIteration which allows a user to control the performance of websocket broadcasting. By default, this setting is turned off. If it is turned on, it improves the websocket broadcasting performance at the expense of poor performance when users join/leave a channel. It is not recommended to turn it on unless you have atleast 200,000 concurrent users actively using MM. ``` Co-authored-by: Mattermost Build <build@mattermost.com>
This commit is contained in:
parent
06d8c92504
commit
da7192246e
@ -3475,7 +3475,9 @@ func TestWebHubMembership(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestWebHubCloseConnOnDBFail(t *testing.T) {
|
||||
th := Setup(t).InitBasic()
|
||||
th := SetupConfig(t, func(cfg *model.Config) {
|
||||
*cfg.ServiceSettings.EnableWebHubChannelIteration = true
|
||||
}).InitBasic()
|
||||
defer func() {
|
||||
th.TearDown()
|
||||
_, err := th.Server.Store().GetInternalMasterDB().Exec(`ALTER TABLE dummy RENAME to ChannelMembers`)
|
||||
|
@ -27,6 +27,7 @@ import (
|
||||
"github.com/mattermost/mattermost/server/public/shared/i18n"
|
||||
"github.com/mattermost/mattermost/server/public/shared/mlog"
|
||||
"github.com/mattermost/mattermost/server/public/shared/request"
|
||||
"github.com/mattermost/mattermost/server/v8/channels/store/sqlstore"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -95,8 +96,10 @@ type WebConn struct {
|
||||
UserId string
|
||||
PostedAck bool
|
||||
|
||||
lastUserActivityAt int64
|
||||
send chan model.WebSocketMessage
|
||||
allChannelMembers map[string]string
|
||||
lastAllChannelMembersTime int64
|
||||
lastUserActivityAt int64
|
||||
send chan model.WebSocketMessage
|
||||
// deadQueue behaves like a queue of a finite size
|
||||
// which is used to store all messages that are sent via the websocket.
|
||||
// It basically acts as the user-space socket buffer, and is used
|
||||
@ -758,6 +761,8 @@ func (wc *WebConn) drainDeadQueue(index int) error {
|
||||
|
||||
// InvalidateCache resets all internal data of the WebConn.
|
||||
func (wc *WebConn) InvalidateCache() {
|
||||
wc.allChannelMembers = nil
|
||||
wc.lastAllChannelMembersTime = 0
|
||||
wc.SetSession(nil)
|
||||
wc.SetSessionExpiresAt(0)
|
||||
}
|
||||
@ -938,9 +943,36 @@ func (wc *WebConn) ShouldSendEvent(msg *model.WebSocketEvent) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// We don't need to do any further checks because this is already scoped
|
||||
// to channel members from web_hub.
|
||||
return true
|
||||
if *wc.Platform.Config().ServiceSettings.EnableWebHubChannelIteration {
|
||||
// We don't need to do any further checks because this is already scoped
|
||||
// to channel members from web_hub.
|
||||
return true
|
||||
}
|
||||
|
||||
if model.GetMillis()-wc.lastAllChannelMembersTime > webConnMemberCacheTime {
|
||||
wc.allChannelMembers = nil
|
||||
wc.lastAllChannelMembersTime = 0
|
||||
}
|
||||
|
||||
if wc.allChannelMembers == nil {
|
||||
result, err := wc.Platform.Store.Channel().GetAllChannelMembersForUser(
|
||||
sqlstore.RequestContextWithMaster(request.EmptyContext(wc.Platform.logger)),
|
||||
wc.UserId,
|
||||
false,
|
||||
false,
|
||||
)
|
||||
if err != nil {
|
||||
mlog.Error("webhub.shouldSendEvent.", mlog.Err(err))
|
||||
return false
|
||||
}
|
||||
wc.allChannelMembers = result
|
||||
wc.lastAllChannelMembersTime = model.GetMillis()
|
||||
}
|
||||
|
||||
if _, ok := wc.allChannelMembers[chID]; ok {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// Only report events to users who are in the team for the event
|
||||
|
@ -486,6 +486,7 @@ func (h *Hub) Start() {
|
||||
connIndex := newHubConnectionIndex(inactiveConnReaperInterval,
|
||||
h.platform.Store,
|
||||
h.platform.logger,
|
||||
*h.platform.Config().ServiceSettings.EnableWebHubChannelIteration,
|
||||
)
|
||||
|
||||
for {
|
||||
@ -599,6 +600,11 @@ func (h *Hub) Start() {
|
||||
for _, webConn := range connIndex.ForUser(userID) {
|
||||
webConn.InvalidateCache()
|
||||
}
|
||||
|
||||
if !*h.platform.Config().ServiceSettings.EnableWebHubChannelIteration {
|
||||
continue
|
||||
}
|
||||
|
||||
err := connIndex.InvalidateCMCacheForUser(userID)
|
||||
if err != nil {
|
||||
h.platform.Log().Error("Error while invalidating channel member cache", mlog.String("user_id", userID), mlog.Err(err))
|
||||
@ -666,7 +672,7 @@ func (h *Hub) Start() {
|
||||
}
|
||||
} else if userID := msg.GetBroadcast().UserId; userID != "" {
|
||||
targetConns = connIndex.ForUser(userID)
|
||||
} else if channelID := msg.GetBroadcast().ChannelId; channelID != "" {
|
||||
} else if channelID := msg.GetBroadcast().ChannelId; channelID != "" && *h.platform.Config().ServiceSettings.EnableWebHubChannelIteration {
|
||||
targetConns = connIndex.ForChannel(channelID)
|
||||
}
|
||||
if targetConns != nil {
|
||||
@ -733,6 +739,10 @@ func closeAndRemoveConn(connIndex *hubConnectionIndex, conn *WebConn) {
|
||||
connIndex.Remove(conn)
|
||||
}
|
||||
|
||||
type connMetadata struct {
|
||||
channelIDs []string
|
||||
}
|
||||
|
||||
// hubConnectionIndex provides fast addition, removal, and iteration of web connections.
|
||||
// It requires 4 functionalities which need to be very fast:
|
||||
// - check if a connection exists or not.
|
||||
@ -740,90 +750,95 @@ func closeAndRemoveConn(connIndex *hubConnectionIndex, conn *WebConn) {
|
||||
// - get all connections for a given channelID.
|
||||
// - get all connections.
|
||||
type hubConnectionIndex struct {
|
||||
// byUserId stores the list of connections for a given userID
|
||||
byUserId map[string][]*WebConn
|
||||
// byChannelID stores the list of connections for a given channelID.
|
||||
byChannelID map[string][]*WebConn
|
||||
// byConnection serves the dual purpose of storing the index of the webconn
|
||||
// in the value of byUserId map, and also to get all connections.
|
||||
byConnection map[*WebConn]int
|
||||
// byUserId stores the set of connections for a given userID
|
||||
byUserId map[string]map[*WebConn]struct{}
|
||||
// byChannelID stores the set of connections for a given channelID
|
||||
byChannelID map[string]map[*WebConn]struct{}
|
||||
// byConnection serves the dual purpose of storing the channelIDs
|
||||
// and also to get all connections
|
||||
byConnection map[*WebConn]connMetadata
|
||||
byConnectionId map[string]*WebConn
|
||||
// staleThreshold is the limit beyond which inactive connections
|
||||
// will be deleted.
|
||||
staleThreshold time.Duration
|
||||
|
||||
store store.Store
|
||||
logger mlog.LoggerIFace
|
||||
fastIteration bool
|
||||
store store.Store
|
||||
logger mlog.LoggerIFace
|
||||
}
|
||||
|
||||
func newHubConnectionIndex(interval time.Duration,
|
||||
store store.Store,
|
||||
logger mlog.LoggerIFace,
|
||||
fastIteration bool,
|
||||
) *hubConnectionIndex {
|
||||
return &hubConnectionIndex{
|
||||
byUserId: make(map[string][]*WebConn),
|
||||
byChannelID: make(map[string][]*WebConn),
|
||||
byConnection: make(map[*WebConn]int),
|
||||
byUserId: make(map[string]map[*WebConn]struct{}),
|
||||
byChannelID: make(map[string]map[*WebConn]struct{}),
|
||||
byConnection: make(map[*WebConn]connMetadata),
|
||||
byConnectionId: make(map[string]*WebConn),
|
||||
staleThreshold: interval,
|
||||
store: store,
|
||||
logger: logger,
|
||||
fastIteration: fastIteration,
|
||||
}
|
||||
}
|
||||
|
||||
func (i *hubConnectionIndex) Add(wc *WebConn) error {
|
||||
cm, err := i.store.Channel().GetAllChannelMembersForUser(request.EmptyContext(i.logger), wc.UserId, false, false)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error getChannelMembersForUser: %v", err)
|
||||
}
|
||||
for chID := range cm {
|
||||
i.byChannelID[chID] = append(i.byChannelID[chID], wc)
|
||||
var channelIDs []string
|
||||
if i.fastIteration {
|
||||
cm, err := i.store.Channel().GetAllChannelMembersForUser(request.EmptyContext(i.logger), wc.UserId, false, false)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error getChannelMembersForUser: %v", err)
|
||||
}
|
||||
|
||||
// Store channel IDs and add to byChannelID
|
||||
channelIDs = make([]string, 0, len(cm))
|
||||
for chID := range cm {
|
||||
channelIDs = append(channelIDs, chID)
|
||||
|
||||
// Initialize the channel's map if it doesn't exist
|
||||
if _, ok := i.byChannelID[chID]; !ok {
|
||||
i.byChannelID[chID] = make(map[*WebConn]struct{})
|
||||
}
|
||||
i.byChannelID[chID][wc] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
i.byUserId[wc.UserId] = append(i.byUserId[wc.UserId], wc)
|
||||
i.byConnection[wc] = len(i.byUserId[wc.UserId]) - 1
|
||||
// Initialize the user's map if it doesn't exist
|
||||
if _, ok := i.byUserId[wc.UserId]; !ok {
|
||||
i.byUserId[wc.UserId] = make(map[*WebConn]struct{})
|
||||
}
|
||||
i.byUserId[wc.UserId][wc] = struct{}{}
|
||||
i.byConnection[wc] = connMetadata{
|
||||
channelIDs: channelIDs,
|
||||
}
|
||||
i.byConnectionId[wc.GetConnectionID()] = wc
|
||||
return nil
|
||||
}
|
||||
|
||||
func (i *hubConnectionIndex) Remove(wc *WebConn) {
|
||||
userConnIndex, ok := i.byConnection[wc]
|
||||
connMeta, ok := i.byConnection[wc]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
// Remove the wc from i.byUserId
|
||||
// get the conn slice.
|
||||
userConnections := i.byUserId[wc.UserId]
|
||||
// get the last connection.
|
||||
last := userConnections[len(userConnections)-1]
|
||||
// https://go.dev/wiki/SliceTricks#delete-without-preserving-order
|
||||
userConnections[userConnIndex] = last
|
||||
userConnections[len(userConnections)-1] = nil
|
||||
i.byUserId[wc.UserId] = userConnections[:len(userConnections)-1]
|
||||
// set the index of the connection that was moved to the new index.
|
||||
i.byConnection[last] = userConnIndex
|
||||
// Remove from byUserId
|
||||
if userConns, ok := i.byUserId[wc.UserId]; ok {
|
||||
delete(userConns, wc)
|
||||
}
|
||||
|
||||
connectionID := wc.GetConnectionID()
|
||||
// Remove webconns from i.byChannelID
|
||||
// This has O(n) complexity. We are trading off speed while removing
|
||||
// a connection, to improve broadcasting a message.
|
||||
for chID, webConns := range i.byChannelID {
|
||||
// https://go.dev/wiki/SliceTricks#filtering-without-allocating
|
||||
filtered := webConns[:0]
|
||||
for _, conn := range webConns {
|
||||
if conn.GetConnectionID() != connectionID {
|
||||
filtered = append(filtered, conn)
|
||||
if i.fastIteration {
|
||||
// Remove from byChannelID for each channel
|
||||
for _, chID := range connMeta.channelIDs {
|
||||
if channelConns, ok := i.byChannelID[chID]; ok {
|
||||
delete(channelConns, wc)
|
||||
}
|
||||
}
|
||||
for i := len(filtered); i < len(webConns); i++ {
|
||||
webConns[i] = nil
|
||||
}
|
||||
i.byChannelID[chID] = filtered
|
||||
}
|
||||
|
||||
delete(i.byConnection, wc)
|
||||
delete(i.byConnectionId, connectionID)
|
||||
delete(i.byConnectionId, wc.GetConnectionID())
|
||||
}
|
||||
|
||||
func (i *hubConnectionIndex) InvalidateCMCacheForUser(userID string) error {
|
||||
@ -833,25 +848,40 @@ func (i *hubConnectionIndex) InvalidateCMCacheForUser(userID string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// Clear out all user entries which belong to channels.
|
||||
for chID, webConns := range i.byChannelID {
|
||||
// https://go.dev/wiki/SliceTricks#filtering-without-allocating
|
||||
filtered := webConns[:0]
|
||||
for _, conn := range webConns {
|
||||
if conn.UserId != userID {
|
||||
filtered = append(filtered, conn)
|
||||
// Get all connections for this user
|
||||
conns := i.ForUser(userID)
|
||||
|
||||
// Remove all user connections from existing channels
|
||||
for _, conn := range conns {
|
||||
if meta, ok := i.byConnection[conn]; ok {
|
||||
// Remove from old channels
|
||||
for _, chID := range meta.channelIDs {
|
||||
if channelConns, ok := i.byChannelID[chID]; ok {
|
||||
delete(channelConns, conn)
|
||||
}
|
||||
}
|
||||
}
|
||||
for i := len(filtered); i < len(webConns); i++ {
|
||||
webConns[i] = nil
|
||||
}
|
||||
i.byChannelID[chID] = filtered
|
||||
}
|
||||
|
||||
// re-populate the cache
|
||||
for chID := range cm {
|
||||
i.byChannelID[chID] = append(i.byChannelID[chID], i.ForUser(userID)...)
|
||||
// Add connections to new channels
|
||||
for _, conn := range conns {
|
||||
newChannelIDs := make([]string, 0, len(cm))
|
||||
for chID := range cm {
|
||||
newChannelIDs = append(newChannelIDs, chID)
|
||||
// Initialize channel map if needed
|
||||
if _, ok := i.byChannelID[chID]; !ok {
|
||||
i.byChannelID[chID] = make(map[*WebConn]struct{})
|
||||
}
|
||||
i.byChannelID[chID][conn] = struct{}{}
|
||||
}
|
||||
|
||||
// Update connection metadata
|
||||
if meta, ok := i.byConnection[conn]; ok {
|
||||
meta.channelIDs = newChannelIDs
|
||||
i.byConnection[conn] = meta
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -862,26 +892,31 @@ func (i *hubConnectionIndex) Has(wc *WebConn) bool {
|
||||
|
||||
// ForUser returns all connections for a user ID.
|
||||
func (i *hubConnectionIndex) ForUser(id string) []*WebConn {
|
||||
// Fast path if there is only one or fewer connection.
|
||||
if len(i.byUserId[id]) <= 1 {
|
||||
return i.byUserId[id]
|
||||
userConns, ok := i.byUserId[id]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Move to using maps.Keys to use the iterator pattern with 1.23.
|
||||
// This saves the additional slice copy.
|
||||
conns := make([]*WebConn, 0, len(userConns))
|
||||
for conn := range userConns {
|
||||
conns = append(conns, conn)
|
||||
}
|
||||
// If there are multiple connections per user,
|
||||
// then we have to return a clone of the slice
|
||||
// to allow connIndex.Remove to be safely called while
|
||||
// iterating the slice.
|
||||
conns := make([]*WebConn, len(i.byUserId[id]))
|
||||
copy(conns, i.byUserId[id])
|
||||
return conns
|
||||
}
|
||||
|
||||
// ForChannel returns all connections for a channelID.
|
||||
func (i *hubConnectionIndex) ForChannel(channelID string) []*WebConn {
|
||||
// Note: this is expensive because usually there will be
|
||||
// more than 1 member for a channel, and broadcasting
|
||||
// is a hot path, but worth it.
|
||||
conns := make([]*WebConn, len(i.byChannelID[channelID]))
|
||||
copy(conns, i.byChannelID[channelID])
|
||||
channelConns, ok := i.byChannelID[channelID]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
conns := make([]*WebConn, 0, len(channelConns))
|
||||
for conn := range channelConns {
|
||||
conns = append(conns, conn)
|
||||
}
|
||||
return conns
|
||||
}
|
||||
|
||||
@ -902,7 +937,7 @@ func (i *hubConnectionIndex) ForConnection(id string) *WebConn {
|
||||
}
|
||||
|
||||
// All returns the full webConn index.
|
||||
func (i *hubConnectionIndex) All() map[*WebConn]int {
|
||||
func (i *hubConnectionIndex) All() map[*WebConn]connMetadata {
|
||||
return i.byConnection
|
||||
}
|
||||
|
||||
|
@ -6,6 +6,7 @@ package platform
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
@ -197,154 +198,158 @@ func TestHubConnIndex(t *testing.T) {
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
t.Run("Basic", func(t *testing.T) {
|
||||
connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger)
|
||||
for _, fastIterate := range []bool{true, false} {
|
||||
t.Run(fmt.Sprintf("fastIterate=%t", fastIterate), func(t *testing.T) {
|
||||
t.Run("Basic", func(t *testing.T) {
|
||||
connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger, fastIterate)
|
||||
|
||||
// User1
|
||||
wc1 := &WebConn{
|
||||
Platform: th.Service,
|
||||
Suite: th.Suite,
|
||||
UserId: model.NewId(),
|
||||
}
|
||||
wc1.SetConnectionID(model.NewId())
|
||||
wc1.SetSession(&model.Session{})
|
||||
// User1
|
||||
wc1 := &WebConn{
|
||||
Platform: th.Service,
|
||||
Suite: th.Suite,
|
||||
UserId: model.NewId(),
|
||||
}
|
||||
wc1.SetConnectionID(model.NewId())
|
||||
wc1.SetSession(&model.Session{})
|
||||
|
||||
// User2
|
||||
wc2 := &WebConn{
|
||||
Platform: th.Service,
|
||||
Suite: th.Suite,
|
||||
UserId: model.NewId(),
|
||||
}
|
||||
wc2.SetConnectionID(model.NewId())
|
||||
wc2.SetSession(&model.Session{})
|
||||
// User2
|
||||
wc2 := &WebConn{
|
||||
Platform: th.Service,
|
||||
Suite: th.Suite,
|
||||
UserId: model.NewId(),
|
||||
}
|
||||
wc2.SetConnectionID(model.NewId())
|
||||
wc2.SetSession(&model.Session{})
|
||||
|
||||
wc3 := &WebConn{
|
||||
Platform: th.Service,
|
||||
Suite: th.Suite,
|
||||
UserId: wc2.UserId,
|
||||
}
|
||||
wc3.SetConnectionID(model.NewId())
|
||||
wc3.SetSession(&model.Session{})
|
||||
wc3 := &WebConn{
|
||||
Platform: th.Service,
|
||||
Suite: th.Suite,
|
||||
UserId: wc2.UserId,
|
||||
}
|
||||
wc3.SetConnectionID(model.NewId())
|
||||
wc3.SetSession(&model.Session{})
|
||||
|
||||
wc4 := &WebConn{
|
||||
Platform: th.Service,
|
||||
Suite: th.Suite,
|
||||
UserId: wc2.UserId,
|
||||
}
|
||||
wc4.SetConnectionID(model.NewId())
|
||||
wc4.SetSession(&model.Session{})
|
||||
wc4 := &WebConn{
|
||||
Platform: th.Service,
|
||||
Suite: th.Suite,
|
||||
UserId: wc2.UserId,
|
||||
}
|
||||
wc4.SetConnectionID(model.NewId())
|
||||
wc4.SetSession(&model.Session{})
|
||||
|
||||
connIndex.Add(wc1)
|
||||
connIndex.Add(wc2)
|
||||
connIndex.Add(wc3)
|
||||
connIndex.Add(wc4)
|
||||
connIndex.Add(wc1)
|
||||
connIndex.Add(wc2)
|
||||
connIndex.Add(wc3)
|
||||
connIndex.Add(wc4)
|
||||
|
||||
t.Run("Basic", func(t *testing.T) {
|
||||
assert.True(t, connIndex.Has(wc1))
|
||||
assert.True(t, connIndex.Has(wc2))
|
||||
t.Run("Basic", func(t *testing.T) {
|
||||
assert.True(t, connIndex.Has(wc1))
|
||||
assert.True(t, connIndex.Has(wc2))
|
||||
|
||||
assert.ElementsMatch(t, connIndex.ForUser(wc2.UserId), []*WebConn{wc2, wc3, wc4})
|
||||
assert.ElementsMatch(t, connIndex.ForUser(wc1.UserId), []*WebConn{wc1})
|
||||
assert.True(t, connIndex.Has(wc2))
|
||||
assert.True(t, connIndex.Has(wc1))
|
||||
assert.Len(t, connIndex.All(), 4)
|
||||
assert.ElementsMatch(t, connIndex.ForUser(wc2.UserId), []*WebConn{wc2, wc3, wc4})
|
||||
assert.ElementsMatch(t, connIndex.ForUser(wc1.UserId), []*WebConn{wc1})
|
||||
assert.True(t, connIndex.Has(wc2))
|
||||
assert.True(t, connIndex.Has(wc1))
|
||||
assert.Len(t, connIndex.All(), 4)
|
||||
})
|
||||
|
||||
t.Run("RemoveMiddleUser2", func(t *testing.T) {
|
||||
connIndex.Remove(wc3) // Remove from middle from user2
|
||||
|
||||
assert.ElementsMatch(t, connIndex.ForUser(wc2.UserId), []*WebConn{wc2, wc4})
|
||||
assert.ElementsMatch(t, connIndex.ForUser(wc1.UserId), []*WebConn{wc1})
|
||||
assert.True(t, connIndex.Has(wc2))
|
||||
assert.False(t, connIndex.Has(wc3))
|
||||
assert.True(t, connIndex.Has(wc4))
|
||||
assert.Len(t, connIndex.All(), 3)
|
||||
})
|
||||
|
||||
t.Run("RemoveUser1", func(t *testing.T) {
|
||||
connIndex.Remove(wc1) // Remove sole connection from user1
|
||||
|
||||
assert.ElementsMatch(t, connIndex.ForUser(wc2.UserId), []*WebConn{wc2, wc4})
|
||||
assert.ElementsMatch(t, connIndex.ForUser(wc1.UserId), []*WebConn{})
|
||||
assert.Len(t, connIndex.ForUser(wc1.UserId), 0)
|
||||
assert.Len(t, connIndex.All(), 2)
|
||||
assert.False(t, connIndex.Has(wc1))
|
||||
assert.True(t, connIndex.Has(wc2))
|
||||
})
|
||||
|
||||
t.Run("RemoveEndUser2", func(t *testing.T) {
|
||||
connIndex.Remove(wc4) // Remove from end from user2
|
||||
|
||||
assert.ElementsMatch(t, connIndex.ForUser(wc2.UserId), []*WebConn{wc2})
|
||||
assert.ElementsMatch(t, connIndex.ForUser(wc1.UserId), []*WebConn{})
|
||||
assert.True(t, connIndex.Has(wc2))
|
||||
assert.False(t, connIndex.Has(wc3))
|
||||
assert.False(t, connIndex.Has(wc4))
|
||||
assert.Len(t, connIndex.All(), 1)
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("ByConnectionId", func(t *testing.T) {
|
||||
connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger, fastIterate)
|
||||
|
||||
// User1
|
||||
wc1ID := model.NewId()
|
||||
wc1 := &WebConn{
|
||||
Platform: th.Service,
|
||||
Suite: th.Suite,
|
||||
UserId: th.BasicUser.Id,
|
||||
}
|
||||
wc1.SetConnectionID(wc1ID)
|
||||
wc1.SetSession(&model.Session{})
|
||||
|
||||
// User2
|
||||
wc2ID := model.NewId()
|
||||
wc2 := &WebConn{
|
||||
Platform: th.Service,
|
||||
Suite: th.Suite,
|
||||
UserId: th.BasicUser2.Id,
|
||||
}
|
||||
wc2.SetConnectionID(wc2ID)
|
||||
wc2.SetSession(&model.Session{})
|
||||
|
||||
wc3ID := model.NewId()
|
||||
wc3 := &WebConn{
|
||||
Platform: th.Service,
|
||||
Suite: th.Suite,
|
||||
UserId: wc2.UserId,
|
||||
}
|
||||
wc3.SetConnectionID(wc3ID)
|
||||
wc3.SetSession(&model.Session{})
|
||||
|
||||
t.Run("no connections", func(t *testing.T) {
|
||||
assert.False(t, connIndex.Has(wc1))
|
||||
assert.False(t, connIndex.Has(wc2))
|
||||
assert.False(t, connIndex.Has(wc3))
|
||||
assert.Empty(t, connIndex.byConnectionId)
|
||||
})
|
||||
|
||||
t.Run("adding", func(t *testing.T) {
|
||||
connIndex.Add(wc1)
|
||||
connIndex.Add(wc3)
|
||||
|
||||
assert.Len(t, connIndex.byConnectionId, 2)
|
||||
assert.Equal(t, wc1, connIndex.ForConnection(wc1ID))
|
||||
assert.Equal(t, wc3, connIndex.ForConnection(wc3ID))
|
||||
assert.Equal(t, (*WebConn)(nil), connIndex.ForConnection(wc2ID))
|
||||
})
|
||||
|
||||
t.Run("removing", func(t *testing.T) {
|
||||
connIndex.Remove(wc3)
|
||||
|
||||
assert.Len(t, connIndex.byConnectionId, 1)
|
||||
assert.Equal(t, wc1, connIndex.ForConnection(wc1ID))
|
||||
assert.Equal(t, (*WebConn)(nil), connIndex.ForConnection(wc3ID))
|
||||
assert.Equal(t, (*WebConn)(nil), connIndex.ForConnection(wc2ID))
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("RemoveMiddleUser2", func(t *testing.T) {
|
||||
connIndex.Remove(wc3) // Remove from middle from user2
|
||||
|
||||
assert.ElementsMatch(t, connIndex.ForUser(wc2.UserId), []*WebConn{wc2, wc4})
|
||||
assert.ElementsMatch(t, connIndex.ForUser(wc1.UserId), []*WebConn{wc1})
|
||||
assert.True(t, connIndex.Has(wc2))
|
||||
assert.False(t, connIndex.Has(wc3))
|
||||
assert.True(t, connIndex.Has(wc4))
|
||||
assert.Len(t, connIndex.All(), 3)
|
||||
})
|
||||
|
||||
t.Run("RemoveUser1", func(t *testing.T) {
|
||||
connIndex.Remove(wc1) // Remove sole connection from user1
|
||||
|
||||
assert.ElementsMatch(t, connIndex.ForUser(wc2.UserId), []*WebConn{wc2, wc4})
|
||||
assert.ElementsMatch(t, connIndex.ForUser(wc1.UserId), []*WebConn{})
|
||||
assert.Len(t, connIndex.ForUser(wc1.UserId), 0)
|
||||
assert.Len(t, connIndex.All(), 2)
|
||||
assert.False(t, connIndex.Has(wc1))
|
||||
assert.True(t, connIndex.Has(wc2))
|
||||
})
|
||||
|
||||
t.Run("RemoveEndUser2", func(t *testing.T) {
|
||||
connIndex.Remove(wc4) // Remove from end from user2
|
||||
|
||||
assert.ElementsMatch(t, connIndex.ForUser(wc2.UserId), []*WebConn{wc2})
|
||||
assert.ElementsMatch(t, connIndex.ForUser(wc1.UserId), []*WebConn{})
|
||||
assert.True(t, connIndex.Has(wc2))
|
||||
assert.False(t, connIndex.Has(wc3))
|
||||
assert.False(t, connIndex.Has(wc4))
|
||||
assert.Len(t, connIndex.All(), 1)
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("ByConnectionId", func(t *testing.T) {
|
||||
connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger)
|
||||
|
||||
// User1
|
||||
wc1ID := model.NewId()
|
||||
wc1 := &WebConn{
|
||||
Platform: th.Service,
|
||||
Suite: th.Suite,
|
||||
UserId: th.BasicUser.Id,
|
||||
}
|
||||
wc1.SetConnectionID(wc1ID)
|
||||
wc1.SetSession(&model.Session{})
|
||||
|
||||
// User2
|
||||
wc2ID := model.NewId()
|
||||
wc2 := &WebConn{
|
||||
Platform: th.Service,
|
||||
Suite: th.Suite,
|
||||
UserId: th.BasicUser2.Id,
|
||||
}
|
||||
wc2.SetConnectionID(wc2ID)
|
||||
wc2.SetSession(&model.Session{})
|
||||
|
||||
wc3ID := model.NewId()
|
||||
wc3 := &WebConn{
|
||||
Platform: th.Service,
|
||||
Suite: th.Suite,
|
||||
UserId: wc2.UserId,
|
||||
}
|
||||
wc3.SetConnectionID(wc3ID)
|
||||
wc3.SetSession(&model.Session{})
|
||||
|
||||
t.Run("no connections", func(t *testing.T) {
|
||||
assert.False(t, connIndex.Has(wc1))
|
||||
assert.False(t, connIndex.Has(wc2))
|
||||
assert.False(t, connIndex.Has(wc3))
|
||||
assert.Empty(t, connIndex.byConnectionId)
|
||||
})
|
||||
|
||||
t.Run("adding", func(t *testing.T) {
|
||||
connIndex.Add(wc1)
|
||||
connIndex.Add(wc3)
|
||||
|
||||
assert.Len(t, connIndex.byConnectionId, 2)
|
||||
assert.Equal(t, wc1, connIndex.ForConnection(wc1ID))
|
||||
assert.Equal(t, wc3, connIndex.ForConnection(wc3ID))
|
||||
assert.Equal(t, (*WebConn)(nil), connIndex.ForConnection(wc2ID))
|
||||
})
|
||||
|
||||
t.Run("removing", func(t *testing.T) {
|
||||
connIndex.Remove(wc3)
|
||||
|
||||
assert.Len(t, connIndex.byConnectionId, 1)
|
||||
assert.Equal(t, wc1, connIndex.ForConnection(wc1ID))
|
||||
assert.Equal(t, (*WebConn)(nil), connIndex.ForConnection(wc3ID))
|
||||
assert.Equal(t, (*WebConn)(nil), connIndex.ForConnection(wc2ID))
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
t.Run("ByChannelId", func(t *testing.T) {
|
||||
connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger)
|
||||
connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger, true)
|
||||
|
||||
// User1
|
||||
wc1ID := model.NewId()
|
||||
@ -414,7 +419,7 @@ func TestHubConnIndexIncorrectRemoval(t *testing.T) {
|
||||
th := Setup(t)
|
||||
defer th.TearDown()
|
||||
|
||||
connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger)
|
||||
connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger, false)
|
||||
|
||||
// User2
|
||||
wc2 := &WebConn{
|
||||
@ -461,7 +466,7 @@ func TestHubConnIndexInactive(t *testing.T) {
|
||||
th := Setup(t)
|
||||
defer th.TearDown()
|
||||
|
||||
connIndex := newHubConnectionIndex(2*time.Second, th.Service.Store, th.Service.logger)
|
||||
connIndex := newHubConnectionIndex(2*time.Second, th.Service.Store, th.Service.logger, false)
|
||||
|
||||
// User1
|
||||
wc1 := &WebConn{
|
||||
@ -621,7 +626,7 @@ func TestHubWebConnCount(t *testing.T) {
|
||||
func BenchmarkHubConnIndex(b *testing.B) {
|
||||
th := Setup(b).InitBasic()
|
||||
defer th.TearDown()
|
||||
connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger)
|
||||
connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger, false)
|
||||
|
||||
// User1
|
||||
wc1 := &WebConn{
|
||||
@ -666,7 +671,7 @@ func TestHubConnIndexRemoveMemLeak(t *testing.T) {
|
||||
th := Setup(t)
|
||||
defer th.TearDown()
|
||||
|
||||
connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger)
|
||||
connIndex := newHubConnectionIndex(1*time.Second, th.Service.Store, th.Service.logger, false)
|
||||
|
||||
wc := &WebConn{
|
||||
Platform: th.Service,
|
||||
|
@ -439,6 +439,7 @@ type ServiceSettings struct {
|
||||
MaximumPayloadSizeBytes *int64 `access:"environment_file_storage,write_restrictable,cloud_restrictable"`
|
||||
MaximumURLLength *int `access:"environment_file_storage,write_restrictable,cloud_restrictable"`
|
||||
ScheduledPosts *bool `access:"site_posts"`
|
||||
EnableWebHubChannelIteration *bool `access:"write_restrictable,cloud_restrictable"` // telemetry: none
|
||||
}
|
||||
|
||||
var MattermostGiphySdkKey string
|
||||
@ -962,6 +963,10 @@ func (s *ServiceSettings) SetDefaults(isUpdate bool) {
|
||||
if s.ScheduledPosts == nil {
|
||||
s.ScheduledPosts = NewPointer(true)
|
||||
}
|
||||
|
||||
if s.EnableWebHubChannelIteration == nil {
|
||||
s.EnableWebHubChannelIteration = NewPointer(false)
|
||||
}
|
||||
}
|
||||
|
||||
type CacheSettings struct {
|
||||
|
Loading…
Reference in New Issue
Block a user