[GH-13058] Migrate webhookCache from store/sqlstore/webhook_st… (#13091)

* Migrate webhookCache from store/sqlstore/webhook_store.go to the new store/localcachelayer

* Remove duplicated statistics and move several functions to the layer

* Put cache size in format minutes * 60seconds

* Move cache functions to layer functions over the caches

* Directly remove and purge caches in handleClusterInvalidate to avoid loops
This commit is contained in:
larkox
2019-11-25 20:44:52 +01:00
committed by Ben Schumacher
parent e1adfb0d47
commit 22b495b2df
6 changed files with 163 additions and 35 deletions

View File

@@ -24,6 +24,7 @@ const (
CLUSTER_EVENT_CLEAR_SESSION_CACHE_FOR_USER = "clear_session_user"
CLUSTER_EVENT_INVALIDATE_CACHE_FOR_ROLES = "inv_roles"
CLUSTER_EVENT_INVALIDATE_CACHE_FOR_SCHEMES = "inv_schemes"
CLUSTER_EVENT_INVALIDATE_CACHE_FOR_WEBHOOKS = "inv_webhooks"
CLUSTER_EVENT_INVALIDATE_CACHE_FOR_EMOJIS_BY_ID = "inv_emojis_by_id"
CLUSTER_EVENT_INVALIDATE_CACHE_FOR_EMOJIS_ID_BY_NAME = "inv_emojis_id_by_name"
CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL_MEMBER_COUNTS = "inv_channel_member_counts"

View File

@@ -20,6 +20,9 @@ const (
SCHEME_CACHE_SIZE = 20000
SCHEME_CACHE_SEC = 30 * 60
WEBHOOK_CACHE_SIZE = 25000
WEBHOOK_CACHE_SEC = 15 * 60
EMOJI_CACHE_SIZE = 5000
EMOJI_CACHE_SEC = 30 * 60
@@ -47,6 +50,8 @@ type LocalCacheStore struct {
emojiIdCacheByName *utils.Cache
channel LocalCacheChannelStore
channelMemberCountsCache *utils.Cache
webhook LocalCacheWebhookStore
webhookCache *utils.Cache
post LocalCachePostStore
postLastPostsCache *utils.Cache
}
@@ -63,6 +68,8 @@ func NewLocalCacheLayer(baseStore store.Store, metrics einterfaces.MetricsInterf
localCacheStore.role = LocalCacheRoleStore{RoleStore: baseStore.Role(), rootStore: &localCacheStore}
localCacheStore.schemeCache = utils.NewLruWithParams(SCHEME_CACHE_SIZE, "Scheme", SCHEME_CACHE_SEC, model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_SCHEMES)
localCacheStore.scheme = LocalCacheSchemeStore{SchemeStore: baseStore.Scheme(), rootStore: &localCacheStore}
localCacheStore.webhookCache = utils.NewLruWithParams(WEBHOOK_CACHE_SIZE, "Webhook", WEBHOOK_CACHE_SEC, model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_WEBHOOKS)
localCacheStore.webhook = LocalCacheWebhookStore{WebhookStore: baseStore.Webhook(), rootStore: &localCacheStore}
localCacheStore.emojiCacheById = utils.NewLruWithParams(EMOJI_CACHE_SIZE, "EmojiById", EMOJI_CACHE_SEC, model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_EMOJIS_BY_ID)
localCacheStore.emojiIdCacheByName = utils.NewLruWithParams(EMOJI_CACHE_SIZE, "EmojiByName", EMOJI_CACHE_SEC, model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_EMOJIS_ID_BY_NAME)
localCacheStore.emoji = LocalCacheEmojiStore{EmojiStore: baseStore.Emoji(), rootStore: &localCacheStore}
@@ -75,6 +82,7 @@ func NewLocalCacheLayer(baseStore store.Store, metrics einterfaces.MetricsInterf
cluster.RegisterClusterMessageHandler(model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_REACTIONS, localCacheStore.reaction.handleClusterInvalidateReaction)
cluster.RegisterClusterMessageHandler(model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_ROLES, localCacheStore.role.handleClusterInvalidateRole)
cluster.RegisterClusterMessageHandler(model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_SCHEMES, localCacheStore.scheme.handleClusterInvalidateScheme)
cluster.RegisterClusterMessageHandler(model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_WEBHOOKS, localCacheStore.webhook.handleClusterInvalidateWebhook)
cluster.RegisterClusterMessageHandler(model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_EMOJIS_BY_ID, localCacheStore.emoji.handleClusterInvalidateEmojiById)
cluster.RegisterClusterMessageHandler(model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_EMOJIS_ID_BY_NAME, localCacheStore.emoji.handleClusterInvalidateEmojiIdByName)
cluster.RegisterClusterMessageHandler(model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL_MEMBER_COUNTS, localCacheStore.channel.handleClusterInvalidateChannelMemberCounts)
@@ -95,6 +103,10 @@ func (s LocalCacheStore) Scheme() store.SchemeStore {
return s.scheme
}
func (s LocalCacheStore) Webhook() store.WebhookStore {
return s.webhook
}
func (s LocalCacheStore) Emoji() store.EmojiStore {
return s.emoji
}
@@ -157,6 +169,7 @@ func (s *LocalCacheStore) doClearCacheCluster(cache *utils.Cache) {
func (s *LocalCacheStore) Invalidate() {
s.doClearCacheCluster(s.reactionCache)
s.doClearCacheCluster(s.webhookCache)
s.doClearCacheCluster(s.emojiCacheById)
s.doClearCacheCluster(s.emojiIdCacheByName)
s.doClearCacheCluster(s.channelMemberCountsCache)

View File

@@ -41,6 +41,12 @@ func getMockStore() *mocks.Store {
mockSchemesStore.On("PermanentDeleteAll").Return(nil)
mockStore.On("Scheme").Return(&mockSchemesStore)
fakeWebhook := model.IncomingWebhook{Id: "123"}
mockWebhookStore := mocks.WebhookStore{}
mockWebhookStore.On("GetIncoming", "123", true).Return(&fakeWebhook, nil)
mockWebhookStore.On("GetIncoming", "123", false).Return(&fakeWebhook, nil)
mockStore.On("Webhook").Return(&mockWebhookStore)
fakeEmoji := model.Emoji{Id: "123", Name: "name123"}
mockEmojiStore := mocks.EmojiStore{}
mockEmojiStore.On("Get", "123", true).Return(&fakeEmoji, nil)

View File

@@ -0,0 +1,86 @@
// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
// See License.txt for license information.
package localcachelayer
import (
"github.com/mattermost/mattermost-server/model"
"github.com/mattermost/mattermost-server/store"
)
type LocalCacheWebhookStore struct {
store.WebhookStore
rootStore *LocalCacheStore
}
func (s *LocalCacheWebhookStore) handleClusterInvalidateWebhook(msg *model.ClusterMessage) {
if msg.Data == CLEAR_CACHE_MESSAGE_DATA {
s.rootStore.webhookCache.Purge()
} else {
s.rootStore.webhookCache.Remove(msg.Data)
}
}
func (s LocalCacheWebhookStore) ClearCaches() {
s.rootStore.doClearCacheCluster(s.rootStore.webhookCache)
if s.rootStore.metrics != nil {
s.rootStore.metrics.IncrementMemCacheInvalidationCounter("Webhook - Purge")
}
}
func (s LocalCacheWebhookStore) InvalidateWebhookCache(webhookId string) {
s.rootStore.doInvalidateCacheCluster(s.rootStore.webhookCache, webhookId)
if s.rootStore.metrics != nil {
s.rootStore.metrics.IncrementMemCacheInvalidationCounter("Webhook - Remove by WebhookId")
}
}
func (s LocalCacheWebhookStore) GetIncoming(id string, allowFromCache bool) (*model.IncomingWebhook, *model.AppError) {
if !allowFromCache {
return s.WebhookStore.GetIncoming(id, allowFromCache)
}
if incomingWebhook := s.rootStore.doStandardReadCache(s.rootStore.webhookCache, id); incomingWebhook != nil {
return incomingWebhook.(*model.IncomingWebhook), nil
}
incomingWebhook, err := s.WebhookStore.GetIncoming(id, allowFromCache)
if err != nil {
return nil, err
}
s.rootStore.doStandardAddToCache(s.rootStore.webhookCache, id, incomingWebhook)
return incomingWebhook, nil
}
func (s LocalCacheWebhookStore) DeleteIncoming(webhookId string, time int64) *model.AppError {
err := s.WebhookStore.DeleteIncoming(webhookId, time)
if err != nil {
return err
}
s.InvalidateWebhookCache(webhookId)
return nil
}
func (s LocalCacheWebhookStore) PermanentDeleteIncomingByUser(userId string) *model.AppError {
err := s.WebhookStore.PermanentDeleteIncomingByUser(userId)
if err != nil {
return err
}
s.ClearCaches()
return nil
}
func (s LocalCacheWebhookStore) PermanentDeleteIncomingByChannel(channelId string) *model.AppError {
err := s.WebhookStore.PermanentDeleteIncomingByChannel(channelId)
if err != nil {
return err
}
s.ClearCaches()
return nil
}

View File

@@ -0,0 +1,57 @@
// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
// See License.txt for license information.
package localcachelayer
import (
"testing"
"github.com/mattermost/mattermost-server/model"
"github.com/mattermost/mattermost-server/store/storetest"
"github.com/mattermost/mattermost-server/store/storetest/mocks"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestWebhookStore(t *testing.T) {
StoreTest(t, storetest.TestWebhookStore)
}
func TestWebhookStoreCache(t *testing.T) {
fakeWebhook := model.IncomingWebhook{Id: "123"}
t.Run("first call not cached, second cached and returning same data", func(t *testing.T) {
mockStore := getMockStore()
cachedStore := NewLocalCacheLayer(mockStore, nil, nil)
incomingWebhook, err := cachedStore.Webhook().GetIncoming("123", true)
require.Nil(t, err)
assert.Equal(t, incomingWebhook, &fakeWebhook)
mockStore.Webhook().(*mocks.WebhookStore).AssertNumberOfCalls(t, "GetIncoming", 1)
assert.Equal(t, incomingWebhook, &fakeWebhook)
cachedStore.Webhook().GetIncoming("123", true)
mockStore.Webhook().(*mocks.WebhookStore).AssertNumberOfCalls(t, "GetIncoming", 1)
})
t.Run("first call not cached, second force no cached", func(t *testing.T) {
mockStore := getMockStore()
cachedStore := NewLocalCacheLayer(mockStore, nil, nil)
cachedStore.Webhook().GetIncoming("123", true)
mockStore.Webhook().(*mocks.WebhookStore).AssertNumberOfCalls(t, "GetIncoming", 1)
cachedStore.Webhook().GetIncoming("123", false)
mockStore.Webhook().(*mocks.WebhookStore).AssertNumberOfCalls(t, "GetIncoming", 2)
})
t.Run("first call not cached, invalidate, and then not cached again", func(t *testing.T) {
mockStore := getMockStore()
cachedStore := NewLocalCacheLayer(mockStore, nil, nil)
cachedStore.Webhook().GetIncoming("123", true)
mockStore.Webhook().(*mocks.WebhookStore).AssertNumberOfCalls(t, "GetIncoming", 1)
cachedStore.Webhook().InvalidateWebhookCache("123")
cachedStore.Webhook().GetIncoming("123", true)
mockStore.Webhook().(*mocks.WebhookStore).AssertNumberOfCalls(t, "GetIncoming", 2)
})
}

View File

@@ -11,7 +11,6 @@ import (
"github.com/mattermost/mattermost-server/einterfaces"
"github.com/mattermost/mattermost-server/model"
"github.com/mattermost/mattermost-server/store"
"github.com/mattermost/mattermost-server/utils"
)
type SqlWebhookStore struct {
@@ -19,19 +18,7 @@ type SqlWebhookStore struct {
metrics einterfaces.MetricsInterface
}
const (
WEBHOOK_CACHE_SIZE = 25000
WEBHOOK_CACHE_SEC = 900 // 15 minutes
)
var webhookCache = utils.NewLru(WEBHOOK_CACHE_SIZE)
func (s SqlWebhookStore) ClearCaches() {
webhookCache.Purge()
if s.metrics != nil {
s.metrics.IncrementMemCacheInvalidationCounter("Webhook - Purge")
}
}
func NewSqlWebhookStore(sqlStore SqlStore, metrics einterfaces.MetricsInterface) store.WebhookStore {
@@ -83,10 +70,6 @@ func (s SqlWebhookStore) CreateIndexesIfNotExists() {
}
func (s SqlWebhookStore) InvalidateWebhookCache(webhookId string) {
webhookCache.Remove(webhookId)
if s.metrics != nil {
s.metrics.IncrementMemCacheInvalidationCounter("Webhook - Remove by WebhookId")
}
}
func (s SqlWebhookStore) SaveIncoming(webhook *model.IncomingWebhook) (*model.IncomingWebhook, *model.AppError) {
@@ -118,18 +101,6 @@ func (s SqlWebhookStore) UpdateIncoming(hook *model.IncomingWebhook) (*model.Inc
}
func (s SqlWebhookStore) GetIncoming(id string, allowFromCache bool) (*model.IncomingWebhook, *model.AppError) {
if allowFromCache {
if cacheItem, ok := webhookCache.Get(id); ok {
if s.metrics != nil {
s.metrics.IncrementMemCacheHitCounter("Webhook")
}
return cacheItem.(*model.IncomingWebhook), nil
}
if s.metrics != nil {
s.metrics.IncrementMemCacheMissCounter("Webhook")
}
}
var webhook model.IncomingWebhook
if err := s.GetReplica().SelectOne(&webhook, "SELECT * FROM IncomingWebhooks WHERE Id = :Id AND DeleteAt = 0", map[string]interface{}{"Id": id}); err != nil {
if err == sql.ErrNoRows {
@@ -138,8 +109,6 @@ func (s SqlWebhookStore) GetIncoming(id string, allowFromCache bool) (*model.Inc
return nil, model.NewAppError("SqlWebhookStore.GetIncoming", "store.sql_webhooks.get_incoming.app_error", nil, "id="+id+", err="+err.Error(), http.StatusInternalServerError)
}
webhookCache.AddWithExpiresInSecs(id, &webhook, WEBHOOK_CACHE_SEC)
return &webhook, nil
}
@@ -149,7 +118,6 @@ func (s SqlWebhookStore) DeleteIncoming(webhookId string, time int64) *model.App
return model.NewAppError("SqlWebhookStore.DeleteIncoming", "store.sql_webhooks.delete_incoming.app_error", nil, "id="+webhookId+", err="+err.Error(), http.StatusInternalServerError)
}
s.InvalidateWebhookCache(webhookId)
return nil
}
@@ -159,7 +127,6 @@ func (s SqlWebhookStore) PermanentDeleteIncomingByUser(userId string) *model.App
return model.NewAppError("SqlWebhookStore.DeleteIncomingByUser", "store.sql_webhooks.permanent_delete_incoming_by_user.app_error", nil, "id="+userId+", err="+err.Error(), http.StatusInternalServerError)
}
s.ClearCaches()
return nil
}
@@ -169,8 +136,6 @@ func (s SqlWebhookStore) PermanentDeleteIncomingByChannel(channelId string) *mod
return model.NewAppError("SqlWebhookStore.DeleteIncomingByChannel", "store.sql_webhooks.permanent_delete_incoming_by_channel.app_error", nil, "id="+channelId+", err="+err.Error(), http.StatusInternalServerError)
}
s.ClearCaches()
return nil
}