mattermost/server/einterfaces/cluster.go
Agniva De Sarker cb75a20c54 MM-61904: Make reliable websockets work in HA (#29489)
We make a cluster request to fetch the active and dead queues
from the other nodes in the cluster and sync any missing
information.

We check the dead queue on each of the other nodes to see
whether any messages have been lost. Accordingly, we send
either just the active queue or both the active and dead queues.
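The per-node decision can be sketched as follows. The `wsMessage` and `nodeQueues` types are hypothetical, and the sketch assumes that `seqNum` is the next sequence number the client expects and that both queues are ordered oldest first.

```go
package main

import "fmt"

// wsMessage is a hypothetical stand-in for one queued websocket event.
type wsMessage struct {
	Seq  int64
	Data string
}

// nodeQueues is a hypothetical stand-in for the queues one node keeps
// for a single connection ID. Both slices are ordered oldest first.
type nodeQueues struct {
	Active []wsMessage
	Dead   []wsMessage
}

// inDeadQueue reports whether seqNum lies within the dead queue's range,
// i.e. the client missed messages that are no longer in the active queue.
func inDeadQueue(q nodeQueues, seqNum int64) bool {
	if len(q.Dead) == 0 {
		return false
	}
	return seqNum >= q.Dead[0].Seq && seqNum <= q.Dead[len(q.Dead)-1].Seq
}

// queuesToSend returns the active queue, plus the dead queue only when
// the dead queue shows that the client has lost messages.
func queuesToSend(q nodeQueues, seqNum int64) (active, dead []wsMessage) {
	if inDeadQueue(q, seqNum) {
		return q.Active, q.Dead
	}
	return q.Active, nil
}

func main() {
	q := nodeQueues{
		Dead:   []wsMessage{{1, "a"}, {2, "b"}},
		Active: []wsMessage{{3, "c"}, {4, "d"}},
	}
	active, dead := queuesToSend(q, 2) // client missed seq 2
	fmt.Println(len(active), len(dead)) // 2 2
	active, dead = queuesToSend(q, 4) // nothing lost
	fmt.Println(len(active), len(dead)) // 2 0
}
```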

One edge case is still left out: a client could have
connected and reconnected to several nodes, leaving an
active queue on each of them. We don't handle this
scenario because it would potentially require a slice
of sendQueueSize * number_of_nodes. Since this can then
happen again, sendQueueSize would keep growing without
bound.
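To make the growth argument concrete, here is a hypothetical calculation. It assumes that each round of cross-node reconnects forces the requesting node to hold one full send queue per node, and that the merged size then becomes the new queue size. The value 128 for `sendQueueSize` is an arbitrary example, not Mattermost's setting.

```go
package main

import "fmt"

// mergedQueueSize returns the buffer size needed after the given number
// of rounds, if every round multiplies the queue size by the node count.
// Hypothetical illustration only.
func mergedQueueSize(sendQueueSize, numNodes, rounds int) int {
	size := sendQueueSize
	for i := 0; i < rounds; i++ {
		size *= numNodes
	}
	return size
}

func main() {
	for r := 0; r <= 3; r++ {
		fmt.Println(r, mergedQueueSize(128, 3, r))
	}
	// Prints:
	// 0 128
	// 1 384
	// 2 1152
	// 3 3456
}
```

Under this assumption the size stays fixed only for a single node. Any cluster of two or more nodes grows geometrically.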

We leave this edge case to Redis, acknowledging
a limitation in our architecture.

In this PR, when there's no message loss, we just
take the active queue from the last node the client
connected to.

If there is message loss and the client's seqNum
falls within the last node's dead queue, we handle
that as well.

But if there's severe message loss, where the client's
seqNum falls within the dead queue of another node, we
send the data from that node to reconstruct as much of
the lost data as possible. Setting a new connection ID
would also be possible in this case, but it would always
require transferring more data from all nodes and
recomputing the state on the requesting node.
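The three cases above can be sketched as a single selection step on the requesting node. The types are hypothetical stand-ins. Node IDs are sorted only to make the "another node" case deterministic, and filtering out messages the client already received is left out.

```go
package main

import (
	"fmt"
	"sort"
)

// Hypothetical stand-ins for the queued event and per-node queues.
type wsMessage struct {
	Seq  int64
	Data string
}

type nodeQueues struct {
	Active []wsMessage // oldest first
	Dead   []wsMessage // oldest first
}

// inDeadQueue reports whether seqNum lies within the dead queue's range.
func inDeadQueue(q nodeQueues, seqNum int64) bool {
	if len(q.Dead) == 0 {
		return false
	}
	return seqNum >= q.Dead[0].Seq && seqNum <= q.Dead[len(q.Dead)-1].Seq
}

// concat returns a followed by b in a fresh slice.
func concat(a, b []wsMessage) []wsMessage {
	out := make([]wsMessage, 0, len(a)+len(b))
	out = append(out, a...)
	return append(out, b...)
}

// selectQueues picks which queued messages to replay to a reconnecting client.
func selectQueues(byNode map[string]nodeQueues, lastNodeID string, seqNum int64) []wsMessage {
	last := byNode[lastNodeID]
	// Loss covered by the last node's dead queue: dead, then active.
	if inDeadQueue(last, seqNum) {
		return concat(last.Dead, last.Active)
	}
	// Severe loss: seqNum is only in another node's dead queue.
	ids := make([]string, 0, len(byNode))
	for id := range byNode {
		if id != lastNodeID {
			ids = append(ids, id)
		}
	}
	sort.Strings(ids)
	for _, id := range ids {
		if q := byNode[id]; inDeadQueue(q, seqNum) {
			return concat(q.Dead, q.Active)
		}
	}
	// No loss: the last node's active queue is enough.
	return last.Active
}

func main() {
	byNode := map[string]nodeQueues{
		"node-a": {Dead: []wsMessage{{1, "m1"}, {2, "m2"}}, Active: []wsMessage{{3, "m3"}}},
		"node-b": {Dead: []wsMessage{{5, "m5"}}, Active: []wsMessage{{6, "m6"}, {7, "m7"}}},
	}
	// The client last connected to node-b.
	fmt.Println(len(selectQueues(byNode, "node-b", 7))) // 2: node-b active only
	fmt.Println(len(selectQueues(byNode, "node-b", 5))) // 3: node-b dead + active
	fmt.Println(len(selectQueues(byNode, "node-b", 2))) // 3: node-a dead + active
}
```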

https://mattermost.atlassian.net/browse/MM-61904

```release-note
NONE
```

Co-authored-by: Mattermost Build <build@mattermost.com>
2025-01-17 11:11:32 +05:30


// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.

package einterfaces

import (
	"github.com/mattermost/mattermost/server/public/model"
	"github.com/mattermost/mattermost/server/public/shared/request"
)

type ClusterMessageHandler func(msg *model.ClusterMessage)

type ClusterInterface interface {
	StartInterNodeCommunication()
	StopInterNodeCommunication()
	RegisterClusterMessageHandler(event model.ClusterEvent, crm ClusterMessageHandler)
	GetClusterId() string
	IsLeader() bool
	// HealthScore returns a number which is indicative of how well an instance is meeting
	// the soft real-time requirements of the protocol. Lower numbers are better,
	// and zero means "totally healthy".
	HealthScore() int
	GetMyClusterInfo() *model.ClusterInfo
	GetClusterInfos() []*model.ClusterInfo
	SendClusterMessage(msg *model.ClusterMessage)
	SendClusterMessageToNode(nodeID string, msg *model.ClusterMessage) error
	NotifyMsg(buf []byte)
	GetClusterStats(rctx request.CTX) ([]*model.ClusterStats, *model.AppError)
	GetLogs(ctx request.CTX, page, perPage int) ([]string, *model.AppError)
	QueryLogs(rctx request.CTX, page, perPage int) (map[string][]string, *model.AppError)
	GenerateSupportPacket(rctx request.CTX, options *model.SupportPacketOptions) (map[string][]model.FileData, error)
	GetPluginStatuses() (model.PluginStatuses, *model.AppError)
	ConfigChanged(previousConfig *model.Config, newConfig *model.Config, sendToOtherServer bool) *model.AppError
	// WebConnCountForUser returns the number of active webconn connections
	// for a given userID.
	WebConnCountForUser(userID string) (int, *model.AppError)
	// GetWSQueues returns the necessary websocket queues from a cluster for a given
	// connectionID and sequence number.
	GetWSQueues(userID, connectionID string, seqNum int64) (map[string]*model.WSQueues, error)
}
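A minimal in-memory sketch of the handler-registration pattern suggested by `ClusterMessageHandler` and `RegisterClusterMessageHandler`. The `clusterEvent` and `clusterMessage` types and the `"config_changed"` event name are hypothetical. This is not Mattermost's cluster implementation, and this file does not show whether the real one keeps one handler per event.

```go
package main

import (
	"fmt"
	"sync"
)

// Hypothetical stand-ins for model.ClusterEvent and model.ClusterMessage.
type clusterEvent string

type clusterMessage struct {
	Event clusterEvent
	Data  []byte
}

// clusterMessageHandler mirrors the shape of einterfaces.ClusterMessageHandler.
type clusterMessageHandler func(msg *clusterMessage)

// handlerRegistry routes incoming messages to the handler registered
// for their event.
type handlerRegistry struct {
	mu       sync.RWMutex
	handlers map[clusterEvent]clusterMessageHandler
}

func newHandlerRegistry() *handlerRegistry {
	return &handlerRegistry{handlers: make(map[clusterEvent]clusterMessageHandler)}
}

// Register associates a handler with an event, replacing any previous one.
func (r *handlerRegistry) Register(event clusterEvent, h clusterMessageHandler) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.handlers[event] = h
}

// Dispatch runs the handler for msg.Event and reports whether one was found.
func (r *handlerRegistry) Dispatch(msg *clusterMessage) bool {
	r.mu.RLock()
	h, ok := r.handlers[msg.Event]
	r.mu.RUnlock()
	if ok {
		h(msg)
	}
	return ok
}

func main() {
	reg := newHandlerRegistry()
	reg.Register("config_changed", func(msg *clusterMessage) {
		fmt.Printf("got %s: %s\n", msg.Event, msg.Data)
	})
	reg.Dispatch(&clusterMessage{Event: "config_changed", Data: []byte("{}")}) // got config_changed: {}
	fmt.Println(reg.Dispatch(&clusterMessage{Event: "unknown"}))               // false
}
```

The read lock is released before the handler runs, so a handler can register further handlers without deadlocking.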