mirror of
https://github.com/mattermost/mattermost.git
synced 2025-02-25 18:55:24 -06:00
MM-25516: Changed to byte slice instead of string for cluster messages (#17998)
* MM-25516: Changed to byte slice instead of string for cluster messages https://mattermost.atlassian.net/browse/MM-25116 Testing: Manually tested. Load-tested with Cluster Controller. I looked into changing the serialization method to use msgpack, but the ClusterMessage struct was mainly used for only 3 fields which didn't lead to much of a CPU time improvement, whereas actually led to more allocations using msgpack. Hence, I chose to remain with JSON. ``` name old time/op new time/op delta ClusterMarshal-8 3.51µs ± 1% 3.10µs ± 2% -11.59% (p=0.000 n=9+10) name old alloc/op new alloc/op delta ClusterMarshal-8 776B ± 0% 1000B ± 0% +28.87% (p=0.000 n=10+10) name old allocs/op new allocs/op delta ClusterMarshal-8 12.0 ± 0% 13.0 ± 0% +8.33% (p=0.000 n=10+10) ``` ```release-note Changed the field type of Data in model.ClusterMessage to []byte from string. ``` * Trigger CI ```release-note NONE ```
This commit is contained in:
@@ -304,11 +304,13 @@ func TestNotifyClusterPluginEvent(t *testing.T) {
|
||||
expectedPluginData := model.PluginEventData{
|
||||
Id: manifest.Id,
|
||||
}
|
||||
|
||||
buf, _ := json.Marshal(expectedPluginData)
|
||||
expectedInstallMessage := &model.ClusterMessage{
|
||||
Event: model.ClusterEventInstallPlugin,
|
||||
SendType: model.ClusterSendReliable,
|
||||
WaitForAllToSend: true,
|
||||
Data: expectedPluginData.ToJson(),
|
||||
Data: buf,
|
||||
}
|
||||
actualMessages := findClusterMessages(model.ClusterEventInstallPlugin, messages)
|
||||
require.Equal(t, []*model.ClusterMessage{expectedInstallMessage}, actualMessages)
|
||||
@@ -354,7 +356,7 @@ func TestNotifyClusterPluginEvent(t *testing.T) {
|
||||
Event: model.ClusterEventRemovePlugin,
|
||||
SendType: model.ClusterSendReliable,
|
||||
WaitForAllToSend: true,
|
||||
Data: expectedPluginData.ToJson(),
|
||||
Data: buf,
|
||||
}
|
||||
actualMessages = findClusterMessages(model.ClusterEventRemovePlugin, messages)
|
||||
require.Equal(t, []*model.ClusterMessage{expectedRemoveMessage}, actualMessages)
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
@@ -109,11 +110,12 @@ func (b *Busy) notifyServerBusyChange(sbs *model.ServerBusyState) {
|
||||
if b.cluster == nil {
|
||||
return
|
||||
}
|
||||
buf, _ := json.Marshal(sbs)
|
||||
msg := &model.ClusterMessage{
|
||||
Event: model.ClusterEventBusyStateChanged,
|
||||
SendType: model.ClusterSendReliable,
|
||||
WaitForAllToSend: true,
|
||||
Data: sbs.ToJson(),
|
||||
Data: buf,
|
||||
}
|
||||
b.cluster.SendClusterMessage(msg)
|
||||
}
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"bytes"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@@ -116,7 +116,7 @@ type ClusterMock struct {
|
||||
}
|
||||
|
||||
func (c *ClusterMock) SendClusterMessage(msg *model.ClusterMessage) {
|
||||
sbs := model.ServerBusyStateFromJson(strings.NewReader(msg.Data))
|
||||
sbs := model.ServerBusyStateFromJson(bytes.NewReader(msg.Data))
|
||||
c.Busy.ClusterEventChanged(sbs)
|
||||
}
|
||||
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"bytes"
|
||||
|
||||
"github.com/mattermost/mattermost-server/v6/model"
|
||||
"github.com/mattermost/mattermost-server/v6/plugin"
|
||||
@@ -12,11 +12,11 @@ import (
|
||||
)
|
||||
|
||||
func (s *Server) clusterInstallPluginHandler(msg *model.ClusterMessage) {
|
||||
s.installPluginFromData(model.PluginEventDataFromJson(strings.NewReader(msg.Data)))
|
||||
s.installPluginFromData(model.PluginEventDataFromJson(bytes.NewReader(msg.Data)))
|
||||
}
|
||||
|
||||
func (s *Server) clusterRemovePluginHandler(msg *model.ClusterMessage) {
|
||||
s.removePluginFromData(model.PluginEventDataFromJson(strings.NewReader(msg.Data)))
|
||||
s.removePluginFromData(model.PluginEventDataFromJson(bytes.NewReader(msg.Data)))
|
||||
}
|
||||
|
||||
func (s *Server) clusterPluginEventHandler(msg *model.ClusterMessage) {
|
||||
@@ -44,7 +44,7 @@ func (s *Server) clusterPluginEventHandler(msg *model.ClusterMessage) {
|
||||
|
||||
hooks.OnPluginClusterEvent(&plugin.Context{}, model.PluginClusterEvent{
|
||||
Id: eventID,
|
||||
Data: []byte(msg.Data),
|
||||
Data: msg.Data,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -69,7 +69,7 @@ func (s *Server) registerClusterHandlers() {
|
||||
}
|
||||
|
||||
func (s *Server) clusterPublishHandler(msg *model.ClusterMessage) {
|
||||
event := model.WebSocketEventFromJson(strings.NewReader(msg.Data))
|
||||
event := model.WebSocketEventFromJson(bytes.NewReader(msg.Data))
|
||||
if event == nil {
|
||||
return
|
||||
}
|
||||
@@ -77,7 +77,7 @@ func (s *Server) clusterPublishHandler(msg *model.ClusterMessage) {
|
||||
}
|
||||
|
||||
func (s *Server) clusterUpdateStatusHandler(msg *model.ClusterMessage) {
|
||||
status := model.StatusFromJson(strings.NewReader(msg.Data))
|
||||
status := model.StatusFromJson(bytes.NewReader(msg.Data))
|
||||
s.statusCache.Set(status.UserId, status)
|
||||
}
|
||||
|
||||
@@ -86,7 +86,7 @@ func (s *Server) clusterInvalidateAllCachesHandler(msg *model.ClusterMessage) {
|
||||
}
|
||||
|
||||
func (s *Server) clusterInvalidateCacheForChannelMembersNotifyPropHandler(msg *model.ClusterMessage) {
|
||||
s.invalidateCacheForChannelMembersNotifyPropsSkipClusterSend(msg.Data)
|
||||
s.invalidateCacheForChannelMembersNotifyPropsSkipClusterSend(string(msg.Data))
|
||||
}
|
||||
|
||||
func (s *Server) clusterInvalidateCacheForChannelByNameHandler(msg *model.ClusterMessage) {
|
||||
@@ -94,11 +94,11 @@ func (s *Server) clusterInvalidateCacheForChannelByNameHandler(msg *model.Cluste
|
||||
}
|
||||
|
||||
func (s *Server) clusterInvalidateCacheForUserHandler(msg *model.ClusterMessage) {
|
||||
s.invalidateCacheForUserSkipClusterSend(msg.Data)
|
||||
s.invalidateCacheForUserSkipClusterSend(string(msg.Data))
|
||||
}
|
||||
|
||||
func (s *Server) clusterInvalidateCacheForUserTeamsHandler(msg *model.ClusterMessage) {
|
||||
s.invalidateWebConnSessionCacheForUser(msg.Data)
|
||||
s.invalidateWebConnSessionCacheForUser(string(msg.Data))
|
||||
}
|
||||
|
||||
func (s *Server) clearSessionCacheForUserSkipClusterSend(userID string) {
|
||||
@@ -112,7 +112,7 @@ func (s *Server) clearSessionCacheForAllUsersSkipClusterSend() {
|
||||
}
|
||||
|
||||
func (s *Server) clusterClearSessionCacheForUserHandler(msg *model.ClusterMessage) {
|
||||
s.clearSessionCacheForUserSkipClusterSend(msg.Data)
|
||||
s.clearSessionCacheForUserSkipClusterSend(string(msg.Data))
|
||||
}
|
||||
|
||||
func (s *Server) clusterClearSessionCacheForAllUsersHandler(msg *model.ClusterMessage) {
|
||||
@@ -120,7 +120,7 @@ func (s *Server) clusterClearSessionCacheForAllUsersHandler(msg *model.ClusterMe
|
||||
}
|
||||
|
||||
func (s *Server) clusterBusyStateChgHandler(msg *model.ClusterMessage) {
|
||||
s.serverBusyStateChanged(model.ServerBusyStateFromJson(strings.NewReader(msg.Data)))
|
||||
s.serverBusyStateChanged(model.ServerBusyStateFromJson(bytes.NewReader(msg.Data)))
|
||||
}
|
||||
|
||||
func (s *Server) invalidateCacheForChannelMembersNotifyPropsSkipClusterSend(channelID string) {
|
||||
|
||||
@@ -1139,7 +1139,7 @@ func (api *PluginAPI) PublishPluginClusterEvent(ev model.PluginClusterEvent,
|
||||
"PluginID": api.id,
|
||||
"EventID": ev.Id,
|
||||
},
|
||||
Data: string(ev.Data),
|
||||
Data: ev.Data,
|
||||
}
|
||||
|
||||
// If TargetId is empty we broadcast to all other cluster nodes.
|
||||
|
||||
@@ -34,7 +34,7 @@ func (p *Plugin) ServeHTTP(_ *plugin.Context, w http.ResponseWriter, r *http.Req
|
||||
}
|
||||
req := model.WebSocketRequestFromJson(bytes.NewReader(msg))
|
||||
resp := model.NewWebSocketResponse("OK", req.Seq, map[string]interface{}{"action": req.Action, "value": req.Data["value"]})
|
||||
if err = ws.WriteMessage(mt, []byte(resp.ToJson())); err != nil {
|
||||
if err = ws.WriteMessage(mt, resp.ToJson()); err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,16 +4,19 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
|
||||
"github.com/mattermost/mattermost-server/v6/model"
|
||||
)
|
||||
|
||||
func (s *Server) notifyClusterPluginEvent(event model.ClusterEvent, data model.PluginEventData) {
|
||||
buf, _ := json.Marshal(data)
|
||||
if s.Cluster != nil {
|
||||
s.Cluster.SendClusterMessage(&model.ClusterMessage{
|
||||
Event: event,
|
||||
SendType: model.ClusterSendReliable,
|
||||
WaitForAllToSend: true,
|
||||
Data: data.ToJson(),
|
||||
Data: buf,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
package slashcommands
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"bytes"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
@@ -50,7 +50,7 @@ func TestShareProviderDoCommand(t *testing.T) {
|
||||
require.Equal(t, "##### "+args.T("api.command_share.channel_shared"), response.Text)
|
||||
|
||||
channelConvertedMessages := testCluster.SelectMessages(func(msg *model.ClusterMessage) bool {
|
||||
event := model.WebSocketEventFromJson(strings.NewReader(msg.Data))
|
||||
event := model.WebSocketEventFromJson(bytes.NewReader(msg.Data))
|
||||
return event != nil && event.EventType() == model.WebsocketEventChannelConverted
|
||||
})
|
||||
assert.Len(t, channelConvertedMessages, 1)
|
||||
@@ -85,7 +85,7 @@ func TestShareProviderDoCommand(t *testing.T) {
|
||||
require.Equal(t, "##### "+args.T("api.command_share.shared_channel_unavailable"), response.Text)
|
||||
|
||||
channelConvertedMessages := testCluster.SelectMessages(func(msg *model.ClusterMessage) bool {
|
||||
event := model.WebSocketEventFromJson(strings.NewReader(msg.Data))
|
||||
event := model.WebSocketEventFromJson(bytes.NewReader(msg.Data))
|
||||
return event != nil && event.EventType() == model.WebsocketEventChannelConverted
|
||||
})
|
||||
require.Len(t, channelConvertedMessages, 1)
|
||||
|
||||
@@ -24,7 +24,7 @@ func (a *App) AddStatusCache(status *model.Status) {
|
||||
msg := &model.ClusterMessage{
|
||||
Event: model.ClusterEventUpdateStatus,
|
||||
SendType: model.ClusterSendBestEffort,
|
||||
Data: status.ToClusterJson(),
|
||||
Data: []byte(status.ToClusterJson()),
|
||||
}
|
||||
a.Cluster().SendClusterMessage(msg)
|
||||
}
|
||||
|
||||
@@ -241,7 +241,7 @@ func (a *App) invalidateCacheForChannelMembersNotifyProps(channelID string) {
|
||||
msg := &model.ClusterMessage{
|
||||
Event: model.ClusterEventInvalidateCacheForChannelMembersNotifyProps,
|
||||
SendType: model.ClusterSendBestEffort,
|
||||
Data: channelID,
|
||||
Data: []byte(channelID),
|
||||
}
|
||||
a.Cluster().SendClusterMessage(msg)
|
||||
}
|
||||
@@ -266,7 +266,7 @@ func (a *App) invalidateCacheForUserTeams(userID string) {
|
||||
msg := &model.ClusterMessage{
|
||||
Event: model.ClusterEventInvalidateCacheForUserTeams,
|
||||
SendType: model.ClusterSendBestEffort,
|
||||
Data: userID,
|
||||
Data: []byte(userID),
|
||||
}
|
||||
a.Cluster().SendClusterMessage(msg)
|
||||
}
|
||||
|
||||
@@ -3,11 +3,6 @@
|
||||
|
||||
package model
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"io"
|
||||
)
|
||||
|
||||
type ClusterEvent string
|
||||
|
||||
const (
|
||||
@@ -62,17 +57,6 @@ type ClusterMessage struct {
|
||||
Event ClusterEvent `json:"event"`
|
||||
SendType string `json:"-"`
|
||||
WaitForAllToSend bool `json:"-"`
|
||||
Data string `json:"data,omitempty"`
|
||||
Data []byte `json:"data,omitempty"`
|
||||
Props map[string]string `json:"props,omitempty"`
|
||||
}
|
||||
|
||||
func (o *ClusterMessage) ToJson() string {
|
||||
b, _ := json.Marshal(o)
|
||||
return string(b)
|
||||
}
|
||||
|
||||
func ClusterMessageFromJson(data io.Reader) *ClusterMessage {
|
||||
var o *ClusterMessage
|
||||
json.NewDecoder(data).Decode(&o)
|
||||
return o
|
||||
}
|
||||
|
||||
@@ -1,27 +0,0 @@
|
||||
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
|
||||
// See LICENSE.txt for license information.
|
||||
|
||||
package model
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestClusterMessage(t *testing.T) {
|
||||
m := ClusterMessage{
|
||||
Event: ClusterEventPublish,
|
||||
SendType: ClusterSendBestEffort,
|
||||
Data: "hello",
|
||||
}
|
||||
json := m.ToJson()
|
||||
result := ClusterMessageFromJson(strings.NewReader(json))
|
||||
|
||||
require.Equal(t, "hello", result.Data)
|
||||
|
||||
badresult := ClusterMessageFromJson(strings.NewReader("junk"))
|
||||
|
||||
require.Nil(t, badresult, "should not have parsed")
|
||||
}
|
||||
@@ -77,7 +77,7 @@ const (
|
||||
)
|
||||
|
||||
type WebSocketMessage interface {
|
||||
ToJson() string
|
||||
ToJson() []byte
|
||||
IsValid() bool
|
||||
EventType() string
|
||||
}
|
||||
@@ -199,9 +199,9 @@ func (ev *WebSocketEvent) EventType() string {
|
||||
return ev.event
|
||||
}
|
||||
|
||||
func (ev *WebSocketEvent) ToJson() string {
|
||||
func (ev *WebSocketEvent) ToJson() []byte {
|
||||
if ev.precomputedJSON != nil {
|
||||
return fmt.Sprintf(`{"event": %s, "data": %s, "broadcast": %s, "seq": %d}`, ev.precomputedJSON.Event, ev.precomputedJSON.Data, ev.precomputedJSON.Broadcast, ev.GetSequence())
|
||||
return []byte(fmt.Sprintf(`{"event": %s, "data": %s, "broadcast": %s, "seq": %d}`, ev.precomputedJSON.Event, ev.precomputedJSON.Data, ev.precomputedJSON.Broadcast, ev.GetSequence()))
|
||||
}
|
||||
b, _ := json.Marshal(webSocketEventJSON{
|
||||
ev.event,
|
||||
@@ -209,7 +209,7 @@ func (ev *WebSocketEvent) ToJson() string {
|
||||
ev.broadcast,
|
||||
ev.sequence,
|
||||
})
|
||||
return string(b)
|
||||
return b
|
||||
}
|
||||
|
||||
// Encode encodes the event to the given encoder.
|
||||
@@ -280,9 +280,9 @@ func (m *WebSocketResponse) EventType() string {
|
||||
return WebsocketEventResponse
|
||||
}
|
||||
|
||||
func (m *WebSocketResponse) ToJson() string {
|
||||
func (m *WebSocketResponse) ToJson() []byte {
|
||||
b, _ := json.Marshal(m)
|
||||
return string(b)
|
||||
return b
|
||||
}
|
||||
|
||||
func WebSocketResponseFromJson(data io.Reader) *WebSocketResponse {
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
package model
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"bytes"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
@@ -20,7 +20,7 @@ func TestWebSocketEvent(t *testing.T) {
|
||||
}
|
||||
m.Add("user", user)
|
||||
json := m.ToJson()
|
||||
result := WebSocketEventFromJson(strings.NewReader(json))
|
||||
result := WebSocketEventFromJson(bytes.NewReader(json))
|
||||
|
||||
require.True(t, m.IsValid(), "should be valid")
|
||||
require.Equal(t, m.GetBroadcast().TeamId, result.GetBroadcast().TeamId, "Team ids do not match")
|
||||
@@ -73,10 +73,10 @@ func TestWebSocketEventImmutable(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestWebSocketEventFromJson(t *testing.T) {
|
||||
ev := WebSocketEventFromJson(strings.NewReader("junk"))
|
||||
ev := WebSocketEventFromJson(bytes.NewReader([]byte("junk")))
|
||||
require.Nil(t, ev, "should not have parsed")
|
||||
data := `{"event": "test", "data": {"key": "val"}, "seq": 45, "broadcast": {"user_id": "userid"}}`
|
||||
ev = WebSocketEventFromJson(strings.NewReader(data))
|
||||
data := []byte(`{"event": "test", "data": {"key": "val"}, "seq": 45, "broadcast": {"user_id": "userid"}}`)
|
||||
ev = WebSocketEventFromJson(bytes.NewReader(data))
|
||||
require.NotNil(t, ev, "should have parsed")
|
||||
require.Equal(t, ev.EventType(), "test")
|
||||
require.Equal(t, ev.GetSequence(), int64(45))
|
||||
@@ -89,11 +89,11 @@ func TestWebSocketResponse(t *testing.T) {
|
||||
e := NewWebSocketError(1, &AppError{})
|
||||
m.Add("RootId", NewId())
|
||||
json := m.ToJson()
|
||||
result := WebSocketResponseFromJson(strings.NewReader(json))
|
||||
result := WebSocketResponseFromJson(bytes.NewReader(json))
|
||||
json2 := e.ToJson()
|
||||
WebSocketResponseFromJson(strings.NewReader(json2))
|
||||
WebSocketResponseFromJson(bytes.NewReader(json2))
|
||||
|
||||
badresult := WebSocketResponseFromJson(strings.NewReader("junk"))
|
||||
badresult := WebSocketResponseFromJson(bytes.NewReader([]byte("junk")))
|
||||
require.Nil(t, badresult, "should not have parsed")
|
||||
|
||||
require.True(t, m.IsValid(), "should be valid")
|
||||
@@ -109,10 +109,10 @@ func TestWebSocketEvent_PrecomputeJSON(t *testing.T) {
|
||||
event.PrecomputeJSON()
|
||||
after := event.ToJson()
|
||||
|
||||
assert.JSONEq(t, before, after)
|
||||
assert.Equal(t, before, after)
|
||||
}
|
||||
|
||||
var stringSink string
|
||||
var stringSink []byte
|
||||
|
||||
func BenchmarkWebSocketEvent_ToJson(b *testing.B) {
|
||||
event := NewWebSocketEvent(WebsocketEventPosted, "foo", "bar", "baz", nil)
|
||||
|
||||
@@ -99,7 +99,7 @@ func (us *UserService) ClearUserSessionCache(userID string) {
|
||||
msg := &model.ClusterMessage{
|
||||
Event: model.ClusterEventClearSessionCacheForUser,
|
||||
SendType: model.ClusterSendReliable,
|
||||
Data: userID,
|
||||
Data: []byte(userID),
|
||||
}
|
||||
us.cluster.SendClusterMessage(msg)
|
||||
}
|
||||
|
||||
@@ -216,7 +216,7 @@ func (us *UserService) InvalidateCacheForUser(userID string) {
|
||||
msg := &model.ClusterMessage{
|
||||
Event: model.ClusterEventInvalidateCacheForUser,
|
||||
SendType: model.ClusterSendBestEffort,
|
||||
Data: userID,
|
||||
Data: []byte(userID),
|
||||
}
|
||||
us.cluster.SendClusterMessage(msg)
|
||||
}
|
||||
|
||||
@@ -4,6 +4,8 @@
|
||||
package localcachelayer
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
|
||||
"github.com/mattermost/mattermost-server/v6/model"
|
||||
"github.com/mattermost/mattermost-server/v6/store"
|
||||
)
|
||||
@@ -14,34 +16,34 @@ type LocalCacheChannelStore struct {
|
||||
}
|
||||
|
||||
func (s *LocalCacheChannelStore) handleClusterInvalidateChannelMemberCounts(msg *model.ClusterMessage) {
|
||||
if msg.Data == ClearCacheMessageData {
|
||||
if bytes.Equal(msg.Data, clearCacheMessageData) {
|
||||
s.rootStore.channelMemberCountsCache.Purge()
|
||||
} else {
|
||||
s.rootStore.channelMemberCountsCache.Remove(msg.Data)
|
||||
s.rootStore.channelMemberCountsCache.Remove(string(msg.Data))
|
||||
}
|
||||
}
|
||||
|
||||
func (s *LocalCacheChannelStore) handleClusterInvalidateChannelPinnedPostCount(msg *model.ClusterMessage) {
|
||||
if msg.Data == ClearCacheMessageData {
|
||||
if bytes.Equal(msg.Data, clearCacheMessageData) {
|
||||
s.rootStore.channelPinnedPostCountsCache.Purge()
|
||||
} else {
|
||||
s.rootStore.channelPinnedPostCountsCache.Remove(msg.Data)
|
||||
s.rootStore.channelPinnedPostCountsCache.Remove(string(msg.Data))
|
||||
}
|
||||
}
|
||||
|
||||
func (s *LocalCacheChannelStore) handleClusterInvalidateChannelGuestCounts(msg *model.ClusterMessage) {
|
||||
if msg.Data == ClearCacheMessageData {
|
||||
if bytes.Equal(msg.Data, clearCacheMessageData) {
|
||||
s.rootStore.channelGuestCountCache.Purge()
|
||||
} else {
|
||||
s.rootStore.channelGuestCountCache.Remove(msg.Data)
|
||||
s.rootStore.channelGuestCountCache.Remove(string(msg.Data))
|
||||
}
|
||||
}
|
||||
|
||||
func (s *LocalCacheChannelStore) handleClusterInvalidateChannelById(msg *model.ClusterMessage) {
|
||||
if msg.Data == ClearCacheMessageData {
|
||||
if bytes.Equal(msg.Data, clearCacheMessageData) {
|
||||
s.rootStore.channelByIdCache.Purge()
|
||||
} else {
|
||||
s.rootStore.channelByIdCache.Remove(msg.Data)
|
||||
s.rootStore.channelByIdCache.Remove(string(msg.Data))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
package localcachelayer
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
@@ -22,24 +23,24 @@ type LocalCacheEmojiStore struct {
|
||||
}
|
||||
|
||||
func (es *LocalCacheEmojiStore) handleClusterInvalidateEmojiById(msg *model.ClusterMessage) {
|
||||
if msg.Data == ClearCacheMessageData {
|
||||
if bytes.Equal(msg.Data, clearCacheMessageData) {
|
||||
es.rootStore.emojiCacheById.Purge()
|
||||
} else {
|
||||
es.emojiByIdMut.Lock()
|
||||
es.emojiByIdInvalidations[msg.Data] = true
|
||||
es.emojiByIdInvalidations[string(msg.Data)] = true
|
||||
es.emojiByIdMut.Unlock()
|
||||
es.rootStore.emojiCacheById.Remove(msg.Data)
|
||||
es.rootStore.emojiCacheById.Remove(string(msg.Data))
|
||||
}
|
||||
}
|
||||
|
||||
func (es *LocalCacheEmojiStore) handleClusterInvalidateEmojiIdByName(msg *model.ClusterMessage) {
|
||||
if msg.Data == ClearCacheMessageData {
|
||||
if bytes.Equal(msg.Data, clearCacheMessageData) {
|
||||
es.rootStore.emojiIdCacheByName.Purge()
|
||||
} else {
|
||||
es.emojiByNameMut.Lock()
|
||||
es.emojiByNameInvalidations[msg.Data] = true
|
||||
es.emojiByNameInvalidations[string(msg.Data)] = true
|
||||
es.emojiByNameMut.Unlock()
|
||||
es.rootStore.emojiIdCacheByName.Remove(msg.Data)
|
||||
es.rootStore.emojiIdCacheByName.Remove(string(msg.Data))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -4,6 +4,8 @@
|
||||
package localcachelayer
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
|
||||
"github.com/mattermost/mattermost-server/v6/model"
|
||||
"github.com/mattermost/mattermost-server/v6/store"
|
||||
)
|
||||
@@ -14,11 +16,11 @@ type LocalCacheFileInfoStore struct {
|
||||
}
|
||||
|
||||
func (s *LocalCacheFileInfoStore) handleClusterInvalidateFileInfo(msg *model.ClusterMessage) {
|
||||
if msg.Data == ClearCacheMessageData {
|
||||
if bytes.Equal(msg.Data, clearCacheMessageData) {
|
||||
s.rootStore.fileInfoCache.Purge()
|
||||
return
|
||||
}
|
||||
s.rootStore.fileInfoCache.Remove(msg.Data)
|
||||
s.rootStore.fileInfoCache.Remove(string(msg.Data))
|
||||
}
|
||||
|
||||
func (s LocalCacheFileInfoStore) GetForPost(postId string, readFromMaster, includeDeleted, allowFromCache bool) ([]*model.FileInfo, error) {
|
||||
|
||||
@@ -58,11 +58,11 @@ const (
|
||||
TeamCacheSize = 20000
|
||||
TeamCacheSec = 30 * 60
|
||||
|
||||
ClearCacheMessageData = ""
|
||||
|
||||
ChannelCacheSec = 15 * 60 // 15 mins
|
||||
)
|
||||
|
||||
var clearCacheMessageData = []byte("")
|
||||
|
||||
type LocalCacheStore struct {
|
||||
store.Store
|
||||
metrics einterfaces.MetricsInterface
|
||||
@@ -390,7 +390,7 @@ func (s *LocalCacheStore) doInvalidateCacheCluster(cache cache.Cache, key string
|
||||
msg := &model.ClusterMessage{
|
||||
Event: cache.GetInvalidateClusterEvent(),
|
||||
SendType: model.ClusterSendBestEffort,
|
||||
Data: key,
|
||||
Data: []byte(key),
|
||||
}
|
||||
s.cluster.SendClusterMessage(msg)
|
||||
}
|
||||
@@ -420,7 +420,7 @@ func (s *LocalCacheStore) doClearCacheCluster(cache cache.Cache) {
|
||||
msg := &model.ClusterMessage{
|
||||
Event: cache.GetInvalidateClusterEvent(),
|
||||
SendType: model.ClusterSendBestEffort,
|
||||
Data: ClearCacheMessageData,
|
||||
Data: clearCacheMessageData,
|
||||
}
|
||||
s.cluster.SendClusterMessage(msg)
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
package localcachelayer
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
@@ -18,18 +19,18 @@ type LocalCachePostStore struct {
|
||||
}
|
||||
|
||||
func (s *LocalCachePostStore) handleClusterInvalidateLastPostTime(msg *model.ClusterMessage) {
|
||||
if msg.Data == ClearCacheMessageData {
|
||||
if bytes.Equal(msg.Data, clearCacheMessageData) {
|
||||
s.rootStore.lastPostTimeCache.Purge()
|
||||
} else {
|
||||
s.rootStore.lastPostTimeCache.Remove(msg.Data)
|
||||
s.rootStore.lastPostTimeCache.Remove(string(msg.Data))
|
||||
}
|
||||
}
|
||||
|
||||
func (s *LocalCachePostStore) handleClusterInvalidateLastPosts(msg *model.ClusterMessage) {
|
||||
if msg.Data == ClearCacheMessageData {
|
||||
if bytes.Equal(msg.Data, clearCacheMessageData) {
|
||||
s.rootStore.postLastPostsCache.Purge()
|
||||
} else {
|
||||
s.rootStore.postLastPostsCache.Remove(msg.Data)
|
||||
s.rootStore.postLastPostsCache.Remove(string(msg.Data))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -4,6 +4,8 @@
|
||||
package localcachelayer
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
|
||||
"github.com/mattermost/mattermost-server/v6/model"
|
||||
"github.com/mattermost/mattermost-server/v6/store"
|
||||
)
|
||||
@@ -14,10 +16,10 @@ type LocalCacheReactionStore struct {
|
||||
}
|
||||
|
||||
func (s *LocalCacheReactionStore) handleClusterInvalidateReaction(msg *model.ClusterMessage) {
|
||||
if msg.Data == ClearCacheMessageData {
|
||||
if bytes.Equal(msg.Data, clearCacheMessageData) {
|
||||
s.rootStore.reactionCache.Purge()
|
||||
} else {
|
||||
s.rootStore.reactionCache.Remove(msg.Data)
|
||||
s.rootStore.reactionCache.Remove(string(msg.Data))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
package localcachelayer
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"sort"
|
||||
"strings"
|
||||
@@ -18,18 +19,18 @@ type LocalCacheRoleStore struct {
|
||||
}
|
||||
|
||||
func (s *LocalCacheRoleStore) handleClusterInvalidateRole(msg *model.ClusterMessage) {
|
||||
if msg.Data == ClearCacheMessageData {
|
||||
if bytes.Equal(msg.Data, clearCacheMessageData) {
|
||||
s.rootStore.roleCache.Purge()
|
||||
} else {
|
||||
s.rootStore.roleCache.Remove(msg.Data)
|
||||
s.rootStore.roleCache.Remove(string(msg.Data))
|
||||
}
|
||||
}
|
||||
|
||||
func (s *LocalCacheRoleStore) handleClusterInvalidateRolePermissions(msg *model.ClusterMessage) {
|
||||
if msg.Data == ClearCacheMessageData {
|
||||
if bytes.Equal(msg.Data, clearCacheMessageData) {
|
||||
s.rootStore.rolePermissionsCache.Purge()
|
||||
} else {
|
||||
s.rootStore.rolePermissionsCache.Remove(msg.Data)
|
||||
s.rootStore.rolePermissionsCache.Remove(string(msg.Data))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -4,6 +4,8 @@
|
||||
package localcachelayer
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
|
||||
"github.com/mattermost/mattermost-server/v6/model"
|
||||
"github.com/mattermost/mattermost-server/v6/store"
|
||||
)
|
||||
@@ -14,10 +16,10 @@ type LocalCacheSchemeStore struct {
|
||||
}
|
||||
|
||||
func (s *LocalCacheSchemeStore) handleClusterInvalidateScheme(msg *model.ClusterMessage) {
|
||||
if msg.Data == ClearCacheMessageData {
|
||||
if bytes.Equal(msg.Data, clearCacheMessageData) {
|
||||
s.rootStore.schemeCache.Purge()
|
||||
} else {
|
||||
s.rootStore.schemeCache.Remove(msg.Data)
|
||||
s.rootStore.schemeCache.Remove(string(msg.Data))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -4,6 +4,8 @@
|
||||
package localcachelayer
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
|
||||
"github.com/mattermost/mattermost-server/v6/model"
|
||||
"github.com/mattermost/mattermost-server/v6/store"
|
||||
)
|
||||
@@ -14,10 +16,10 @@ type LocalCacheTeamStore struct {
|
||||
}
|
||||
|
||||
func (s *LocalCacheTeamStore) handleClusterInvalidateTeam(msg *model.ClusterMessage) {
|
||||
if msg.Data == ClearCacheMessageData {
|
||||
if bytes.Equal(msg.Data, clearCacheMessageData) {
|
||||
s.rootStore.teamAllTeamIdsForUserCache.Purge()
|
||||
} else {
|
||||
s.rootStore.teamAllTeamIdsForUserCache.Remove(msg.Data)
|
||||
s.rootStore.teamAllTeamIdsForUserCache.Remove(string(msg.Data))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -4,6 +4,8 @@
|
||||
package localcachelayer
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
|
||||
"github.com/mattermost/mattermost-server/v6/model"
|
||||
"github.com/mattermost/mattermost-server/v6/store"
|
||||
)
|
||||
@@ -18,10 +20,10 @@ type LocalCacheTermsOfServiceStore struct {
|
||||
}
|
||||
|
||||
func (s *LocalCacheTermsOfServiceStore) handleClusterInvalidateTermsOfService(msg *model.ClusterMessage) {
|
||||
if msg.Data == ClearCacheMessageData {
|
||||
if bytes.Equal(msg.Data, clearCacheMessageData) {
|
||||
s.rootStore.termsOfServiceCache.Purge()
|
||||
} else {
|
||||
s.rootStore.termsOfServiceCache.Remove(msg.Data)
|
||||
s.rootStore.termsOfServiceCache.Remove(string(msg.Data))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
package localcachelayer
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"sort"
|
||||
"sync"
|
||||
@@ -21,21 +22,21 @@ type LocalCacheUserStore struct {
|
||||
}
|
||||
|
||||
func (s *LocalCacheUserStore) handleClusterInvalidateScheme(msg *model.ClusterMessage) {
|
||||
if msg.Data == ClearCacheMessageData {
|
||||
if bytes.Equal(msg.Data, clearCacheMessageData) {
|
||||
s.rootStore.userProfileByIdsCache.Purge()
|
||||
} else {
|
||||
s.userProfileByIdsMut.Lock()
|
||||
s.userProfileByIdsInvalidations[msg.Data] = true
|
||||
s.userProfileByIdsInvalidations[string(msg.Data)] = true
|
||||
s.userProfileByIdsMut.Unlock()
|
||||
s.rootStore.userProfileByIdsCache.Remove(msg.Data)
|
||||
s.rootStore.userProfileByIdsCache.Remove(string(msg.Data))
|
||||
}
|
||||
}
|
||||
|
||||
func (s *LocalCacheUserStore) handleClusterInvalidateProfilesInChannel(msg *model.ClusterMessage) {
|
||||
if msg.Data == ClearCacheMessageData {
|
||||
if bytes.Equal(msg.Data, clearCacheMessageData) {
|
||||
s.rootStore.profilesInChannelCache.Purge()
|
||||
} else {
|
||||
s.rootStore.profilesInChannelCache.Remove(msg.Data)
|
||||
s.rootStore.profilesInChannelCache.Remove(string(msg.Data))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -4,6 +4,8 @@
|
||||
package localcachelayer
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
|
||||
"github.com/mattermost/mattermost-server/v6/model"
|
||||
"github.com/mattermost/mattermost-server/v6/store"
|
||||
)
|
||||
@@ -14,10 +16,10 @@ type LocalCacheWebhookStore struct {
|
||||
}
|
||||
|
||||
func (s *LocalCacheWebhookStore) handleClusterInvalidateWebhook(msg *model.ClusterMessage) {
|
||||
if msg.Data == ClearCacheMessageData {
|
||||
if bytes.Equal(msg.Data, clearCacheMessageData) {
|
||||
s.rootStore.webhookCache.Purge()
|
||||
} else {
|
||||
s.rootStore.webhookCache.Remove(msg.Data)
|
||||
s.rootStore.webhookCache.Remove(string(msg.Data))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user