mirror of
https://github.com/mattermost/mattermost.git
synced 2025-02-25 18:55:24 -06:00
* MM-24547: Fix writer leak when connection closes When the connection is closed, the exit path does not shut down the writer goroutine. In which case, it will keep spinning forever. Since we already have the CAS mechanism now, we can move the closing functionality into the main Close method and just call that in the defer block. This makes closing the websocket client idempotent from both perspective - - Explicitly closing. - Closing due to connection tear down. There are still 2 races left: - Using the exported Conn to directly write messages. We cannot do anything about that as long as clients directly using that. - Setting the wsc.pingTimeoutTimer field in a separate goroutine when calling .Connect(). This will need to be seen later. * Fix ineffectual assignment * Duplicate the closing of writer The problem with refactoring the writer closing to a common function was that we needed to wait for the reader to exit before closing the EventChannel and ResponseChannel. But then there is another problem that the API can be used in such a way that the client is liable to call Close without even calling Listen. In that case, we cannot wait for Listen to quit. So from Close, we can only close the connection. And therefore we need to duplicate the writer closing in the read loop's defer block. * Cleanup some comments
147 lines
3.3 KiB
Go
147 lines
3.3 KiB
Go
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
|
|
// See LICENSE.txt for license information.
|
|
|
|
package model
|
|
|
|
import (
|
|
"net/http"
|
|
"net/http/httptest"
|
|
"strings"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/gorilla/websocket"
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
func dummyWebsocketHandler(t *testing.T) http.HandlerFunc {
|
|
return func(w http.ResponseWriter, req *http.Request) {
|
|
upgrader := &websocket.Upgrader{
|
|
ReadBufferSize: 1024,
|
|
WriteBufferSize: 1024,
|
|
}
|
|
conn, err := upgrader.Upgrade(w, req, nil)
|
|
require.Nil(t, err)
|
|
var buf []byte
|
|
for {
|
|
_, buf, err = conn.ReadMessage()
|
|
if err != nil {
|
|
break
|
|
}
|
|
t.Logf("%s\n", buf)
|
|
err = conn.WriteMessage(websocket.PingMessage, []byte("ping"))
|
|
if err != nil {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// TestWebSocketRace needs to be run with -race to verify that
|
|
// there is no race.
|
|
func TestWebSocketRace(t *testing.T) {
|
|
s := httptest.NewServer(dummyWebsocketHandler(t))
|
|
defer s.Close()
|
|
|
|
url := strings.Replace(s.URL, "http://", "ws://", 1)
|
|
cli, err := NewWebSocketClient4(url, "authToken")
|
|
require.Nil(t, err)
|
|
|
|
cli.Listen()
|
|
|
|
for i := 0; i < 10; i++ {
|
|
time.Sleep(500 * time.Millisecond)
|
|
cli.UserTyping("channel", "parentId")
|
|
}
|
|
}
|
|
|
|
func TestWebSocketClose(t *testing.T) {
|
|
s := httptest.NewServer(dummyWebsocketHandler(t))
|
|
defer s.Close()
|
|
|
|
url := strings.Replace(s.URL, "http://", "ws://", 1)
|
|
|
|
// Check whether the Event and Response channels
|
|
// have been closed or not.
|
|
waitClose := func(doneChan chan struct{}) int {
|
|
numClosed := 0
|
|
timeout := time.After(300 * time.Millisecond)
|
|
for {
|
|
select {
|
|
case <-doneChan:
|
|
numClosed++
|
|
if numClosed == 2 {
|
|
return numClosed
|
|
}
|
|
case <-timeout:
|
|
require.Fail(t, "timed out waiting for channels to be closed")
|
|
return numClosed
|
|
}
|
|
}
|
|
}
|
|
|
|
checkWriteChan := func(writeChan chan writeMessage) {
|
|
defer func() {
|
|
if x := recover(); x == nil {
|
|
require.Fail(t, "should have panicked due to closing a closed channel")
|
|
}
|
|
}()
|
|
close(writeChan)
|
|
}
|
|
|
|
waitForResponses := func(doneChan chan struct{}, cli *WebSocketClient) {
|
|
go func() {
|
|
for range cli.EventChannel {
|
|
}
|
|
doneChan <- struct{}{}
|
|
}()
|
|
go func() {
|
|
for range cli.ResponseChannel {
|
|
}
|
|
doneChan <- struct{}{}
|
|
}()
|
|
}
|
|
|
|
t.Run("SuddenClose", func(t *testing.T) {
|
|
cli, err := NewWebSocketClient4(url, "authToken")
|
|
require.Nil(t, err)
|
|
|
|
cli.Listen()
|
|
|
|
doneChan := make(chan struct{}, 2)
|
|
waitForResponses(doneChan, cli)
|
|
|
|
cli.UserTyping("channelId", "parentId")
|
|
cli.Conn.Close()
|
|
|
|
numClosed := waitClose(doneChan)
|
|
assert.Equal(t, 2, numClosed, "unexpected number of channels closed")
|
|
|
|
// Check whether the write channel was closed or not.
|
|
checkWriteChan(cli.writeChan)
|
|
|
|
require.NotNil(t, cli.ListenError, "non-nil listen error")
|
|
assert.Equal(t, "model.websocket_client.connect_fail.app_error", cli.ListenError.Id, "unexpected error id")
|
|
})
|
|
|
|
t.Run("ExplicitClose", func(t *testing.T) {
|
|
cli, err := NewWebSocketClient4(url, "authToken")
|
|
require.Nil(t, err)
|
|
|
|
cli.Listen()
|
|
|
|
doneChan := make(chan struct{}, 2)
|
|
waitForResponses(doneChan, cli)
|
|
|
|
cli.UserTyping("channelId", "parentId")
|
|
cli.Close()
|
|
|
|
numClosed := waitClose(doneChan)
|
|
assert.Equal(t, 2, numClosed, "unexpected number of channels closed")
|
|
|
|
// Check whether the write channel was closed or not.
|
|
checkWriteChan(cli.writeChan)
|
|
})
|
|
}
|