MM-24397: Reusing the read buffer while reading messages from websockets (#14368)

* MM-24397: Reusing the read buffer while reading messages from websockets

The core problem was that conn.ReadMessage allocated a buffer every time it was read.
This created heavy amount of allocations every single time we read a message from the websocket.

To avoid this, we bypass the ReadMessage which was more of a helper method,
and actually call the NextReader which returns a reader object.
We can then reuse a single byte.Buffer instance to read the object unmarshal
into a WebSocketEvent object. This gets rid of the allocation in the read path completly
and allows GC more time to do other tasks.

* Incorporate review comments

* Move reset buffer to top of loop

* Cleanup further

* Fix test

* Final fix
This commit is contained in:
Agniva De Sarker
2020-04-28 19:38:11 +05:30
committed by GitHub
parent 8781c36eb3
commit f0eb0a9a01

View File

@@ -30,6 +30,8 @@ type writeMessage struct {
data interface{}
}
const avgReadMsgSizeBytes = 1024
// WebSocketClient stores the necessary information required to
// communicate with a WebSocket endpoint.
type WebSocketClient struct {
@@ -167,19 +169,30 @@ func (wsc *WebSocketClient) Listen() {
close(wsc.ResponseChannel)
close(wsc.quitPingWatchdog)
}()
var buf bytes.Buffer
buf.Grow(avgReadMsgSizeBytes)
for {
var rawMsg json.RawMessage
var err error
if _, rawMsg, err = wsc.Conn.ReadMessage(); err != nil {
// Reset buffer.
buf.Reset()
_, r, err := wsc.Conn.NextReader()
if err != nil {
if !websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseNoStatusReceived) {
wsc.ListenError = NewAppError("NewWebSocketClient", "model.websocket_client.connect_fail.app_error", nil, err.Error(), http.StatusInternalServerError)
}
return
}
// Use pre-allocated buffer.
_, err = buf.ReadFrom(r)
if err != nil {
// This should use a different error ID, but en.json is not imported anyways.
// It's a different bug altogether but we let it be for now.
// See MM-24520.
wsc.ListenError = NewAppError("NewWebSocketClient", "model.websocket_client.connect_fail.app_error", nil, err.Error(), http.StatusInternalServerError)
return
}
event := WebSocketEventFromJson(bytes.NewReader(rawMsg))
event := WebSocketEventFromJson(bytes.NewReader(buf.Bytes()))
if event == nil {
continue
}
@@ -189,7 +202,7 @@ func (wsc *WebSocketClient) Listen() {
}
var response WebSocketResponse
if err := json.Unmarshal(rawMsg, &response); err == nil && response.IsValid() {
if err := json.Unmarshal(buf.Bytes(), &response); err == nil && response.IsValid() {
wsc.ResponseChannel <- &response
continue
}