MM-32950: Reliable WebSockets: Basic single server (#17406)

* MM-32950: Reliable WebSockets: Basic single server

This PR adds reliable websocket support for a single server.

Below is a brief overview of the three states of a connection:

Normal:
- All messages are routed via web hub.
- Each web conn has a send queue to which it gets pushed.
- A message gets pulled from the queue, and before it
gets written to the wire, it is added to the dead queue.

Disconnect:
- Hub Unregister gets called, where the connection is just
marked as inactive. And new messages keep getting pushed
to the send queue.

If it gets full, the channel is closed and the conn gets removed
from conn index.

Reconnect:
- We query the hub for the connection ID, and get back the
queues.
- We construct a WebConn reusing the old queues, or a fresh one
depending on whether the connection ID was found or not.
- Now there is a tricky bit here which needs to be carefully processed.
On register, we would always send the hello message in the send queue.
But we cannot do that now because the send queue might already have messages.

Therefore, we don't send the hello message from web hub, if we reuse a connection.

Instead, we move that logic to the web conn write pump. We check if
the sequence number is in dead queue, and if it is, then we drain
the dead queue, and start consuming from the active queue.
No hello message is sent here.

But if the message does not exist in the dead queue, and the sequence number
is actually something that should have existed, then we set
a new connction id and clear the dead queue, and send a hello message.
The client, on receiving a new connection id will automatically
set its sequence number to 0, and make the sync API calls to manage
any lost data.

https://mattermost.atlassian.net/browse/MM-32590

```release-note
NONE
```

* gofmt

* Add EnableReliableWebSockets to the client config

* Refactoring isInDeadQueue

* Passing index to drainDeadQueue

* refactoring webconn

* fix pointer

* review comments

* simplify hasMsgLoss

* safety comment

* fix test

* Trigger CI

* Trigger CI

Co-authored-by: Devin Binnie <devin.binnie@mattermost.com>
Co-authored-by: Mattermod <mattermod@users.noreply.github.com>
This commit is contained in:
Agniva De Sarker
2021-04-26 19:51:25 +05:30
committed by GitHub
parent b0279a432d
commit cd4d322e4a
8 changed files with 718 additions and 114 deletions

View File

@@ -5,10 +5,10 @@ package api4
import (
"net/http"
"strconv"
"github.com/gorilla/websocket"
"github.com/mattermost/mattermost-server/v5/app"
"github.com/mattermost/mattermost-server/v5/model"
"github.com/mattermost/mattermost-server/v5/shared/mlog"
)
@@ -36,53 +36,34 @@ func connectWebSocket(c *Context, w http.ResponseWriter, r *http.Request) {
return
}
wc := c.App.NewWebConn(ws, *c.App.Session(), c.App.T, "")
if *c.App.Config().ServiceSettings.EnableReliableWebSockets {
connID := r.URL.Query().Get(connectionIDParam)
if connID == "" {
// If not present, we assume client is not capable yet, or it's a fresh connection.
// We just create a new ID.
connID = model.NewId()
} else {
if !model.IsValidId(connID) {
mlog.Error("Invalid connection ID", mlog.String("id", connID))
wc.WebSocket.Close()
return
}
// If present, we check if it's present in the connection manager.
// TODO: the connection manager internally should forward the request
// to the cluster if it does not have it.
//
// If the connection is not present, then we assume either timeout,
// or server restart. In that case, we set a new one.
//
// Now we get the sequence number
seqVal := r.URL.Query().Get(sequenceNumberParam)
if seqVal == "" {
// Sequence_number must be sent with connection id.
// A client must be either non-compliant or fully compliant.
mlog.Error("Sequence number not present in websocket request")
wc.WebSocket.Close()
return
}
seq, err := strconv.Atoi(seqVal)
if err != nil || seq < 0 {
mlog.Error("Invalid sequence number set in query param",
mlog.String("query", seqVal),
mlog.Err(err))
wc.WebSocket.Close()
return
}
wc.Sequence = int64(seq)
// Now if there have been past entries to be back-filled, we do it.
// First we find the right sequence number point.
// We start consuming from dead queue first, and then move to active queue
}
// In case of fresh connection id, sequence number is already zero.
wc.SetConnectionID(connID)
// We initialize webconn with all the necessary data.
// If the queues are empty, they are initialized in the constructor.
cfg := &app.WebConnConfig{
WebSocket: ws,
Session: *c.App.Session(),
TFunc: c.App.T,
Locale: "",
Active: true,
}
if *c.App.Config().ServiceSettings.EnableReliableWebSockets {
cfg.ConnectionID = r.URL.Query().Get(connectionIDParam)
if cfg.ConnectionID == "" || c.App.Session().UserId == "" {
// If not present, we assume client is not capable yet, or it's a fresh connection.
// We just create a new ID.
cfg.ConnectionID = model.NewId()
// In case of fresh connection id, sequence number is already zero.
} else {
cfg, err = c.App.PopulateWebConnConfig(cfg, r.URL.Query().Get(sequenceNumberParam))
if err != nil {
mlog.Warn("Error while populating webconn config", mlog.String("id", r.URL.Query().Get(connectionIDParam)), mlog.Err(err))
ws.Close()
return
}
}
}
wc := c.App.NewWebConn(cfg)
if c.App.Session().UserId != "" {
c.App.HubRegister(wc)
}