Improve websocket logging (#24259)

After reliable websockets were introduced,
we would keep pushing to a webConn even after
the client has disconnected. This would lead
to the websocket.slow/full and eventually
the disconnect messages which unnecessarily
confuse the admin.

We don't log them if the connection is inactive.

We use the active field and convert it to an atomic
to use more widely.

```release-note
NONE
```
This commit is contained in:
Agniva De Sarker 2023-08-16 11:03:05 +05:30 committed by GitHub
parent 2ed0c6495b
commit 0e75982f2b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 28 additions and 24 deletions

View File

@ -98,9 +98,7 @@ type WebConn struct {
deadQueuePointer int
// active indicates whether there is an open websocket connection attached
// to this webConn or not.
// It is not used as an atomic, because there is no need to.
// So do not use this outside the web hub.
active bool
active atomic.Bool
// reuseCount indicates how many times this connection has been reused.
// This is used to differentiate between a fresh connection and
// a reused connection.
@ -220,7 +218,6 @@ func (ps *PlatformService) NewWebConn(cfg *WebConnConfig, suite SuiteIFace, runn
UserId: cfg.Session.UserId,
T: cfg.TFunc,
Locale: cfg.Locale,
active: cfg.Active,
reuseCount: cfg.ReuseCount,
endWritePump: make(chan struct{}),
pumpFinished: make(chan struct{}),
@ -228,6 +225,7 @@ func (ps *PlatformService) NewWebConn(cfg *WebConnConfig, suite SuiteIFace, runn
lastLogTimeSlow: time.Now(),
lastLogTimeFull: time.Now(),
}
wc.active.Store(cfg.Active)
wc.SetSession(&cfg.Session)
wc.SetSessionToken(cfg.Session.Token)
@ -295,7 +293,7 @@ func (wc *WebConn) GetConnectionID() string {
// are inactive or not.
func areAllInactive(conns []*WebConn) bool {
for _, conn := range conns {
if conn.active {
if conn.active.Load() {
return false
}
}
@ -474,7 +472,7 @@ func (wc *WebConn) writePump() {
continue
}
if len(wc.send) >= sendFullWarn && time.Since(wc.lastLogTimeFull) > websocketSuppressWarnThreshold {
if wc.active.Load() && len(wc.send) >= sendFullWarn && time.Since(wc.lastLogTimeFull) > websocketSuppressWarnThreshold {
logData := []mlog.Field{
mlog.String("user_id", wc.UserId),
mlog.String("conn_id", wc.GetConnectionID()),
@ -727,7 +725,7 @@ func (wc *WebConn) ShouldSendEvent(msg *model.WebSocketEvent) bool {
case model.WebsocketEventTyping,
model.WebsocketEventStatusChange,
model.WebsocketEventMultipleChannelsViewed:
if time.Since(wc.lastLogTimeSlow) > websocketSuppressWarnThreshold {
if wc.active.Load() && time.Since(wc.lastLogTimeSlow) > websocketSuppressWarnThreshold {
mlog.Warn(
"websocket.slow: dropping message",
mlog.String("user_id", wc.UserId),

View File

@ -383,7 +383,7 @@ func (h *Hub) Start() {
conns := connIndex.ForUser(webSessionMessage.userID)
var isRegistered bool
for _, conn := range conns {
if !conn.active {
if !conn.active.Load() {
continue
}
if conn.GetSessionToken() == webSessionMessage.sessionToken {
@ -411,7 +411,7 @@ func (h *Hub) Start() {
// Mark the current one as active.
// There is no need to check if it was inactive or not,
// we will anyways need to make it active.
webConn.active = true
webConn.active.Store(true)
connIndex.Add(webConn)
atomic.StoreInt64(&h.connectionCount, int64(connIndex.AllActive()))
@ -426,7 +426,7 @@ func (h *Hub) Start() {
case webConn := <-h.unregister:
// If already removed (via queue full), then removing again becomes a noop.
// But if not removed, mark inactive.
webConn.active = false
webConn.active.Store(false)
atomic.StoreInt64(&h.connectionCount, int64(connIndex.AllActive()))
@ -444,7 +444,7 @@ func (h *Hub) Start() {
}
var latestActivity int64
for _, conn := range conns {
if !conn.active {
if !conn.active.Load() {
continue
}
if conn.lastUserActivityAt > latestActivity {
@ -464,7 +464,7 @@ func (h *Hub) Start() {
}
case activity := <-h.activity:
for _, webConn := range connIndex.ForUser(activity.userID) {
if !webConn.active {
if !webConn.active.Load() {
continue
}
if webConn.GetSessionToken() == activity.sessionToken {
@ -478,9 +478,12 @@ func (h *Hub) Start() {
select {
case directMsg.conn.send <- directMsg.msg:
default:
mlog.Error("webhub.broadcast: cannot send, closing websocket for user",
mlog.String("user_id", directMsg.conn.UserId),
mlog.String("conn_id", directMsg.conn.GetConnectionID()))
// Don't log the warning if it's an inactive connection.
if directMsg.conn.active.Load() {
mlog.Error("webhub.broadcast: cannot send, closing websocket for user",
mlog.String("user_id", directMsg.conn.UserId),
mlog.String("conn_id", directMsg.conn.GetConnectionID()))
}
close(directMsg.conn.send)
connIndex.Remove(directMsg.conn)
}
@ -497,9 +500,12 @@ func (h *Hub) Start() {
select {
case webConn.send <- msg:
default:
mlog.Error("webhub.broadcast: cannot send, closing websocket for user",
mlog.String("user_id", webConn.UserId),
mlog.String("conn_id", webConn.GetConnectionID()))
// Don't log the warning if it's an inactive connection.
if webConn.active.Load() {
mlog.Error("webhub.broadcast: cannot send, closing websocket for user",
mlog.String("user_id", webConn.UserId),
mlog.String("conn_id", webConn.GetConnectionID()))
}
close(webConn.send)
connIndex.Remove(webConn)
}
@ -639,7 +645,7 @@ func (i *hubConnectionIndex) RemoveInactiveByConnectionID(userID, connectionID s
return nil
}
for _, conn := range i.ForUser(userID) {
if conn.GetConnectionID() == connectionID && !conn.active {
if conn.GetConnectionID() == connectionID && !conn.active.Load() {
i.Remove(conn)
return conn
}
@ -652,7 +658,7 @@ func (i *hubConnectionIndex) RemoveInactiveByConnectionID(userID, connectionID s
func (i *hubConnectionIndex) RemoveInactiveConnections() {
now := model.GetMillis()
for conn := range i.byConnection {
if !conn.active && now-conn.lastUserActivityAt > i.staleThreshold.Milliseconds() {
if !conn.active.Load() && now-conn.lastUserActivityAt > i.staleThreshold.Milliseconds() {
i.Remove(conn)
}
}
@ -664,7 +670,7 @@ func (i *hubConnectionIndex) RemoveInactiveConnections() {
func (i *hubConnectionIndex) AllActive() int {
cnt := 0
for conn := range i.byConnection {
if conn.active {
if conn.active.Load() {
cnt++
}
}

View File

@ -379,8 +379,8 @@ func TestHubConnIndexInactive(t *testing.T) {
wc1 := &WebConn{
Platform: th.Service,
UserId: model.NewId(),
active: true,
}
wc1.active.Store(true)
wc1.SetConnectionID("conn1")
wc1.SetSession(&model.Session{})
@ -388,16 +388,16 @@ func TestHubConnIndexInactive(t *testing.T) {
wc2 := &WebConn{
Platform: th.Service,
UserId: model.NewId(),
active: true,
}
wc2.active.Store(true)
wc2.SetConnectionID("conn2")
wc2.SetSession(&model.Session{})
wc3 := &WebConn{
Platform: th.Service,
UserId: wc2.UserId,
active: false,
}
wc3.active.Store(false)
wc3.SetConnectionID("conn3")
wc3.SetSession(&model.Session{})