mirror of
https://github.com/mattermost/mattermost.git
synced 2025-02-25 18:55:24 -06:00
MM-56879: Migrate caches from store layer to cache layer (#26255)
There were 3 remaining caches which were there in the store layer. We migrate them to make the store layer fully free from any caches. https://mattermost.atlassian.net/browse/MM-56879 ```release-note NONE ``` Co-authored-by: Mattermost Build <build@mattermost.com>
This commit is contained in:
@@ -131,7 +131,7 @@ func TestCreatePost(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
|
||||
// Message with no channel mentions should result in no ephemeral message
|
||||
timeout := time.After(2 * time.Second)
|
||||
timeout := time.After(5 * time.Second)
|
||||
waiting := true
|
||||
for waiting {
|
||||
select {
|
||||
@@ -157,7 +157,7 @@ func TestCreatePost(t *testing.T) {
|
||||
_, _, err = client.CreatePost(context.Background(), post)
|
||||
require.NoError(t, err)
|
||||
|
||||
timeout = time.After(2 * time.Second)
|
||||
timeout = time.After(5 * time.Second)
|
||||
eventsToGo := 3 // 3 Posts created with @ mentions should result in 3 websocket events
|
||||
for eventsToGo > 0 {
|
||||
select {
|
||||
@@ -1081,7 +1081,7 @@ func TestCreatePostSendOutOfChannelMentions(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
CheckCreatedStatus(t, resp)
|
||||
|
||||
timeout := time.After(2 * time.Second)
|
||||
timeout := time.After(5 * time.Second)
|
||||
waiting := true
|
||||
for waiting {
|
||||
select {
|
||||
@@ -1100,7 +1100,7 @@ func TestCreatePostSendOutOfChannelMentions(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
CheckCreatedStatus(t, resp)
|
||||
|
||||
timeout = time.After(2 * time.Second)
|
||||
timeout = time.After(5 * time.Second)
|
||||
waiting = true
|
||||
for waiting {
|
||||
select {
|
||||
@@ -2796,7 +2796,7 @@ func TestDeletePostEvent(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
received = true
|
||||
}
|
||||
case <-time.After(2 * time.Second):
|
||||
case <-time.After(5 * time.Second):
|
||||
exit = true
|
||||
}
|
||||
if exit {
|
||||
@@ -3594,7 +3594,7 @@ func TestSetPostUnreadWithoutCollapsedThreads(t *testing.T) {
|
||||
caught = true
|
||||
data = ev.GetData()
|
||||
}
|
||||
case <-time.After(1 * time.Second):
|
||||
case <-time.After(5 * time.Second):
|
||||
exit = true
|
||||
}
|
||||
if exit {
|
||||
@@ -3866,7 +3866,7 @@ func TestCreatePostNotificationsWithCRT(t *testing.T) {
|
||||
require.EqualValues(t, "[\""+th.BasicUser.Id+"\"]", users)
|
||||
}
|
||||
}
|
||||
case <-time.After(1 * time.Second):
|
||||
case <-time.After(5 * time.Second):
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -3980,7 +3980,7 @@ func TestPostReminder(t *testing.T) {
|
||||
require.Equal(t, th.BasicTeam.Name, parsedPost.GetProp("team_name").(string))
|
||||
return
|
||||
}
|
||||
case <-time.After(1 * time.Second):
|
||||
case <-time.After(5 * time.Second):
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1417,7 +1417,7 @@ func (a *App) getChannelsForPosts(teams map[string]*model.Team, data []*imports.
|
||||
channelName := strings.ToLower(*postData.Channel)
|
||||
if channel, ok := teamChannels[teamName][channelName]; !ok || channel == nil {
|
||||
var err error
|
||||
channel, err = a.Srv().Store().Channel().GetByName(teams[teamName].Id, *postData.Channel, true)
|
||||
channel, err = a.Srv().Store().Channel().GetByNameIncludeDeleted(teams[teamName].Id, *postData.Channel, true)
|
||||
if err != nil {
|
||||
return nil, model.NewAppError("BulkImport", "app.import.import_post.channel_not_found.error", map[string]any{"ChannelName": *postData.Channel}, "", http.StatusBadRequest).Wrap(err)
|
||||
}
|
||||
|
||||
@@ -17,9 +17,6 @@ func (ps *PlatformService) RegisterClusterHandlers() {
|
||||
ps.clusterIFace.RegisterClusterMessageHandler(model.ClusterEventPublish, ps.ClusterPublishHandler)
|
||||
ps.clusterIFace.RegisterClusterMessageHandler(model.ClusterEventUpdateStatus, ps.ClusterUpdateStatusHandler)
|
||||
ps.clusterIFace.RegisterClusterMessageHandler(model.ClusterEventInvalidateAllCaches, ps.ClusterInvalidateAllCachesHandler)
|
||||
ps.clusterIFace.RegisterClusterMessageHandler(model.ClusterEventInvalidateCacheForChannelMembersNotifyProps, ps.clusterInvalidateCacheForChannelMembersNotifyPropHandler)
|
||||
ps.clusterIFace.RegisterClusterMessageHandler(model.ClusterEventInvalidateCacheForChannelByName, ps.clusterInvalidateCacheForChannelByNameHandler)
|
||||
ps.clusterIFace.RegisterClusterMessageHandler(model.ClusterEventInvalidateCacheForUser, ps.clusterInvalidateCacheForUserHandler)
|
||||
ps.clusterIFace.RegisterClusterMessageHandler(model.ClusterEventInvalidateCacheForUserTeams, ps.clusterInvalidateCacheForUserTeamsHandler)
|
||||
ps.clusterIFace.RegisterClusterMessageHandler(model.ClusterEventBusyStateChanged, ps.clusterBusyStateChgHandler)
|
||||
ps.clusterIFace.RegisterClusterMessageHandler(model.ClusterEventClearSessionCacheForUser, ps.clusterClearSessionCacheForUserHandler)
|
||||
@@ -70,18 +67,6 @@ func (ps *PlatformService) ClusterInvalidateAllCachesHandler(msg *model.ClusterM
|
||||
ps.InvalidateAllCachesSkipSend()
|
||||
}
|
||||
|
||||
func (ps *PlatformService) clusterInvalidateCacheForChannelMembersNotifyPropHandler(msg *model.ClusterMessage) {
|
||||
ps.invalidateCacheForChannelMembersNotifyPropsSkipClusterSend(string(msg.Data))
|
||||
}
|
||||
|
||||
func (ps *PlatformService) clusterInvalidateCacheForChannelByNameHandler(msg *model.ClusterMessage) {
|
||||
ps.invalidateCacheForChannelByNameSkipClusterSend(msg.Props["id"], msg.Props["name"])
|
||||
}
|
||||
|
||||
func (ps *PlatformService) clusterInvalidateCacheForUserHandler(msg *model.ClusterMessage) {
|
||||
ps.InvalidateCacheForUserSkipClusterSend(string(msg.Data))
|
||||
}
|
||||
|
||||
func (ps *PlatformService) clusterInvalidateCacheForUserTeamsHandler(msg *model.ClusterMessage) {
|
||||
ps.invalidateWebConnSessionCacheForUser(string(msg.Data))
|
||||
}
|
||||
@@ -118,23 +103,6 @@ func (ps *PlatformService) clusterBusyStateChgHandler(msg *model.ClusterMessage)
|
||||
}
|
||||
}
|
||||
|
||||
func (ps *PlatformService) invalidateCacheForChannelMembersNotifyPropsSkipClusterSend(channelID string) {
|
||||
ps.Store.Channel().InvalidateCacheForChannelMembersNotifyProps(channelID)
|
||||
}
|
||||
|
||||
func (ps *PlatformService) invalidateCacheForChannelByNameSkipClusterSend(teamID, name string) {
|
||||
if teamID == "" {
|
||||
teamID = "dm"
|
||||
}
|
||||
|
||||
ps.Store.Channel().InvalidateChannelByName(teamID, name)
|
||||
}
|
||||
|
||||
func (ps *PlatformService) InvalidateCacheForUserSkipClusterSend(userID string) {
|
||||
ps.Store.Channel().InvalidateAllChannelMembersForUser(userID)
|
||||
ps.invalidateWebConnSessionCacheForUser(userID)
|
||||
}
|
||||
|
||||
func (ps *PlatformService) invalidateWebConnSessionCacheForUser(userID string) {
|
||||
hub := ps.GetHubForUserId(userID)
|
||||
if hub != nil {
|
||||
|
||||
@@ -165,24 +165,12 @@ func (ps *PlatformService) HubUnregister(webConn *WebConn) {
|
||||
|
||||
func (ps *PlatformService) InvalidateCacheForChannel(channel *model.Channel) {
|
||||
ps.Store.Channel().InvalidateChannel(channel.Id)
|
||||
ps.invalidateCacheForChannelByNameSkipClusterSend(channel.TeamId, channel.Name)
|
||||
|
||||
if ps.clusterIFace != nil {
|
||||
nameMsg := &model.ClusterMessage{
|
||||
Event: model.ClusterEventInvalidateCacheForChannelByName,
|
||||
SendType: model.ClusterSendBestEffort,
|
||||
Props: make(map[string]string),
|
||||
}
|
||||
|
||||
nameMsg.Props["name"] = channel.Name
|
||||
if channel.TeamId == "" {
|
||||
nameMsg.Props["id"] = "dm"
|
||||
} else {
|
||||
nameMsg.Props["id"] = channel.TeamId
|
||||
}
|
||||
|
||||
ps.clusterIFace.SendClusterMessage(nameMsg)
|
||||
teamID := channel.TeamId
|
||||
if teamID == "" {
|
||||
teamID = "dm"
|
||||
}
|
||||
|
||||
ps.Store.Channel().InvalidateChannelByName(teamID, channel.Name)
|
||||
}
|
||||
|
||||
func (ps *PlatformService) InvalidateCacheForChannelMembers(channelID string) {
|
||||
@@ -192,16 +180,7 @@ func (ps *PlatformService) InvalidateCacheForChannelMembers(channelID string) {
|
||||
}
|
||||
|
||||
func (ps *PlatformService) InvalidateCacheForChannelMembersNotifyProps(channelID string) {
|
||||
ps.invalidateCacheForChannelMembersNotifyPropsSkipClusterSend(channelID)
|
||||
|
||||
if ps.clusterIFace != nil {
|
||||
msg := &model.ClusterMessage{
|
||||
Event: model.ClusterEventInvalidateCacheForChannelMembersNotifyProps,
|
||||
SendType: model.ClusterSendBestEffort,
|
||||
Data: []byte(channelID),
|
||||
}
|
||||
ps.clusterIFace.SendClusterMessage(msg)
|
||||
}
|
||||
ps.Store.Channel().InvalidateCacheForChannelMembersNotifyProps(channelID)
|
||||
}
|
||||
|
||||
func (ps *PlatformService) InvalidateCacheForChannelPosts(channelID string) {
|
||||
@@ -210,19 +189,11 @@ func (ps *PlatformService) InvalidateCacheForChannelPosts(channelID string) {
|
||||
}
|
||||
|
||||
func (ps *PlatformService) InvalidateCacheForUser(userID string) {
|
||||
ps.InvalidateCacheForUserSkipClusterSend(userID)
|
||||
ps.Store.Channel().InvalidateAllChannelMembersForUser(userID)
|
||||
ps.invalidateWebConnSessionCacheForUser(userID)
|
||||
|
||||
ps.Store.User().InvalidateProfilesInChannelCacheByUser(userID)
|
||||
ps.Store.User().InvalidateProfileCacheForUser(userID)
|
||||
|
||||
if ps.clusterIFace != nil {
|
||||
msg := &model.ClusterMessage{
|
||||
Event: model.ClusterEventInvalidateCacheForUser,
|
||||
SendType: model.ClusterSendBestEffort,
|
||||
Data: []byte(userID),
|
||||
}
|
||||
ps.clusterIFace.SendClusterMessage(msg)
|
||||
}
|
||||
}
|
||||
|
||||
func (ps *PlatformService) InvalidateCacheForUserTeams(userID string) {
|
||||
|
||||
@@ -1791,7 +1791,7 @@ func TestHookPreferencesHaveChanged(t *testing.T) {
|
||||
|
||||
require.Nil(t, appErr)
|
||||
assert.Equal(t, "test_value_third", preference.Value)
|
||||
}, 1*time.Second, 10*time.Millisecond)
|
||||
}, 5*time.Second, 100*time.Millisecond)
|
||||
})
|
||||
}
|
||||
|
||||
@@ -1854,11 +1854,12 @@ func TestChannelHasBeenCreated(t *testing.T) {
|
||||
posts, appErr := th.App.GetPosts(channel.Id, 0, 1)
|
||||
|
||||
require.Nil(t, appErr)
|
||||
assert.True(t, len(posts.Order) > 0)
|
||||
|
||||
post := posts.Posts[posts.Order[0]]
|
||||
assert.Equal(t, channel.Id, post.ChannelId)
|
||||
assert.Equal(t, "ChannelHasBeenCreated has been called for "+channel.Id, post.Message)
|
||||
}, 1*time.Second, 10*time.Millisecond)
|
||||
}, 5*time.Second, 100*time.Millisecond)
|
||||
})
|
||||
|
||||
t.Run("should call hook when a DM is created", func(t *testing.T) {
|
||||
@@ -1879,11 +1880,11 @@ func TestChannelHasBeenCreated(t *testing.T) {
|
||||
posts, appErr := th.App.GetPosts(channel.Id, 0, 1)
|
||||
|
||||
require.Nil(t, appErr)
|
||||
|
||||
assert.True(t, len(posts.Order) > 0)
|
||||
post := posts.Posts[posts.Order[0]]
|
||||
assert.Equal(t, channel.Id, post.ChannelId)
|
||||
assert.Equal(t, "ChannelHasBeenCreated has been called for "+channel.Id, post.Message)
|
||||
}, 1*time.Second, 10*time.Millisecond)
|
||||
}, 5*time.Second, 100*time.Millisecond)
|
||||
})
|
||||
|
||||
t.Run("should call hook when a GM is created", func(t *testing.T) {
|
||||
@@ -1905,11 +1906,11 @@ func TestChannelHasBeenCreated(t *testing.T) {
|
||||
posts, appErr := th.App.GetPosts(channel.Id, 0, 1)
|
||||
|
||||
require.Nil(t, appErr)
|
||||
|
||||
assert.True(t, len(posts.Order) > 0)
|
||||
post := posts.Posts[posts.Order[0]]
|
||||
assert.Equal(t, channel.Id, post.ChannelId)
|
||||
assert.Equal(t, "ChannelHasBeenCreated has been called for "+channel.Id, post.Message)
|
||||
}, 1*time.Second, 10*time.Millisecond)
|
||||
}, 5*time.Second, 100*time.Millisecond)
|
||||
})
|
||||
}
|
||||
|
||||
@@ -1987,9 +1988,9 @@ func TestUserHasJoinedChannel(t *testing.T) {
|
||||
posts, appErr := th.App.GetPosts(channel.Id, 0, 1)
|
||||
|
||||
require.Nil(t, appErr)
|
||||
|
||||
assert.True(t, len(posts.Order) > 0)
|
||||
assert.Equal(t, fmt.Sprintf("Test: User %s joined %s", user2.Id, channel.Id), posts.Posts[posts.Order[0]].Message)
|
||||
}, 1*time.Second, 10*time.Millisecond)
|
||||
}, 5*time.Second, 100*time.Millisecond)
|
||||
})
|
||||
|
||||
t.Run("should call hook when a user is added to an existing channel", func(t *testing.T) {
|
||||
@@ -2023,7 +2024,6 @@ func TestUserHasJoinedChannel(t *testing.T) {
|
||||
// Typically, the post we're looking for will be the latest, but there's a race between the plugin and
|
||||
// "User has joined the channel" post which means the plugin post may not the the latest one
|
||||
posts, appErr := th.App.GetPosts(channel.Id, 0, 10)
|
||||
|
||||
require.Nil(t, appErr)
|
||||
|
||||
for _, postId := range posts.Order {
|
||||
@@ -2035,7 +2035,7 @@ func TestUserHasJoinedChannel(t *testing.T) {
|
||||
}
|
||||
|
||||
return false
|
||||
}, 1*time.Second, 10*time.Millisecond)
|
||||
}, 5*time.Second, 100*time.Millisecond)
|
||||
})
|
||||
|
||||
t.Run("should not call hook when a regular channel is created", func(t *testing.T) {
|
||||
|
||||
@@ -286,7 +286,8 @@ func TestUpdateIncomingWebhook(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestCreateWebhookPost(t *testing.T) {
|
||||
th := Setup(t).InitBasic()
|
||||
testCluster := &testlib.FakeClusterInterface{}
|
||||
th := SetupWithClusterMock(t, testCluster).InitBasic()
|
||||
defer th.TearDown()
|
||||
|
||||
th.App.UpdateConfig(func(cfg *model.Config) { *cfg.ServiceSettings.EnableIncomingWebhooks = true })
|
||||
@@ -365,20 +366,18 @@ Date: Thu Mar 1 19:46:48 2018 +0300
|
||||
assert.Equal(t, expectedText, post.Message)
|
||||
|
||||
t.Run("should set webhook creator status to online", func(t *testing.T) {
|
||||
testCluster := &testlib.FakeClusterInterface{}
|
||||
th.Server.Platform().SetCluster(testCluster)
|
||||
defer th.Server.Platform().SetCluster(nil)
|
||||
|
||||
testCluster.ClearMessages()
|
||||
_, appErr := th.App.CreateWebhookPost(th.Context, hook.UserId, th.BasicChannel, "text", "", "", "", model.StringInterface{}, model.PostTypeDefault, "")
|
||||
require.Nil(t, appErr)
|
||||
|
||||
msgs := testCluster.GetMessages()
|
||||
// The first message is ClusterEventInvalidateCacheForChannelByName so we skip it
|
||||
ev, err1 := model.WebSocketEventFromJSON(bytes.NewReader(msgs[1].Data))
|
||||
require.NoError(t, err1)
|
||||
require.Equal(t, model.WebsocketEventPosted, ev.EventType())
|
||||
assert.Equal(t, false, ev.GetData()["set_online"])
|
||||
msgs := testCluster.SelectMessages(func(msg *model.ClusterMessage) bool {
|
||||
event, err := model.WebSocketEventFromJSON(bytes.NewReader(msg.Data))
|
||||
return err == nil && event.EventType() == model.WebsocketEventPosted
|
||||
})
|
||||
require.Len(t, msgs, 1)
|
||||
// We know there will be no error from the filter condition.
|
||||
event, _ := model.WebSocketEventFromJSON(bytes.NewReader(msgs[0].Data))
|
||||
assert.Equal(t, false, event.GetData()["set_online"])
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -48,48 +48,111 @@ func (s *LocalCacheChannelStore) handleClusterInvalidateChannelById(msg *model.C
|
||||
}
|
||||
}
|
||||
|
||||
func (s *LocalCacheChannelStore) handleClusterInvalidateChannelForUser(msg *model.ClusterMessage) {
|
||||
if bytes.Equal(msg.Data, clearCacheMessageData) {
|
||||
s.rootStore.channelMembersForUserCache.Purge()
|
||||
} else {
|
||||
s.rootStore.channelMembersForUserCache.Remove(string(msg.Data))
|
||||
}
|
||||
}
|
||||
|
||||
func (s *LocalCacheChannelStore) handleClusterInvalidateChannelMembersNotifyProps(msg *model.ClusterMessage) {
|
||||
if bytes.Equal(msg.Data, clearCacheMessageData) {
|
||||
s.rootStore.channelMembersNotifyPropsCache.Purge()
|
||||
} else {
|
||||
s.rootStore.channelMembersNotifyPropsCache.Remove(string(msg.Data))
|
||||
}
|
||||
}
|
||||
|
||||
func (s *LocalCacheChannelStore) handleClusterInvalidateChannelByName(msg *model.ClusterMessage) {
|
||||
if bytes.Equal(msg.Data, clearCacheMessageData) {
|
||||
s.rootStore.channelByNameCache.Purge()
|
||||
} else {
|
||||
s.rootStore.channelByNameCache.Remove(string(msg.Data))
|
||||
}
|
||||
}
|
||||
|
||||
func (s LocalCacheChannelStore) ClearMembersForUserCache() {
|
||||
s.rootStore.doClearCacheCluster(s.rootStore.channelMembersForUserCache)
|
||||
}
|
||||
|
||||
func (s LocalCacheChannelStore) ClearCaches() {
|
||||
s.rootStore.doClearCacheCluster(s.rootStore.channelMemberCountsCache)
|
||||
s.rootStore.doClearCacheCluster(s.rootStore.channelPinnedPostCountsCache)
|
||||
s.rootStore.doClearCacheCluster(s.rootStore.channelGuestCountCache)
|
||||
s.rootStore.doClearCacheCluster(s.rootStore.channelByIdCache)
|
||||
s.ChannelStore.ClearCaches()
|
||||
s.rootStore.doClearCacheCluster(s.rootStore.channelMembersForUserCache)
|
||||
s.rootStore.doClearCacheCluster(s.rootStore.channelMembersNotifyPropsCache)
|
||||
s.rootStore.doClearCacheCluster(s.rootStore.channelByNameCache)
|
||||
if s.rootStore.metrics != nil {
|
||||
s.rootStore.metrics.IncrementMemCacheInvalidationCounter("Channel Pinned Post Counts - Purge")
|
||||
s.rootStore.metrics.IncrementMemCacheInvalidationCounter("Channel Member Counts - Purge")
|
||||
s.rootStore.metrics.IncrementMemCacheInvalidationCounter("Channel Guest Count - Purge")
|
||||
s.rootStore.metrics.IncrementMemCacheInvalidationCounter("Channel - Purge")
|
||||
s.rootStore.metrics.IncrementMemCacheInvalidationCounter(s.rootStore.channelMemberCountsCache.Name())
|
||||
s.rootStore.metrics.IncrementMemCacheInvalidationCounter(s.rootStore.channelPinnedPostCountsCache.Name())
|
||||
s.rootStore.metrics.IncrementMemCacheInvalidationCounter(s.rootStore.channelGuestCountCache.Name())
|
||||
s.rootStore.metrics.IncrementMemCacheInvalidationCounter(s.rootStore.channelByIdCache.Name())
|
||||
s.rootStore.metrics.IncrementMemCacheInvalidationCounter(s.rootStore.channelMembersForUserCache.Name())
|
||||
s.rootStore.metrics.IncrementMemCacheInvalidationCounter(s.rootStore.channelMembersNotifyPropsCache.Name())
|
||||
s.rootStore.metrics.IncrementMemCacheInvalidationCounter(s.rootStore.channelByNameCache.Name())
|
||||
}
|
||||
}
|
||||
|
||||
func (s LocalCacheChannelStore) InvalidatePinnedPostCount(channelId string) {
|
||||
s.rootStore.doInvalidateCacheCluster(s.rootStore.channelPinnedPostCountsCache, channelId)
|
||||
s.rootStore.doInvalidateCacheCluster(s.rootStore.channelPinnedPostCountsCache, channelId, nil)
|
||||
if s.rootStore.metrics != nil {
|
||||
s.rootStore.metrics.IncrementMemCacheInvalidationCounter("Channel Pinned Post Counts - Remove by ChannelId")
|
||||
s.rootStore.metrics.IncrementMemCacheInvalidationCounter(s.rootStore.channelPinnedPostCountsCache.Name())
|
||||
}
|
||||
}
|
||||
|
||||
func (s LocalCacheChannelStore) InvalidateMemberCount(channelId string) {
|
||||
s.rootStore.doInvalidateCacheCluster(s.rootStore.channelMemberCountsCache, channelId)
|
||||
s.rootStore.doInvalidateCacheCluster(s.rootStore.channelMemberCountsCache, channelId, nil)
|
||||
if s.rootStore.metrics != nil {
|
||||
s.rootStore.metrics.IncrementMemCacheInvalidationCounter("Channel Member Counts - Remove by ChannelId")
|
||||
s.rootStore.metrics.IncrementMemCacheInvalidationCounter(s.rootStore.channelMemberCountsCache.Name())
|
||||
}
|
||||
}
|
||||
|
||||
func (s LocalCacheChannelStore) InvalidateGuestCount(channelId string) {
|
||||
s.rootStore.doInvalidateCacheCluster(s.rootStore.channelGuestCountCache, channelId)
|
||||
s.rootStore.doInvalidateCacheCluster(s.rootStore.channelGuestCountCache, channelId, nil)
|
||||
if s.rootStore.metrics != nil {
|
||||
s.rootStore.metrics.IncrementMemCacheInvalidationCounter("Channel Guests Count - Remove by channelId")
|
||||
}
|
||||
}
|
||||
|
||||
func (s LocalCacheChannelStore) InvalidateChannel(channelId string) {
|
||||
s.rootStore.doInvalidateCacheCluster(s.rootStore.channelByIdCache, channelId)
|
||||
s.rootStore.doInvalidateCacheCluster(s.rootStore.channelByIdCache, channelId, nil)
|
||||
if s.rootStore.metrics != nil {
|
||||
s.rootStore.metrics.IncrementMemCacheInvalidationCounter("Channel - Remove by ChannelId")
|
||||
}
|
||||
}
|
||||
|
||||
func (s LocalCacheChannelStore) InvalidateAllChannelMembersForUser(userId string) {
|
||||
s.rootStore.doInvalidateCacheCluster(s.rootStore.channelMembersForUserCache, userId, nil)
|
||||
s.rootStore.doInvalidateCacheCluster(s.rootStore.channelMembersForUserCache, userId+"_deleted", nil)
|
||||
if s.rootStore.metrics != nil {
|
||||
s.rootStore.metrics.IncrementMemCacheInvalidationCounter(s.rootStore.channelMembersForUserCache.Name())
|
||||
}
|
||||
}
|
||||
|
||||
func (s LocalCacheChannelStore) InvalidateCacheForChannelMembersNotifyProps(channelId string) {
|
||||
s.rootStore.doInvalidateCacheCluster(s.rootStore.channelMembersNotifyPropsCache, channelId, nil)
|
||||
if s.rootStore.metrics != nil {
|
||||
s.rootStore.metrics.IncrementMemCacheInvalidationCounter(s.rootStore.channelMembersNotifyPropsCache.Name())
|
||||
}
|
||||
}
|
||||
|
||||
func (s LocalCacheChannelStore) InvalidateChannelByName(teamId, name string) {
|
||||
props := make(map[string]string)
|
||||
props["name"] = name
|
||||
if teamId == "" {
|
||||
props["id"] = "dm"
|
||||
} else {
|
||||
props["id"] = teamId
|
||||
}
|
||||
|
||||
s.rootStore.doInvalidateCacheCluster(s.rootStore.channelByNameCache, teamId+name, props)
|
||||
if s.rootStore.metrics != nil {
|
||||
s.rootStore.metrics.IncrementMemCacheInvalidationCounter(s.rootStore.channelByNameCache.Name())
|
||||
}
|
||||
}
|
||||
|
||||
func (s LocalCacheChannelStore) GetMemberCount(channelId string, allowFromCache bool) (int64, error) {
|
||||
if allowFromCache {
|
||||
var count int64
|
||||
@@ -205,6 +268,141 @@ func (s LocalCacheChannelStore) GetMany(ids []string, allowFromCache bool) (mode
|
||||
return append(foundChannels, channels...), nil
|
||||
}
|
||||
|
||||
func (s LocalCacheChannelStore) GetAllChannelMembersForUser(userId string, allowFromCache bool, includeDeleted bool) (map[string]string, error) {
|
||||
cache_key := userId
|
||||
if includeDeleted {
|
||||
cache_key += "_deleted"
|
||||
}
|
||||
if allowFromCache {
|
||||
ids := make(map[string]string)
|
||||
if err := s.rootStore.doStandardReadCache(s.rootStore.channelMembersForUserCache, cache_key, &ids); err == nil {
|
||||
return ids, nil
|
||||
}
|
||||
}
|
||||
|
||||
ids, err := s.ChannelStore.GetAllChannelMembersForUser(userId, allowFromCache, includeDeleted)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if allowFromCache {
|
||||
s.rootStore.doStandardAddToCache(s.rootStore.channelMembersForUserCache, cache_key, ids)
|
||||
}
|
||||
|
||||
return ids, nil
|
||||
}
|
||||
|
||||
func (s LocalCacheChannelStore) GetAllChannelMembersNotifyPropsForChannel(channelId string, allowFromCache bool) (map[string]model.StringMap, error) {
|
||||
if allowFromCache {
|
||||
var cacheItem map[string]model.StringMap
|
||||
if err := s.rootStore.doStandardReadCache(s.rootStore.channelMembersNotifyPropsCache, channelId, &cacheItem); err == nil {
|
||||
return cacheItem, nil
|
||||
}
|
||||
}
|
||||
|
||||
props, err := s.ChannelStore.GetAllChannelMembersNotifyPropsForChannel(channelId, allowFromCache)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if allowFromCache {
|
||||
s.rootStore.doStandardAddToCache(s.rootStore.channelMembersNotifyPropsCache, channelId, props)
|
||||
}
|
||||
|
||||
return props, nil
|
||||
}
|
||||
|
||||
func (s LocalCacheChannelStore) GetByNamesIncludeDeleted(teamId string, names []string, allowFromCache bool) ([]*model.Channel, error) {
|
||||
return s.getByNames(teamId, names, allowFromCache, true)
|
||||
}
|
||||
|
||||
func (s LocalCacheChannelStore) GetByNames(teamId string, names []string, allowFromCache bool) ([]*model.Channel, error) {
|
||||
return s.getByNames(teamId, names, allowFromCache, false)
|
||||
}
|
||||
|
||||
func (s LocalCacheChannelStore) getByNames(teamId string, names []string, allowFromCache, includeArchivedChannels bool) ([]*model.Channel, error) {
|
||||
var channels []*model.Channel
|
||||
|
||||
if allowFromCache {
|
||||
var misses []string
|
||||
visited := make(map[string]struct{})
|
||||
for _, name := range names {
|
||||
if _, ok := visited[name]; ok {
|
||||
continue
|
||||
}
|
||||
visited[name] = struct{}{}
|
||||
var cacheItem *model.Channel
|
||||
|
||||
if err := s.rootStore.doStandardReadCache(s.rootStore.channelByNameCache, teamId+name, &cacheItem); err == nil {
|
||||
if includeArchivedChannels || cacheItem.DeleteAt == 0 {
|
||||
channels = append(channels, cacheItem)
|
||||
}
|
||||
} else {
|
||||
misses = append(misses, name)
|
||||
}
|
||||
}
|
||||
names = misses
|
||||
}
|
||||
|
||||
if len(names) > 0 {
|
||||
var dbChannels []*model.Channel
|
||||
var err error
|
||||
if includeArchivedChannels {
|
||||
dbChannels, err = s.ChannelStore.GetByNamesIncludeDeleted(teamId, names, allowFromCache)
|
||||
} else {
|
||||
dbChannels, err = s.ChannelStore.GetByNames(teamId, names, allowFromCache)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, channel := range dbChannels {
|
||||
if allowFromCache {
|
||||
s.rootStore.doStandardAddToCache(s.rootStore.channelByNameCache, teamId+channel.Name, channel)
|
||||
}
|
||||
channels = append(channels, channel)
|
||||
}
|
||||
}
|
||||
|
||||
return channels, nil
|
||||
}
|
||||
|
||||
func (s LocalCacheChannelStore) GetByNameIncludeDeleted(teamId string, name string, allowFromCache bool) (*model.Channel, error) {
|
||||
return s.getByName(teamId, name, allowFromCache, true)
|
||||
}
|
||||
|
||||
func (s LocalCacheChannelStore) GetByName(teamId string, name string, allowFromCache bool) (*model.Channel, error) {
|
||||
return s.getByName(teamId, name, allowFromCache, false)
|
||||
}
|
||||
|
||||
func (s LocalCacheChannelStore) getByName(teamId string, name string, allowFromCache, includeArchivedChannels bool) (*model.Channel, error) {
|
||||
var channel *model.Channel
|
||||
|
||||
if allowFromCache {
|
||||
if err := s.rootStore.doStandardReadCache(s.rootStore.channelByNameCache, teamId+name, &channel); err == nil {
|
||||
if includeArchivedChannels || channel.DeleteAt == 0 {
|
||||
return channel, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var err error
|
||||
if includeArchivedChannels {
|
||||
channel, err = s.ChannelStore.GetByNameIncludeDeleted(teamId, name, allowFromCache)
|
||||
} else {
|
||||
channel, err = s.ChannelStore.GetByName(teamId, name, allowFromCache)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if allowFromCache {
|
||||
s.rootStore.doStandardAddToCache(s.rootStore.channelByNameCache, teamId+name, channel)
|
||||
}
|
||||
|
||||
return channel, nil
|
||||
}
|
||||
|
||||
func (s LocalCacheChannelStore) SaveMember(rctx request.CTX, member *model.ChannelMember) (*model.ChannelMember, error) {
|
||||
member, err := s.ChannelStore.SaveMember(rctx, member)
|
||||
if err != nil {
|
||||
|
||||
@@ -171,10 +171,10 @@ func (es *LocalCacheEmojiStore) removeFromCache(emoji *model.Emoji) {
|
||||
es.emojiByIdMut.Lock()
|
||||
es.emojiByIdInvalidations[emoji.Id] = true
|
||||
es.emojiByIdMut.Unlock()
|
||||
es.rootStore.doInvalidateCacheCluster(es.rootStore.emojiCacheById, emoji.Id)
|
||||
es.rootStore.doInvalidateCacheCluster(es.rootStore.emojiCacheById, emoji.Id, nil)
|
||||
|
||||
es.emojiByNameMut.Lock()
|
||||
es.emojiByNameInvalidations[emoji.Name] = true
|
||||
es.emojiByNameMut.Unlock()
|
||||
es.rootStore.doInvalidateCacheCluster(es.rootStore.emojiIdCacheByName, emoji.Name)
|
||||
es.rootStore.doInvalidateCacheCluster(es.rootStore.emojiIdCacheByName, emoji.Name, nil)
|
||||
}
|
||||
|
||||
@@ -62,7 +62,7 @@ func (s LocalCacheFileInfoStore) InvalidateFileInfosForPostCache(postId string,
|
||||
if deleted {
|
||||
cacheKey += "_deleted"
|
||||
}
|
||||
s.rootStore.doInvalidateCacheCluster(s.rootStore.fileInfoCache, cacheKey)
|
||||
s.rootStore.doInvalidateCacheCluster(s.rootStore.fileInfoCache, cacheKey, nil)
|
||||
if s.rootStore.metrics != nil {
|
||||
s.rootStore.metrics.IncrementMemCacheInvalidationCounter("File Info Cache - Remove by PostId")
|
||||
}
|
||||
|
||||
@@ -35,8 +35,15 @@ const (
|
||||
EmojiCacheSize = 5000
|
||||
EmojiCacheSec = 30 * 60
|
||||
|
||||
ChannelPinnedPostsCountsCacheSize = model.ChannelCacheSize
|
||||
ChannelPinnedPostsCountsCacheSec = 30 * 60
|
||||
ChannelPinnedPostsCountsCacheSize = model.ChannelCacheSize
|
||||
ChannelPinnedPostsCountsCacheSec = 30 * 60
|
||||
AllChannelMembersForUserCacheSize = model.SessionCacheSize
|
||||
AllChannelMembersForUserCacheDuration = 15 * time.Minute
|
||||
|
||||
AllChannelMembersNotifyPropsForChannelCacheSize = model.SessionCacheSize
|
||||
AllChannelMembersNotifyPropsForChannelCacheDuration = 30 * time.Minute
|
||||
|
||||
ChannelCacheDuration = 15 * time.Minute
|
||||
|
||||
ChannelMembersCountsCacheSize = model.ChannelCacheSize
|
||||
ChannelMembersCountsCacheSec = 30 * 60
|
||||
@@ -87,11 +94,14 @@ type LocalCacheStore struct {
|
||||
emojiCacheById cache.Cache
|
||||
emojiIdCacheByName cache.Cache
|
||||
|
||||
channel LocalCacheChannelStore
|
||||
channelMemberCountsCache cache.Cache
|
||||
channelGuestCountCache cache.Cache
|
||||
channelPinnedPostCountsCache cache.Cache
|
||||
channelByIdCache cache.Cache
|
||||
channel LocalCacheChannelStore
|
||||
channelMemberCountsCache cache.Cache
|
||||
channelGuestCountCache cache.Cache
|
||||
channelPinnedPostCountsCache cache.Cache
|
||||
channelByIdCache cache.Cache
|
||||
channelMembersForUserCache cache.Cache
|
||||
channelMembersNotifyPropsCache cache.Cache
|
||||
channelByNameCache cache.Cache
|
||||
|
||||
webhook LocalCacheWebhookStore
|
||||
webhookCache cache.Cache
|
||||
@@ -240,6 +250,30 @@ func NewLocalCacheLayer(baseStore store.Store, metrics einterfaces.MetricsInterf
|
||||
}); err != nil {
|
||||
return
|
||||
}
|
||||
if localCacheStore.channelMembersForUserCache, err = cacheProvider.NewCache(&cache.CacheOptions{
|
||||
Size: AllChannelMembersForUserCacheSize,
|
||||
Name: "ChannnelMembersForUser",
|
||||
DefaultExpiry: AllChannelMembersForUserCacheDuration,
|
||||
InvalidateClusterEvent: model.ClusterEventInvalidateCacheForUser,
|
||||
}); err != nil {
|
||||
return
|
||||
}
|
||||
if localCacheStore.channelMembersNotifyPropsCache, err = cacheProvider.NewCache(&cache.CacheOptions{
|
||||
Size: AllChannelMembersNotifyPropsForChannelCacheSize,
|
||||
Name: "ChannnelMembersNotifyProps",
|
||||
DefaultExpiry: AllChannelMembersNotifyPropsForChannelCacheDuration,
|
||||
InvalidateClusterEvent: model.ClusterEventInvalidateCacheForChannelMembersNotifyProps,
|
||||
}); err != nil {
|
||||
return
|
||||
}
|
||||
if localCacheStore.channelByNameCache, err = cacheProvider.NewCache(&cache.CacheOptions{
|
||||
Size: model.ChannelCacheSize,
|
||||
Name: "ChannelByName",
|
||||
DefaultExpiry: ChannelCacheDuration,
|
||||
InvalidateClusterEvent: model.ClusterEventInvalidateCacheForChannelByName,
|
||||
}); err != nil {
|
||||
return
|
||||
}
|
||||
localCacheStore.channel = LocalCacheChannelStore{ChannelStore: baseStore.Channel(), rootStore: &localCacheStore}
|
||||
|
||||
// Posts
|
||||
@@ -331,6 +365,9 @@ func NewLocalCacheLayer(baseStore store.Store, metrics einterfaces.MetricsInterf
|
||||
cluster.RegisterClusterMessageHandler(model.ClusterEventInvalidateCacheForChannelMemberCounts, localCacheStore.channel.handleClusterInvalidateChannelMemberCounts)
|
||||
cluster.RegisterClusterMessageHandler(model.ClusterEventInvalidateCacheForChannelGuestCount, localCacheStore.channel.handleClusterInvalidateChannelGuestCounts)
|
||||
cluster.RegisterClusterMessageHandler(model.ClusterEventInvalidateCacheForChannel, localCacheStore.channel.handleClusterInvalidateChannelById)
|
||||
cluster.RegisterClusterMessageHandler(model.ClusterEventInvalidateCacheForUser, localCacheStore.channel.handleClusterInvalidateChannelForUser)
|
||||
cluster.RegisterClusterMessageHandler(model.ClusterEventInvalidateCacheForChannelMembersNotifyProps, localCacheStore.channel.handleClusterInvalidateChannelMembersNotifyProps)
|
||||
cluster.RegisterClusterMessageHandler(model.ClusterEventInvalidateCacheForChannelByName, localCacheStore.channel.handleClusterInvalidateChannelByName)
|
||||
cluster.RegisterClusterMessageHandler(model.ClusterEventInvalidateCacheForLastPosts, localCacheStore.post.handleClusterInvalidateLastPosts)
|
||||
cluster.RegisterClusterMessageHandler(model.ClusterEventInvalidateCacheForTermsOfService, localCacheStore.termsOfService.handleClusterInvalidateTermsOfService)
|
||||
cluster.RegisterClusterMessageHandler(model.ClusterEventInvalidateCacheForProfileByIds, localCacheStore.user.handleClusterInvalidateScheme)
|
||||
@@ -396,7 +433,7 @@ func (s LocalCacheStore) DropAllTables() {
|
||||
s.Store.DropAllTables()
|
||||
}
|
||||
|
||||
func (s *LocalCacheStore) doInvalidateCacheCluster(cache cache.Cache, key string) {
|
||||
func (s *LocalCacheStore) doInvalidateCacheCluster(cache cache.Cache, key string, props map[string]string) {
|
||||
cache.Remove(key)
|
||||
if s.cluster != nil {
|
||||
msg := &model.ClusterMessage{
|
||||
@@ -404,6 +441,9 @@ func (s *LocalCacheStore) doInvalidateCacheCluster(cache cache.Cache, key string
|
||||
SendType: model.ClusterSendBestEffort,
|
||||
Data: []byte(key),
|
||||
}
|
||||
if props != nil {
|
||||
msg.Props = props
|
||||
}
|
||||
s.cluster.SendClusterMessage(msg)
|
||||
}
|
||||
}
|
||||
@@ -450,6 +490,9 @@ func (s *LocalCacheStore) Invalidate() {
|
||||
s.doClearCacheCluster(s.channelPinnedPostCountsCache)
|
||||
s.doClearCacheCluster(s.channelGuestCountCache)
|
||||
s.doClearCacheCluster(s.channelByIdCache)
|
||||
s.doClearCacheCluster(s.channelMembersForUserCache)
|
||||
s.doClearCacheCluster(s.channelMembersNotifyPropsCache)
|
||||
s.doClearCacheCluster(s.channelByNameCache)
|
||||
s.doClearCacheCluster(s.postLastPostsCache)
|
||||
s.doClearCacheCluster(s.termsOfServiceCache)
|
||||
s.doClearCacheCluster(s.lastPostTimeCache)
|
||||
|
||||
@@ -56,11 +56,11 @@ func (s LocalCachePostStore) ClearCaches() {
|
||||
}
|
||||
|
||||
func (s LocalCachePostStore) InvalidateLastPostTimeCache(channelId string) {
|
||||
s.rootStore.doInvalidateCacheCluster(s.rootStore.lastPostTimeCache, channelId)
|
||||
s.rootStore.doInvalidateCacheCluster(s.rootStore.lastPostTimeCache, channelId, nil)
|
||||
|
||||
// Keys are "{channelid}{limit}" and caching only occurs on limits of 30 and 60
|
||||
s.rootStore.doInvalidateCacheCluster(s.rootStore.postLastPostsCache, channelId+"30")
|
||||
s.rootStore.doInvalidateCacheCluster(s.rootStore.postLastPostsCache, channelId+"60")
|
||||
s.rootStore.doInvalidateCacheCluster(s.rootStore.postLastPostsCache, channelId+"30", nil)
|
||||
s.rootStore.doInvalidateCacheCluster(s.rootStore.postLastPostsCache, channelId+"60", nil)
|
||||
|
||||
s.PostStore.InvalidateLastPostTimeCache(channelId)
|
||||
|
||||
|
||||
@@ -24,12 +24,12 @@ func (s *LocalCacheReactionStore) handleClusterInvalidateReaction(msg *model.Clu
|
||||
}
|
||||
|
||||
func (s LocalCacheReactionStore) Save(reaction *model.Reaction) (*model.Reaction, error) {
|
||||
defer s.rootStore.doInvalidateCacheCluster(s.rootStore.reactionCache, reaction.PostId)
|
||||
defer s.rootStore.doInvalidateCacheCluster(s.rootStore.reactionCache, reaction.PostId, nil)
|
||||
return s.ReactionStore.Save(reaction)
|
||||
}
|
||||
|
||||
func (s LocalCacheReactionStore) Delete(reaction *model.Reaction) (*model.Reaction, error) {
|
||||
defer s.rootStore.doInvalidateCacheCluster(s.rootStore.reactionCache, reaction.PostId)
|
||||
defer s.rootStore.doInvalidateCacheCluster(s.rootStore.reactionCache, reaction.PostId, nil)
|
||||
return s.ReactionStore.Delete(reaction)
|
||||
}
|
||||
|
||||
|
||||
@@ -36,7 +36,7 @@ func (s *LocalCacheRoleStore) handleClusterInvalidateRolePermissions(msg *model.
|
||||
|
||||
func (s LocalCacheRoleStore) Save(role *model.Role) (*model.Role, error) {
|
||||
if role.Name != "" {
|
||||
defer s.rootStore.doInvalidateCacheCluster(s.rootStore.roleCache, role.Name)
|
||||
defer s.rootStore.doInvalidateCacheCluster(s.rootStore.roleCache, role.Name, nil)
|
||||
defer s.rootStore.doClearCacheCluster(s.rootStore.rolePermissionsCache)
|
||||
}
|
||||
return s.RoleStore.Save(role)
|
||||
@@ -85,7 +85,7 @@ func (s LocalCacheRoleStore) Delete(roleId string) (*model.Role, error) {
|
||||
role, err := s.RoleStore.Delete(roleId)
|
||||
|
||||
if err == nil {
|
||||
s.rootStore.doInvalidateCacheCluster(s.rootStore.roleCache, role.Name)
|
||||
s.rootStore.doInvalidateCacheCluster(s.rootStore.roleCache, role.Name, nil)
|
||||
defer s.rootStore.doClearCacheCluster(s.rootStore.rolePermissionsCache)
|
||||
}
|
||||
return role, err
|
||||
|
||||
@@ -25,7 +25,7 @@ func (s *LocalCacheSchemeStore) handleClusterInvalidateScheme(msg *model.Cluster
|
||||
|
||||
func (s LocalCacheSchemeStore) Save(scheme *model.Scheme) (*model.Scheme, error) {
|
||||
if scheme.Id != "" {
|
||||
defer s.rootStore.doInvalidateCacheCluster(s.rootStore.schemeCache, scheme.Id)
|
||||
defer s.rootStore.doInvalidateCacheCluster(s.rootStore.schemeCache, scheme.Id, nil)
|
||||
}
|
||||
return s.SchemeStore.Save(scheme)
|
||||
}
|
||||
@@ -47,7 +47,7 @@ func (s LocalCacheSchemeStore) Get(schemeId string) (*model.Scheme, error) {
|
||||
}
|
||||
|
||||
func (s LocalCacheSchemeStore) Delete(schemeId string) (*model.Scheme, error) {
|
||||
defer s.rootStore.doInvalidateCacheCluster(s.rootStore.schemeCache, schemeId)
|
||||
defer s.rootStore.doInvalidateCacheCluster(s.rootStore.schemeCache, schemeId, nil)
|
||||
defer s.rootStore.doClearCacheCluster(s.rootStore.roleCache)
|
||||
defer s.rootStore.doClearCacheCluster(s.rootStore.rolePermissionsCache)
|
||||
return s.SchemeStore.Delete(schemeId)
|
||||
|
||||
@@ -31,7 +31,7 @@ func (s LocalCacheTeamStore) ClearCaches() {
|
||||
}
|
||||
|
||||
func (s LocalCacheTeamStore) InvalidateAllTeamIdsForUser(userId string) {
|
||||
s.rootStore.doInvalidateCacheCluster(s.rootStore.teamAllTeamIdsForUserCache, userId)
|
||||
s.rootStore.doInvalidateCacheCluster(s.rootStore.teamAllTeamIdsForUserCache, userId, nil)
|
||||
if s.rootStore.metrics != nil {
|
||||
s.rootStore.metrics.IncrementMemCacheInvalidationCounter("All Team Ids for User - Remove by UserId")
|
||||
}
|
||||
|
||||
@@ -40,7 +40,7 @@ func (s LocalCacheTermsOfServiceStore) Save(termsOfService *model.TermsOfService
|
||||
|
||||
if err == nil {
|
||||
s.rootStore.doStandardAddToCache(s.rootStore.termsOfServiceCache, tos.Id, tos)
|
||||
s.rootStore.doInvalidateCacheCluster(s.rootStore.termsOfServiceCache, LatestKey)
|
||||
s.rootStore.doInvalidateCacheCluster(s.rootStore.termsOfServiceCache, LatestKey, nil)
|
||||
}
|
||||
return tos, err
|
||||
}
|
||||
|
||||
@@ -54,7 +54,7 @@ func (s *LocalCacheUserStore) InvalidateProfileCacheForUser(userId string) {
|
||||
s.userProfileByIdsMut.Lock()
|
||||
s.userProfileByIdsInvalidations[userId] = true
|
||||
s.userProfileByIdsMut.Unlock()
|
||||
s.rootStore.doInvalidateCacheCluster(s.rootStore.userProfileByIdsCache, userId)
|
||||
s.rootStore.doInvalidateCacheCluster(s.rootStore.userProfileByIdsCache, userId, nil)
|
||||
|
||||
if s.rootStore.metrics != nil {
|
||||
s.rootStore.metrics.IncrementMemCacheInvalidationCounter("Profile By Ids - Remove")
|
||||
@@ -68,7 +68,7 @@ func (s *LocalCacheUserStore) InvalidateProfilesInChannelCacheByUser(userId stri
|
||||
var userMap map[string]*model.User
|
||||
if err = s.rootStore.profilesInChannelCache.Get(key, &userMap); err == nil {
|
||||
if _, userInCache := userMap[userId]; userInCache {
|
||||
s.rootStore.doInvalidateCacheCluster(s.rootStore.profilesInChannelCache, key)
|
||||
s.rootStore.doInvalidateCacheCluster(s.rootStore.profilesInChannelCache, key, nil)
|
||||
if s.rootStore.metrics != nil {
|
||||
s.rootStore.metrics.IncrementMemCacheInvalidationCounter("Profiles in Channel - Remove by User")
|
||||
}
|
||||
@@ -79,7 +79,7 @@ func (s *LocalCacheUserStore) InvalidateProfilesInChannelCacheByUser(userId stri
|
||||
}
|
||||
|
||||
func (s *LocalCacheUserStore) InvalidateProfilesInChannelCache(channelID string) {
|
||||
s.rootStore.doInvalidateCacheCluster(s.rootStore.profilesInChannelCache, channelID)
|
||||
s.rootStore.doInvalidateCacheCluster(s.rootStore.profilesInChannelCache, channelID, nil)
|
||||
if s.rootStore.metrics != nil {
|
||||
s.rootStore.metrics.IncrementMemCacheInvalidationCounter("Profiles in Channel - Remove by Channel")
|
||||
}
|
||||
|
||||
@@ -32,7 +32,7 @@ func (s LocalCacheWebhookStore) ClearCaches() {
|
||||
}
|
||||
|
||||
func (s LocalCacheWebhookStore) InvalidateWebhookCache(webhookId string) {
|
||||
s.rootStore.doInvalidateCacheCluster(s.rootStore.webhookCache, webhookId)
|
||||
s.rootStore.doInvalidateCacheCluster(s.rootStore.webhookCache, webhookId, nil)
|
||||
if s.rootStore.metrics != nil {
|
||||
s.rootStore.metrics.IncrementMemCacheInvalidationCounter("Webhook - Remove by WebhookId")
|
||||
}
|
||||
|
||||
@@ -10,7 +10,6 @@ import (
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
"unicode/utf8"
|
||||
|
||||
sq "github.com/mattermost/squirrel"
|
||||
@@ -20,17 +19,6 @@ import (
|
||||
"github.com/mattermost/mattermost/server/public/shared/request"
|
||||
"github.com/mattermost/mattermost/server/v8/channels/store"
|
||||
"github.com/mattermost/mattermost/server/v8/einterfaces"
|
||||
"github.com/mattermost/mattermost/server/v8/platform/services/cache"
|
||||
)
|
||||
|
||||
const (
|
||||
AllChannelMembersForUserCacheSize = model.SessionCacheSize
|
||||
AllChannelMembersForUserCacheDuration = 15 * time.Minute // 15 mins
|
||||
|
||||
AllChannelMembersNotifyPropsForChannelCacheSize = model.SessionCacheSize
|
||||
AllChannelMembersNotifyPropsForChannelCacheDuration = 30 * time.Minute // 30 mins
|
||||
|
||||
ChannelCacheDuration = 15 * time.Minute // 15 mins
|
||||
)
|
||||
|
||||
type SqlChannelStore struct {
|
||||
@@ -442,30 +430,10 @@ type publicChannel struct {
|
||||
Purpose string `json:"purpose"`
|
||||
}
|
||||
|
||||
var allChannelMembersForUserCache = cache.NewLRU(cache.LRUOptions{
|
||||
Size: AllChannelMembersForUserCacheSize,
|
||||
})
|
||||
var allChannelMembersNotifyPropsForChannelCache = cache.NewLRU(cache.LRUOptions{
|
||||
Size: AllChannelMembersNotifyPropsForChannelCacheSize,
|
||||
})
|
||||
var channelByNameCache = cache.NewLRU(cache.LRUOptions{
|
||||
Size: model.ChannelCacheSize,
|
||||
})
|
||||
|
||||
func (s SqlChannelStore) ClearMembersForUserCache() {
|
||||
allChannelMembersForUserCache.Purge()
|
||||
}
|
||||
|
||||
func (s SqlChannelStore) ClearCaches() {
|
||||
allChannelMembersForUserCache.Purge()
|
||||
allChannelMembersNotifyPropsForChannelCache.Purge()
|
||||
channelByNameCache.Purge()
|
||||
|
||||
if s.metrics != nil {
|
||||
s.metrics.IncrementMemCacheInvalidationCounter("All Channel Members for User - Purge")
|
||||
s.metrics.IncrementMemCacheInvalidationCounter("All Channel Members Notify Props for Channel - Purge")
|
||||
s.metrics.IncrementMemCacheInvalidationCounter("Channel By Name - Purge")
|
||||
}
|
||||
}
|
||||
|
||||
func newSqlChannelStore(sqlStore *SqlStore, metrics einterfaces.MetricsInterface) store.ChannelStore {
|
||||
@@ -820,11 +788,8 @@ func (s SqlChannelStore) GetChannelUnread(channelId, userId string) (*model.Chan
|
||||
func (s SqlChannelStore) InvalidateChannel(id string) {
|
||||
}
|
||||
|
||||
//nolint:unparam
|
||||
func (s SqlChannelStore) InvalidateChannelByName(teamId, name string) {
|
||||
channelByNameCache.Remove(teamId + name)
|
||||
if s.metrics != nil {
|
||||
s.metrics.IncrementMemCacheInvalidationCounter("Channel by Name - Remove by TeamId and Name")
|
||||
}
|
||||
}
|
||||
|
||||
func (s SqlChannelStore) GetPinnedPosts(channelId string) (*model.PostList, error) {
|
||||
@@ -1393,10 +1358,6 @@ func (s SqlChannelStore) GetTeamChannels(teamId string) (model.ChannelList, erro
|
||||
return data, nil
|
||||
}
|
||||
|
||||
func (s SqlChannelStore) GetByName(teamId string, name string, allowFromCache bool) (*model.Channel, error) {
|
||||
return s.getByName(teamId, name, false, allowFromCache)
|
||||
}
|
||||
|
||||
func (s SqlChannelStore) GetByNamesIncludeDeleted(teamId string, names []string, allowFromCache bool) ([]*model.Channel, error) {
|
||||
return s.getByNames(teamId, names, allowFromCache, true)
|
||||
}
|
||||
@@ -1406,27 +1367,7 @@ func (s SqlChannelStore) GetByNames(teamId string, names []string, allowFromCach
|
||||
}
|
||||
|
||||
func (s SqlChannelStore) getByNames(teamId string, names []string, allowFromCache, includeArchivedChannels bool) ([]*model.Channel, error) {
|
||||
var channels []*model.Channel
|
||||
|
||||
if allowFromCache {
|
||||
var misses []string
|
||||
visited := make(map[string]struct{})
|
||||
for _, name := range names {
|
||||
if _, ok := visited[name]; ok {
|
||||
continue
|
||||
}
|
||||
visited[name] = struct{}{}
|
||||
var cacheItem *model.Channel
|
||||
if err := channelByNameCache.Get(teamId+name, &cacheItem); err == nil {
|
||||
if includeArchivedChannels || cacheItem.DeleteAt == 0 {
|
||||
channels = append(channels, cacheItem)
|
||||
}
|
||||
} else {
|
||||
misses = append(misses, name)
|
||||
}
|
||||
}
|
||||
names = misses
|
||||
}
|
||||
channels := []*model.Channel{}
|
||||
|
||||
if len(names) > 0 {
|
||||
cond := sq.And{
|
||||
@@ -1450,27 +1391,13 @@ func (s SqlChannelStore) getByNames(teamId string, names []string, allowFromCach
|
||||
return nil, errors.Wrap(err, "GetByNames_tosql")
|
||||
}
|
||||
|
||||
dbChannels := []*model.Channel{}
|
||||
if err := s.GetReplicaX().Select(&dbChannels, query, args...); err != nil && err != sql.ErrNoRows {
|
||||
if err := s.GetReplicaX().Select(&channels, query, args...); err != nil && err != sql.ErrNoRows {
|
||||
msg := fmt.Sprintf("failed to get channels with names=%v", names)
|
||||
if teamId != "" {
|
||||
msg += fmt.Sprintf(" teamId=%s", teamId)
|
||||
}
|
||||
return nil, errors.Wrap(err, msg)
|
||||
}
|
||||
for _, channel := range dbChannels {
|
||||
channelByNameCache.SetWithExpiry(teamId+channel.Name, channel, ChannelCacheDuration)
|
||||
channels = append(channels, channel)
|
||||
}
|
||||
// Not all channels are in cache. Increment aggregate miss counter.
|
||||
if s.metrics != nil {
|
||||
s.metrics.IncrementMemCacheMissCounter("Channel By Name - Aggregate")
|
||||
}
|
||||
} else {
|
||||
// All of the channel names are in cache. Increment aggregate hit counter.
|
||||
if s.metrics != nil {
|
||||
s.metrics.IncrementMemCacheHitCounter("Channel By Name - Aggregate")
|
||||
}
|
||||
}
|
||||
|
||||
return channels, nil
|
||||
@@ -1480,6 +1407,10 @@ func (s SqlChannelStore) GetByNameIncludeDeleted(teamId string, name string, all
|
||||
return s.getByName(teamId, name, true, allowFromCache)
|
||||
}
|
||||
|
||||
func (s SqlChannelStore) GetByName(teamId string, name string, allowFromCache bool) (*model.Channel, error) {
|
||||
return s.getByName(teamId, name, false, allowFromCache)
|
||||
}
|
||||
|
||||
func (s SqlChannelStore) getByName(teamId string, name string, includeDeleted bool, allowFromCache bool) (*model.Channel, error) {
|
||||
query := s.getQueryBuilder().
|
||||
Select("*").
|
||||
@@ -1493,35 +1424,21 @@ func (s SqlChannelStore) getByName(teamId string, name string, includeDeleted bo
|
||||
if !includeDeleted {
|
||||
query = query.Where(sq.Eq{"DeleteAt": 0})
|
||||
}
|
||||
channel := model.Channel{}
|
||||
|
||||
if allowFromCache {
|
||||
var cacheItem *model.Channel
|
||||
if err := channelByNameCache.Get(teamId+name, &cacheItem); err == nil {
|
||||
if s.metrics != nil {
|
||||
s.metrics.IncrementMemCacheHitCounter("Channel By Name")
|
||||
}
|
||||
return cacheItem, nil
|
||||
}
|
||||
if s.metrics != nil {
|
||||
s.metrics.IncrementMemCacheMissCounter("Channel By Name")
|
||||
}
|
||||
}
|
||||
|
||||
queryStr, args, err := query.ToSql()
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "getByName_tosql")
|
||||
}
|
||||
|
||||
if err = s.GetReplicaX().Get(&channel, queryStr, args...); err != nil {
|
||||
channel := model.Channel{}
|
||||
if err := s.GetReplicaX().Get(&channel, queryStr, args...); err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
return nil, store.NewErrNotFound("Channel", fmt.Sprintf("TeamId=%s&Name=%s", teamId, name))
|
||||
}
|
||||
return nil, errors.Wrapf(err, "failed to find channel with TeamId=%s and Name=%s", teamId, name)
|
||||
}
|
||||
|
||||
err = channelByNameCache.SetWithExpiry(teamId+name, &channel, ChannelCacheDuration)
|
||||
return &channel, err
|
||||
return &channel, nil
|
||||
}
|
||||
|
||||
func (s SqlChannelStore) GetDeletedByName(teamId string, name string) (*model.Channel, error) {
|
||||
@@ -2156,11 +2073,6 @@ func (s SqlChannelStore) GetMemberLastViewedAt(ctx context.Context, channelID st
|
||||
}
|
||||
|
||||
func (s SqlChannelStore) InvalidateAllChannelMembersForUser(userId string) {
|
||||
allChannelMembersForUserCache.Remove(userId)
|
||||
allChannelMembersForUserCache.Remove(userId + "_deleted")
|
||||
if s.metrics != nil {
|
||||
s.metrics.IncrementMemCacheInvalidationCounter("All Channel Members for User - Remove by UserId")
|
||||
}
|
||||
}
|
||||
|
||||
func (s SqlChannelStore) GetMemberForPost(postId string, userId string, includeArchivedChannels bool) (*model.ChannelMember, error) {
|
||||
@@ -2214,24 +2126,6 @@ func (s SqlChannelStore) GetMemberForPost(postId string, userId string, includeA
|
||||
}
|
||||
|
||||
func (s SqlChannelStore) GetAllChannelMembersForUser(userId string, allowFromCache bool, includeDeleted bool) (_ map[string]string, err error) {
|
||||
cache_key := userId
|
||||
if includeDeleted {
|
||||
cache_key += "_deleted"
|
||||
}
|
||||
if allowFromCache {
|
||||
ids := make(map[string]string)
|
||||
if err = allChannelMembersForUserCache.Get(cache_key, &ids); err == nil {
|
||||
if s.metrics != nil {
|
||||
s.metrics.IncrementMemCacheHitCounter("All Channel Members for User")
|
||||
}
|
||||
return ids, nil
|
||||
}
|
||||
}
|
||||
|
||||
if s.metrics != nil {
|
||||
s.metrics.IncrementMemCacheMissCounter("All Channel Members for User")
|
||||
}
|
||||
|
||||
query := s.getQueryBuilder().
|
||||
Select(`
|
||||
ChannelMembers.ChannelId, ChannelMembers.Roles, ChannelMembers.SchemeGuest,
|
||||
@@ -2282,9 +2176,6 @@ func (s SqlChannelStore) GetAllChannelMembersForUser(userId string, allowFromCac
|
||||
}
|
||||
ids := data.ToMapStringString()
|
||||
|
||||
if allowFromCache {
|
||||
allChannelMembersForUserCache.SetWithExpiry(cache_key, ids, AllChannelMembersForUserCacheDuration)
|
||||
}
|
||||
return ids, nil
|
||||
}
|
||||
|
||||
@@ -2334,10 +2225,6 @@ func (s SqlChannelStore) GetChannelsMemberCount(channelIDs []string) (_ map[stri
|
||||
}
|
||||
|
||||
func (s SqlChannelStore) InvalidateCacheForChannelMembersNotifyProps(channelId string) {
|
||||
allChannelMembersNotifyPropsForChannelCache.Remove(channelId)
|
||||
if s.metrics != nil {
|
||||
s.metrics.IncrementMemCacheInvalidationCounter("All Channel Members Notify Props for Channel - Remove by ChannelId")
|
||||
}
|
||||
}
|
||||
|
||||
type allChannelMemberNotifyProps struct {
|
||||
@@ -2346,20 +2233,6 @@ type allChannelMemberNotifyProps struct {
|
||||
}
|
||||
|
||||
func (s SqlChannelStore) GetAllChannelMembersNotifyPropsForChannel(channelId string, allowFromCache bool) (map[string]model.StringMap, error) {
|
||||
if allowFromCache {
|
||||
var cacheItem map[string]model.StringMap
|
||||
if err := allChannelMembersNotifyPropsForChannelCache.Get(channelId, &cacheItem); err == nil {
|
||||
if s.metrics != nil {
|
||||
s.metrics.IncrementMemCacheHitCounter("All Channel Members Notify Props for Channel")
|
||||
}
|
||||
return cacheItem, nil
|
||||
}
|
||||
}
|
||||
|
||||
if s.metrics != nil {
|
||||
s.metrics.IncrementMemCacheMissCounter("All Channel Members Notify Props for Channel")
|
||||
}
|
||||
|
||||
data := []allChannelMemberNotifyProps{}
|
||||
err := s.GetReplicaX().Select(&data, `
|
||||
SELECT UserId, NotifyProps
|
||||
@@ -2374,8 +2247,6 @@ func (s SqlChannelStore) GetAllChannelMembersNotifyPropsForChannel(channelId str
|
||||
props[data[i].UserId] = data[i].NotifyProps
|
||||
}
|
||||
|
||||
allChannelMembersNotifyPropsForChannelCache.SetWithExpiry(channelId, props, AllChannelMembersNotifyPropsForChannelCacheDuration)
|
||||
|
||||
return props, nil
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user