mirror of
https://github.com/mattermost/mattermost.git
synced 2025-02-25 18:55:24 -06:00
Modifying post etag cache to apply to getPostsSince (#4837)
This commit is contained in:
committed by
enahum
parent
927eb65978
commit
148fd01b54
@@ -1324,7 +1324,7 @@ func getPostsSince(c *Context, w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
pchan := Srv.Store.Post().GetPostsSince(id, time)
|
||||
pchan := Srv.Store.Post().GetPostsSince(id, time, true)
|
||||
|
||||
if !HasPermissionToChannelContext(c, id, model.PERMISSION_READ_CHANNEL) {
|
||||
return
|
||||
|
||||
@@ -124,7 +124,7 @@ func InvalidateCacheForChannelPosts(channelId string) {
|
||||
}
|
||||
|
||||
func InvalidateCacheForChannelPostsSkipClusterSend(channelId string) {
|
||||
Srv.Store.Post().InvalidatePostEtagCache(channelId)
|
||||
Srv.Store.Post().InvalidateLastPostTimeCache(channelId)
|
||||
}
|
||||
|
||||
func InvalidateCacheForUser(userId string) {
|
||||
|
||||
@@ -19,14 +19,14 @@ type SqlPostStore struct {
|
||||
}
|
||||
|
||||
const (
|
||||
POSTS_ETAG_CACHE_SIZE = 25000
|
||||
POSTS_ETAG_CACHE_SEC = 900 // 15 minutes
|
||||
LAST_POST_TIME_CACHE_SIZE = 25000
|
||||
LAST_POST_TIME_CACHE_SEC = 900 // 15 minutes
|
||||
)
|
||||
|
||||
var postEtagCache = utils.NewLru(CHANNEL_MEMBERS_COUNTS_CACHE_SIZE)
|
||||
var lastPostTimeCache = utils.NewLru(LAST_POST_TIME_CACHE_SIZE)
|
||||
|
||||
func ClearPostCaches() {
|
||||
postEtagCache.Purge()
|
||||
lastPostTimeCache.Purge()
|
||||
}
|
||||
|
||||
func NewSqlPostStore(sqlStore *SqlStore) PostStore {
|
||||
@@ -86,7 +86,7 @@ func (s SqlPostStore) Save(post *model.Post) StoreChannel {
|
||||
if err := s.GetMaster().Insert(post); err != nil {
|
||||
result.Err = model.NewLocAppError("SqlPostStore.Save", "store.sql_post.save.app_error", nil, "id="+post.Id+", "+err.Error())
|
||||
} else {
|
||||
time := model.GetMillis()
|
||||
time := post.UpdateAt
|
||||
|
||||
if post.Type != model.POST_JOIN_LEAVE && post.Type != model.POST_ADD_REMOVE {
|
||||
s.GetMaster().Exec("UPDATE Channels SET LastPostAt = :LastPostAt, TotalMsgCount = TotalMsgCount + 1 WHERE Id = :ChannelId", map[string]interface{}{"LastPostAt": time, "ChannelId": post.ChannelId})
|
||||
@@ -222,46 +222,46 @@ type etagPosts struct {
|
||||
UpdateAt int64
|
||||
}
|
||||
|
||||
func (s SqlPostStore) InvalidatePostEtagCache(channelId string) {
|
||||
postEtagCache.Remove(channelId)
|
||||
func (s SqlPostStore) InvalidateLastPostTimeCache(channelId string) {
|
||||
lastPostTimeCache.Remove(channelId)
|
||||
}
|
||||
|
||||
func (s SqlPostStore) GetEtag(channelId string, allowFromCache bool) StoreChannel {
|
||||
storeChannel := make(StoreChannel, 1)
|
||||
metrics := einterfaces.GetMetricsInterface()
|
||||
|
||||
go func() {
|
||||
result := StoreResult{}
|
||||
metrics := einterfaces.GetMetricsInterface()
|
||||
|
||||
if allowFromCache {
|
||||
if cacheItem, ok := postEtagCache.Get(channelId); ok {
|
||||
if cacheItem, ok := lastPostTimeCache.Get(channelId); ok {
|
||||
if metrics != nil {
|
||||
metrics.IncrementMemCacheHitCounter("Post Etag")
|
||||
metrics.IncrementMemCacheHitCounter("Last Post Time")
|
||||
}
|
||||
result.Data = cacheItem.(string)
|
||||
result.Data = fmt.Sprintf("%v.%v", model.CurrentVersion, cacheItem.(int64))
|
||||
storeChannel <- result
|
||||
close(storeChannel)
|
||||
return
|
||||
} else {
|
||||
if metrics != nil {
|
||||
metrics.IncrementMemCacheMissCounter("Post Etag")
|
||||
metrics.IncrementMemCacheMissCounter("Last Post Time")
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if metrics != nil {
|
||||
metrics.IncrementMemCacheMissCounter("Post Etag")
|
||||
metrics.IncrementMemCacheMissCounter("Last Post Time")
|
||||
}
|
||||
}
|
||||
|
||||
var et etagPosts
|
||||
err := s.GetReplica().SelectOne(&et, "SELECT Id, UpdateAt FROM Posts WHERE ChannelId = :ChannelId ORDER BY UpdateAt DESC LIMIT 1", map[string]interface{}{"ChannelId": channelId})
|
||||
if err != nil {
|
||||
result.Data = fmt.Sprintf("%v.0.%v", model.CurrentVersion, model.GetMillis())
|
||||
result.Data = fmt.Sprintf("%v.%v", model.CurrentVersion, model.GetMillis())
|
||||
} else {
|
||||
result.Data = fmt.Sprintf("%v.%v.%v", model.CurrentVersion, et.Id, et.UpdateAt)
|
||||
result.Data = fmt.Sprintf("%v.%v", model.CurrentVersion, et.UpdateAt)
|
||||
}
|
||||
|
||||
postEtagCache.AddWithExpiresInSecs(channelId, result.Data.(string), POSTS_ETAG_CACHE_SEC)
|
||||
lastPostTimeCache.AddWithExpiresInSecs(channelId, et.UpdateAt, LAST_POST_TIME_CACHE_SEC)
|
||||
|
||||
storeChannel <- result
|
||||
close(storeChannel)
|
||||
@@ -428,11 +428,35 @@ func (s SqlPostStore) GetPosts(channelId string, offset int, limit int) StoreCha
|
||||
return storeChannel
|
||||
}
|
||||
|
||||
func (s SqlPostStore) GetPostsSince(channelId string, time int64) StoreChannel {
|
||||
func (s SqlPostStore) GetPostsSince(channelId string, time int64, allowFromCache bool) StoreChannel {
|
||||
storeChannel := make(StoreChannel, 1)
|
||||
|
||||
go func() {
|
||||
result := StoreResult{}
|
||||
metrics := einterfaces.GetMetricsInterface()
|
||||
|
||||
if allowFromCache {
|
||||
// If the last post in the channel's time is less than or equal to the time we are getting posts since,
|
||||
// we can safely return no posts.
|
||||
if cacheItem, ok := lastPostTimeCache.Get(channelId); ok && cacheItem.(int64) <= time {
|
||||
if metrics != nil {
|
||||
metrics.IncrementMemCacheHitCounter("Last Post Time")
|
||||
}
|
||||
list := &model.PostList{Order: make([]string, 0, 0)}
|
||||
result.Data = list
|
||||
storeChannel <- result
|
||||
close(storeChannel)
|
||||
return
|
||||
} else {
|
||||
if metrics != nil {
|
||||
metrics.IncrementMemCacheMissCounter("Last Post Time")
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if metrics != nil {
|
||||
metrics.IncrementMemCacheMissCounter("Last Post Time")
|
||||
}
|
||||
}
|
||||
|
||||
var posts []*model.Post
|
||||
_, err := s.GetReplica().Select(&posts,
|
||||
@@ -469,13 +493,20 @@ func (s SqlPostStore) GetPostsSince(channelId string, time int64) StoreChannel {
|
||||
|
||||
list := &model.PostList{Order: make([]string, 0, len(posts))}
|
||||
|
||||
var latestUpdate int64 = 0
|
||||
|
||||
for _, p := range posts {
|
||||
list.AddPost(p)
|
||||
if p.UpdateAt > time {
|
||||
list.AddOrder(p.Id)
|
||||
}
|
||||
if latestUpdate < p.UpdateAt {
|
||||
latestUpdate = p.UpdateAt
|
||||
}
|
||||
}
|
||||
|
||||
lastPostTimeCache.AddWithExpiresInSecs(channelId, latestUpdate, LAST_POST_TIME_CACHE_SEC)
|
||||
|
||||
result.Data = list
|
||||
}
|
||||
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
@@ -38,14 +39,14 @@ func TestPostStoreGet(t *testing.T) {
|
||||
o1.Message = "a" + model.NewId() + "b"
|
||||
|
||||
etag1 := (<-store.Post().GetEtag(o1.ChannelId, false)).Data.(string)
|
||||
if strings.Index(etag1, model.CurrentVersion+".0.") != 0 {
|
||||
if strings.Index(etag1, model.CurrentVersion+".") != 0 {
|
||||
t.Fatal("Invalid Etag")
|
||||
}
|
||||
|
||||
o1 = (<-store.Post().Save(o1)).Data.(*model.Post)
|
||||
|
||||
etag2 := (<-store.Post().GetEtag(o1.ChannelId, false)).Data.(string)
|
||||
if strings.Index(etag2, model.CurrentVersion+"."+o1.Id) != 0 {
|
||||
if strings.Index(etag2, fmt.Sprintf("%v.%v", model.CurrentVersion, o1.UpdateAt)) != 0 {
|
||||
t.Fatal("Invalid Etag")
|
||||
}
|
||||
|
||||
@@ -70,13 +71,13 @@ func TestGetEtagCache(t *testing.T) {
|
||||
o1.Message = "a" + model.NewId() + "b"
|
||||
|
||||
etag1 := (<-store.Post().GetEtag(o1.ChannelId, true)).Data.(string)
|
||||
if strings.Index(etag1, model.CurrentVersion+".0.") != 0 {
|
||||
if strings.Index(etag1, model.CurrentVersion+".") != 0 {
|
||||
t.Fatal("Invalid Etag")
|
||||
}
|
||||
|
||||
// This one should come from the cache
|
||||
etag2 := (<-store.Post().GetEtag(o1.ChannelId, true)).Data.(string)
|
||||
if strings.Index(etag2, model.CurrentVersion+".0.") != 0 {
|
||||
if strings.Index(etag2, model.CurrentVersion+".") != 0 {
|
||||
t.Fatal("Invalid Etag")
|
||||
}
|
||||
|
||||
@@ -84,15 +85,15 @@ func TestGetEtagCache(t *testing.T) {
|
||||
|
||||
// We have not invalidated the cache so this should be the same as above
|
||||
etag3 := (<-store.Post().GetEtag(o1.ChannelId, true)).Data.(string)
|
||||
if strings.Index(etag3, model.CurrentVersion+".0.") != 0 {
|
||||
if strings.Index(etag3, etag2) != 0 {
|
||||
t.Fatal("Invalid Etag")
|
||||
}
|
||||
|
||||
store.Post().InvalidatePostEtagCache(o1.ChannelId)
|
||||
store.Post().InvalidateLastPostTimeCache(o1.ChannelId)
|
||||
|
||||
// Invalidated cache so we should get a good result
|
||||
etag4 := (<-store.Post().GetEtag(o1.ChannelId, true)).Data.(string)
|
||||
if strings.Index(etag4, model.CurrentVersion+"."+o1.Id) != 0 {
|
||||
if strings.Index(etag4, fmt.Sprintf("%v.%v", model.CurrentVersion, o1.UpdateAt)) != 0 {
|
||||
t.Fatal("Invalid Etag")
|
||||
}
|
||||
}
|
||||
@@ -200,7 +201,7 @@ func TestPostStoreDelete(t *testing.T) {
|
||||
o1.Message = "a" + model.NewId() + "b"
|
||||
|
||||
etag1 := (<-store.Post().GetEtag(o1.ChannelId, false)).Data.(string)
|
||||
if strings.Index(etag1, model.CurrentVersion+".0.") != 0 {
|
||||
if strings.Index(etag1, model.CurrentVersion+".") != 0 {
|
||||
t.Fatal("Invalid Etag")
|
||||
}
|
||||
|
||||
@@ -224,7 +225,7 @@ func TestPostStoreDelete(t *testing.T) {
|
||||
}
|
||||
|
||||
etag2 := (<-store.Post().GetEtag(o1.ChannelId, false)).Data.(string)
|
||||
if strings.Index(etag2, model.CurrentVersion+"."+o1.Id) != 0 {
|
||||
if strings.Index(etag2, model.CurrentVersion+".") != 0 {
|
||||
t.Fatal("Invalid Etag")
|
||||
}
|
||||
}
|
||||
@@ -680,7 +681,7 @@ func TestPostStoreGetPostsSince(t *testing.T) {
|
||||
o5.RootId = o4.Id
|
||||
o5 = (<-store.Post().Save(o5)).Data.(*model.Post)
|
||||
|
||||
r1 := (<-store.Post().GetPostsSince(o1.ChannelId, o1.CreateAt)).Data.(*model.PostList)
|
||||
r1 := (<-store.Post().GetPostsSince(o1.ChannelId, o1.CreateAt, false)).Data.(*model.PostList)
|
||||
|
||||
if r1.Order[0] != o5.Id {
|
||||
t.Fatal("invalid order")
|
||||
@@ -705,6 +706,12 @@ func TestPostStoreGetPostsSince(t *testing.T) {
|
||||
if r1.Posts[o1.Id].Message != o1.Message {
|
||||
t.Fatal("Missing parent")
|
||||
}
|
||||
|
||||
r2 := (<-store.Post().GetPostsSince(o1.ChannelId, o5.UpdateAt, true)).Data.(*model.PostList)
|
||||
|
||||
if len(r2.Order) != 0 {
|
||||
t.Fatal("wrong size ", len(r2.Posts))
|
||||
}
|
||||
}
|
||||
|
||||
func TestPostStoreSearch(t *testing.T) {
|
||||
|
||||
@@ -130,13 +130,13 @@ type PostStore interface {
|
||||
GetFlaggedPosts(userId string, offset int, limit int) StoreChannel
|
||||
GetPostsBefore(channelId string, postId string, numPosts int, offset int) StoreChannel
|
||||
GetPostsAfter(channelId string, postId string, numPosts int, offset int) StoreChannel
|
||||
GetPostsSince(channelId string, time int64) StoreChannel
|
||||
GetPostsSince(channelId string, time int64, allowFromCache bool) StoreChannel
|
||||
GetEtag(channelId string, allowFromCache bool) StoreChannel
|
||||
Search(teamId string, userId string, params *model.SearchParams) StoreChannel
|
||||
AnalyticsUserCountsWithPostsByDay(teamId string) StoreChannel
|
||||
AnalyticsPostCountsByDay(teamId string) StoreChannel
|
||||
AnalyticsPostCount(teamId string, mustHaveFile bool, mustHaveHashtag bool) StoreChannel
|
||||
InvalidatePostEtagCache(channelId string)
|
||||
InvalidateLastPostTimeCache(channelId string)
|
||||
}
|
||||
|
||||
type UserStore interface {
|
||||
|
||||
Reference in New Issue
Block a user