mirror of
https://github.com/mattermost/mattermost.git
synced 2025-02-25 18:55:24 -06:00
We make a cluster request to fetch the active and dead queues from the other nodes in the cluster so that any missing information can be synced. We check the dead queue on the other nodes to determine whether any messages have been lost. Accordingly, we send either just the active queue or both the active and dead queues. One edge case remains unhandled: a client may have connected and reconnected to multiple nodes, leaving active queues on several of them. We don't handle this scenario because it would potentially require a slice of size sendQueueSize * number_of_nodes, and because this can happen again, sendQueueSize would grow without bound. We leave this edge case to Redis, acknowledging a limitation in our architecture. In this PR, when there's no message loss, we simply take the active queue from the last node the client connected to. If there is message loss and the client's seqNum falls within the last node's dead queue, we handle that as well. But if the loss is severe and the client's seqNum falls within the dead queue of another node, we send the data from that node to reconstruct as much as possible. Setting a new connection ID in this case would also be possible, but it would always require more data transfer from all nodes and recomputing the state on the requesting node. https://mattermost.atlassian.net/browse/MM-61904 ```release-note NONE ``` Co-authored-by: Mattermost Build <build@mattermost.com>
41 lines
1.8 KiB
Go
41 lines
1.8 KiB
Go
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
|
|
// See LICENSE.txt for license information.
|
|
|
|
package einterfaces
|
|
|
|
import (
|
|
"github.com/mattermost/mattermost/server/public/model"
|
|
"github.com/mattermost/mattermost/server/public/shared/request"
|
|
)
|
|
|
|
// ClusterMessageHandler is the callback signature for processing a single
// cluster message. Handlers are attached to a cluster event through
// ClusterInterface.RegisterClusterMessageHandler.
type ClusterMessageHandler func(message *model.ClusterMessage)
|
|
|
|
type ClusterInterface interface {
|
|
StartInterNodeCommunication()
|
|
StopInterNodeCommunication()
|
|
RegisterClusterMessageHandler(event model.ClusterEvent, crm ClusterMessageHandler)
|
|
GetClusterId() string
|
|
IsLeader() bool
|
|
// HealthScore returns a number which is indicative of how well an instance is meeting
|
|
// the soft real-time requirements of the protocol. Lower numbers are better,
|
|
// and zero means "totally healthy".
|
|
HealthScore() int
|
|
GetMyClusterInfo() *model.ClusterInfo
|
|
GetClusterInfos() []*model.ClusterInfo
|
|
SendClusterMessage(msg *model.ClusterMessage)
|
|
SendClusterMessageToNode(nodeID string, msg *model.ClusterMessage) error
|
|
NotifyMsg(buf []byte)
|
|
GetClusterStats(rctx request.CTX) ([]*model.ClusterStats, *model.AppError)
|
|
GetLogs(ctx request.CTX, page, perPage int) ([]string, *model.AppError)
|
|
QueryLogs(rctx request.CTX, page, perPage int) (map[string][]string, *model.AppError)
|
|
GenerateSupportPacket(rctx request.CTX, options *model.SupportPacketOptions) (map[string][]model.FileData, error)
|
|
GetPluginStatuses() (model.PluginStatuses, *model.AppError)
|
|
ConfigChanged(previousConfig *model.Config, newConfig *model.Config, sendToOtherServer bool) *model.AppError
|
|
// WebConnCountForUser returns the number of active webconn connections
|
|
// for a given userID.
|
|
WebConnCountForUser(userID string) (int, *model.AppError)
|
|
// GetWSQueues returns the necessary websocket queues from a cluster for a given
|
|
// connectionID and sequence number.
|
|
GetWSQueues(userID, connectionID string, seqNum int64) (map[string]*model.WSQueues, error)
|
|
}
|