mirror of
https://github.com/mattermost/mattermost.git
synced 2025-02-25 18:55:24 -06:00
MM-24547: Fix writer leak when connection closes (#14406)
* MM-24547: Fix writer leak when connection closes. When the connection is closed, the exit path does not shut down the writer goroutine, in which case the writer will keep spinning forever. Since we already have the CAS mechanism now, we can move the closing functionality into the main Close method and just call that in the defer block. This makes closing the websocket client idempotent from both perspectives: - Explicitly closing. - Closing due to connection tear down. There are still 2 races left: - Using the exported Conn to directly write messages. We cannot do anything about that as long as clients are directly using it. - Setting the wsc.pingTimeoutTimer field in a separate goroutine when calling .Connect(). This will need to be looked at later. * Fix ineffectual assignment * Duplicate the closing of writer. The problem with refactoring the writer closing into a common function was that we needed to wait for the reader to exit before closing the EventChannel and ResponseChannel. But then there is another problem: the API can be used in such a way that the client is liable to call Close without ever calling Listen. In that case, we cannot wait for Listen to quit. So from Close, we can only close the connection, and therefore we need to duplicate the writer closing in the read loop's defer block. * Cleanup some comments
This commit is contained in:
@@ -125,6 +125,7 @@ func (wsc *WebSocketClient) ConnectWithDialer(dialer *websocket.Dialer) *AppErro
|
||||
wsc.writeChan = make(chan writeMessage)
|
||||
wsc.quitWriterChan = make(chan struct{})
|
||||
go wsc.writer()
|
||||
wsc.quitPingWatchdog = make(chan struct{})
|
||||
}
|
||||
|
||||
wsc.EventChannel = make(chan *WebSocketEvent, 100)
|
||||
@@ -135,6 +136,8 @@ func (wsc *WebSocketClient) ConnectWithDialer(dialer *websocket.Dialer) *AppErro
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close closes the websocket client. It is recommended that a closed client should not be
|
||||
// reused. Instead, a new client should be created.
|
||||
func (wsc *WebSocketClient) Close() {
|
||||
// CAS to 1 and proceed. Return if already 1.
|
||||
if !atomic.CompareAndSwapInt32(&wsc.closed, 0, 1) {
|
||||
@@ -142,9 +145,12 @@ func (wsc *WebSocketClient) Close() {
|
||||
}
|
||||
wsc.quitWriterChan <- struct{}{}
|
||||
close(wsc.writeChan)
|
||||
// We close the connection, which breaks the reader loop.
|
||||
// Then we let the defer block in the reader do further cleanup.
|
||||
wsc.Conn.Close()
|
||||
}
|
||||
|
||||
// TODO: un-export the Conn so that Write methods go through the writer
|
||||
func (wsc *WebSocketClient) writer() {
|
||||
for {
|
||||
select {
|
||||
@@ -161,14 +167,34 @@ func (wsc *WebSocketClient) writer() {
|
||||
}
|
||||
}
|
||||
|
||||
// Listen starts the read loop of the websocket client.
|
||||
func (wsc *WebSocketClient) Listen() {
|
||||
// This loop can exit in 2 conditions:
|
||||
// 1. Either the connection breaks naturally.
|
||||
// 2. Close was explicitly called, which closes the connection manually.
|
||||
//
|
||||
// Due to the way the API is written, there is a requirement that a client may NOT
|
||||
// call Listen at all and can still call Close and Connect.
|
||||
// Therefore, we let the cleanup of the reader stuff rely on closing the connection
|
||||
// and then we do the cleanup in the defer block.
|
||||
//
|
||||
// First, we close some channels and then CAS to 1 and proceed to close the writer chan also.
|
||||
// This is needed because then the defer clause does not double-close the writer when (2) happens.
|
||||
// But if (1) happens, we set the closed bit, and close the rest of the stuff.
|
||||
go func() {
|
||||
defer func() {
|
||||
wsc.Conn.Close()
|
||||
close(wsc.EventChannel)
|
||||
close(wsc.ResponseChannel)
|
||||
close(wsc.quitPingWatchdog)
|
||||
// We CAS to 1 and proceed.
|
||||
if !atomic.CompareAndSwapInt32(&wsc.closed, 0, 1) {
|
||||
return
|
||||
}
|
||||
wsc.quitWriterChan <- struct{}{}
|
||||
close(wsc.writeChan)
|
||||
wsc.Conn.Close() // This can most likely be removed. Needs to be checked.
|
||||
}()
|
||||
|
||||
var buf bytes.Buffer
|
||||
buf.Grow(avgReadMsgSizeBytes)
|
||||
|
||||
@@ -206,7 +232,6 @@ func (wsc *WebSocketClient) Listen() {
|
||||
wsc.ResponseChannel <- &response
|
||||
continue
|
||||
}
|
||||
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
@@ -21,6 +22,7 @@ func dummyWebsocketHandler(t *testing.T) http.HandlerFunc {
|
||||
WriteBufferSize: 1024,
|
||||
}
|
||||
conn, err := upgrader.Upgrade(w, req, nil)
|
||||
require.Nil(t, err)
|
||||
var buf []byte
|
||||
for {
|
||||
_, buf, err = conn.ReadMessage()
|
||||
@@ -33,9 +35,6 @@ func dummyWebsocketHandler(t *testing.T) http.HandlerFunc {
|
||||
break
|
||||
}
|
||||
}
|
||||
if _, ok := err.(*websocket.CloseError); !ok {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -56,3 +55,92 @@ func TestWebSocketRace(t *testing.T) {
|
||||
cli.UserTyping("channel", "parentId")
|
||||
}
|
||||
}
|
||||
|
||||
func TestWebSocketClose(t *testing.T) {
|
||||
s := httptest.NewServer(dummyWebsocketHandler(t))
|
||||
defer s.Close()
|
||||
|
||||
url := strings.Replace(s.URL, "http://", "ws://", 1)
|
||||
|
||||
// Check whether the Event and Response channels
|
||||
// have been closed or not.
|
||||
waitClose := func(doneChan chan struct{}) int {
|
||||
numClosed := 0
|
||||
timeout := time.After(300 * time.Millisecond)
|
||||
for {
|
||||
select {
|
||||
case <-doneChan:
|
||||
numClosed++
|
||||
if numClosed == 2 {
|
||||
return numClosed
|
||||
}
|
||||
case <-timeout:
|
||||
require.Fail(t, "timed out waiting for channels to be closed")
|
||||
return numClosed
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
checkWriteChan := func(writeChan chan writeMessage) {
|
||||
defer func() {
|
||||
if x := recover(); x == nil {
|
||||
require.Fail(t, "should have panicked due to closing a closed channel")
|
||||
}
|
||||
}()
|
||||
close(writeChan)
|
||||
}
|
||||
|
||||
waitForResponses := func(doneChan chan struct{}, cli *WebSocketClient) {
|
||||
go func() {
|
||||
for range cli.EventChannel {
|
||||
}
|
||||
doneChan <- struct{}{}
|
||||
}()
|
||||
go func() {
|
||||
for range cli.ResponseChannel {
|
||||
}
|
||||
doneChan <- struct{}{}
|
||||
}()
|
||||
}
|
||||
|
||||
t.Run("SuddenClose", func(t *testing.T) {
|
||||
cli, err := NewWebSocketClient4(url, "authToken")
|
||||
require.Nil(t, err)
|
||||
|
||||
cli.Listen()
|
||||
|
||||
doneChan := make(chan struct{}, 2)
|
||||
waitForResponses(doneChan, cli)
|
||||
|
||||
cli.UserTyping("channelId", "parentId")
|
||||
cli.Conn.Close()
|
||||
|
||||
numClosed := waitClose(doneChan)
|
||||
assert.Equal(t, 2, numClosed, "unexpected number of channels closed")
|
||||
|
||||
// Check whether the write channel was closed or not.
|
||||
checkWriteChan(cli.writeChan)
|
||||
|
||||
require.NotNil(t, cli.ListenError, "non-nil listen error")
|
||||
assert.Equal(t, "model.websocket_client.connect_fail.app_error", cli.ListenError.Id, "unexpected error id")
|
||||
})
|
||||
|
||||
t.Run("ExplicitClose", func(t *testing.T) {
|
||||
cli, err := NewWebSocketClient4(url, "authToken")
|
||||
require.Nil(t, err)
|
||||
|
||||
cli.Listen()
|
||||
|
||||
doneChan := make(chan struct{}, 2)
|
||||
waitForResponses(doneChan, cli)
|
||||
|
||||
cli.UserTyping("channelId", "parentId")
|
||||
cli.Close()
|
||||
|
||||
numClosed := waitClose(doneChan)
|
||||
assert.Equal(t, 2, numClosed, "unexpected number of channels closed")
|
||||
|
||||
// Check whether the write channel was closed or not.
|
||||
checkWriteChan(cli.writeChan)
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user