mirror of
https://github.com/mattermost/mattermost.git
synced 2025-02-25 18:55:24 -06:00
PLT-7639: Batch delete methods for data retention. (#7444)
This commit is contained in:
committed by
Christopher Speller
parent
2628022275
commit
8195c80aa1
16
i18n/en.json
16
i18n/en.json
@@ -5263,6 +5263,10 @@
|
||||
"id": "store.sql_audit.save.saving.app_error",
|
||||
"translation": "We encountered an error saving the audit"
|
||||
},
|
||||
{
|
||||
"id": "store.sql_audit.permanent_delete_batch.app_error",
|
||||
"translation": "We encountered an error permanently deleting the batch of audits"
|
||||
},
|
||||
{
|
||||
"id": "store.sql_channel.analytics_deleted_type_count.app_error",
|
||||
"translation": "We couldn't get deleted channel type counts"
|
||||
@@ -5623,6 +5627,10 @@
|
||||
"id": "store.sql_file_info.save_or_update.app_error",
|
||||
"translation": "We couldn't save or update the file info"
|
||||
},
|
||||
{
|
||||
"id": "store.sql_file_info.permanent_delete_batch.app_error",
|
||||
"translation": "We encountered an error permanently deleting the batch of file infos"
|
||||
},
|
||||
{
|
||||
"id": "store.sql_job.delete.app_error",
|
||||
"translation": "We couldn't delete the job"
|
||||
@@ -5855,6 +5863,10 @@
|
||||
"id": "store.sql_post.update.app_error",
|
||||
"translation": "We couldn't update the Post"
|
||||
},
|
||||
{
|
||||
"id": "store.sql_post.permanent_delete_batch.app_error",
|
||||
"translation": "We encountered an error permanently deleting the batch of posts"
|
||||
},
|
||||
{
|
||||
"id": "store.sql_preference.delete.app_error",
|
||||
"translation": "We encountered an error while deleting preferences"
|
||||
@@ -5955,6 +5967,10 @@
|
||||
"id": "store.sql_reaction.save.save.app_error",
|
||||
"translation": "Unable to save reaction"
|
||||
},
|
||||
{
|
||||
"id": "store.sql_reaction.permanent_delete_batch.app_error",
|
||||
"translation": "We encountered an error permanently deleting the batch of reactions"
|
||||
},
|
||||
{
|
||||
"id": "store.sql_session.analytics_session_count.app_error",
|
||||
"translation": "We couldn't count the sessions"
|
||||
|
||||
@@ -199,3 +199,9 @@ func (s *LayeredReactionStore) DeleteAllWithEmojiName(emojiName string) StoreCha
|
||||
return supplier.ReactionDeleteAllWithEmojiName(s.TmpContext, emojiName)
|
||||
})
|
||||
}
|
||||
|
||||
func (s *LayeredReactionStore) PermanentDeleteBatch(endTime int64, limit int64) StoreChannel {
|
||||
return s.RunQuery(func(supplier LayeredStoreSupplier) *LayeredStoreSupplierResult {
|
||||
return supplier.ReactionPermanentDeleteBatch(s.TmpContext, endTime, limit)
|
||||
})
|
||||
}
|
||||
|
||||
@@ -30,4 +30,5 @@ type LayeredStoreSupplier interface {
|
||||
ReactionDelete(ctx context.Context, reaction *model.Reaction, hints ...LayeredStoreHint) *LayeredStoreSupplierResult
|
||||
ReactionGetForPost(ctx context.Context, postId string, hints ...LayeredStoreHint) *LayeredStoreSupplierResult
|
||||
ReactionDeleteAllWithEmojiName(ctx context.Context, emojiName string, hints ...LayeredStoreHint) *LayeredStoreSupplierResult
|
||||
ReactionPermanentDeleteBatch(ctx context.Context, endTime int64, limit int64, hints ...LayeredStoreHint) *LayeredStoreSupplierResult
|
||||
}
|
||||
|
||||
@@ -45,3 +45,9 @@ func (s *LocalCacheSupplier) ReactionDeleteAllWithEmojiName(ctx context.Context,
|
||||
s.doClearCacheCluster(s.reactionCache)
|
||||
return s.Next().ReactionDeleteAllWithEmojiName(ctx, emojiName, hints...)
|
||||
}
|
||||
|
||||
func (s *LocalCacheSupplier) ReactionPermanentDeleteBatch(ctx context.Context, endTime int64, limit int64, hints ...LayeredStoreHint) *LayeredStoreSupplierResult {
|
||||
// Don't bother to clear the cache as the posts will be gone anyway and the reactions being deleted will
|
||||
// expire from the cache in due course.
|
||||
return s.Next().ReactionPermanentDeleteBatch(ctx, endTime, limit)
|
||||
}
|
||||
|
||||
@@ -131,3 +131,8 @@ func (s *RedisSupplier) ReactionDeleteAllWithEmojiName(ctx context.Context, emoj
|
||||
// Ignoring this. It's probably OK to have the emoji slowly expire from Redis.
|
||||
return s.Next().ReactionDeleteAllWithEmojiName(ctx, emojiName, hints...)
|
||||
}
|
||||
|
||||
func (s *RedisSupplier) ReactionPermanentDeleteBatch(ctx context.Context, endTime int64, limit int64, hints ...LayeredStoreHint) *LayeredStoreSupplierResult {
|
||||
// Ignoring this. It's probably OK to have the emoji slowly expire from Redis.
|
||||
return s.Next().ReactionPermanentDeleteBatch(ctx, endTime, limit, hints...)
|
||||
}
|
||||
|
||||
@@ -4,7 +4,10 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"github.com/mattermost/mattermost-server/model"
|
||||
"github.com/mattermost/mattermost-server/utils"
|
||||
)
|
||||
|
||||
type SqlAuditStore struct {
|
||||
@@ -109,3 +112,36 @@ func (s SqlAuditStore) PermanentDeleteByUser(userId string) StoreChannel {
|
||||
|
||||
return storeChannel
|
||||
}
|
||||
|
||||
func (s SqlAuditStore) PermanentDeleteBatch(endTime int64, limit int64) StoreChannel {
|
||||
storeChannel := make(StoreChannel, 1)
|
||||
|
||||
go func() {
|
||||
result := StoreResult{}
|
||||
|
||||
var query string
|
||||
if *utils.Cfg.SqlSettings.DriverName == "postgres" {
|
||||
query = "DELETE from Audits WHERE Id = any (array (SELECT Id FROM Audits WHERE CreateAt < :EndTime LIMIT :Limit))"
|
||||
} else {
|
||||
query = "DELETE from Audits WHERE CreateAt < :EndTime LIMIT :Limit"
|
||||
}
|
||||
|
||||
sqlResult, err := s.GetMaster().Exec(query, map[string]interface{}{"EndTime": endTime, "Limit": limit})
|
||||
if err != nil {
|
||||
result.Err = model.NewAppError("SqlAuditStore.PermanentDeleteBatch", "store.sql_audit.permanent_delete_batch.app_error", nil, ""+err.Error(), http.StatusInternalServerError)
|
||||
} else {
|
||||
rowsAffected, err1 := sqlResult.RowsAffected()
|
||||
if err1 != nil {
|
||||
result.Err = model.NewAppError("SqlAuditStore.PermanentDeleteBatch", "store.sql_audit.permanent_delete_batch.app_error", nil, ""+err.Error(), http.StatusInternalServerError)
|
||||
result.Data = int64(0)
|
||||
} else {
|
||||
result.Data = rowsAffected
|
||||
}
|
||||
}
|
||||
|
||||
storeChannel <- result
|
||||
close(storeChannel)
|
||||
}()
|
||||
|
||||
return storeChannel
|
||||
}
|
||||
|
||||
@@ -58,3 +58,32 @@ func TestSqlAuditStore(t *testing.T) {
|
||||
t.Fatal(r2.Err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAuditStorePermanentDeleteBatch(t *testing.T) {
|
||||
Setup()
|
||||
|
||||
a1 := &model.Audit{UserId: model.NewId(), IpAddress: "ipaddress", Action: "Action"}
|
||||
Must(store.Audit().Save(a1))
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
a2 := &model.Audit{UserId: a1.UserId, IpAddress: "ipaddress", Action: "Action"}
|
||||
Must(store.Audit().Save(a2))
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
cutoff := model.GetMillis()
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
a3 := &model.Audit{UserId: a1.UserId, IpAddress: "ipaddress", Action: "Action"}
|
||||
Must(store.Audit().Save(a3))
|
||||
|
||||
if r := <-store.Audit().Get(a1.UserId, 0, 100); len(r.Data.(model.Audits)) != 3 {
|
||||
t.Fatal("Expected 3 audits. Got ", len(r.Data.(model.Audits)))
|
||||
}
|
||||
|
||||
Must(store.Audit().PermanentDeleteBatch(cutoff, 1000000))
|
||||
|
||||
if r := <-store.Audit().Get(a1.UserId, 0, 100); len(r.Data.(model.Audits)) != 1 {
|
||||
t.Fatal("Expected 1 audit. Got ", len(r.Data.(model.Audits)))
|
||||
}
|
||||
|
||||
if r2 := <-store.Audit().PermanentDeleteByUser(a1.UserId); r2.Err != nil {
|
||||
t.Fatal(r2.Err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -281,3 +281,36 @@ func (fs SqlFileInfoStore) PermanentDelete(fileId string) StoreChannel {
|
||||
|
||||
return storeChannel
|
||||
}
|
||||
|
||||
func (s SqlFileInfoStore) PermanentDeleteBatch(endTime int64, limit int64) StoreChannel {
|
||||
storeChannel := make(StoreChannel, 1)
|
||||
|
||||
go func() {
|
||||
result := StoreResult{}
|
||||
|
||||
var query string
|
||||
if *utils.Cfg.SqlSettings.DriverName == "postgres" {
|
||||
query = "DELETE from FileInfo WHERE Id = any (array (SELECT Id FROM FileInfo WHERE CreateAt < :EndTime LIMIT :Limit))"
|
||||
} else {
|
||||
query = "DELETE from FileInfo WHERE CreateAt < :EndTime LIMIT :Limit"
|
||||
}
|
||||
|
||||
sqlResult, err := s.GetMaster().Exec(query, map[string]interface{}{"EndTime": endTime, "Limit": limit})
|
||||
if err != nil {
|
||||
result.Err = model.NewAppError("SqlFileInfoStore.PermanentDeleteBatch", "store.sql_file_info.permanent_delete_batch.app_error", nil, ""+err.Error(), http.StatusInternalServerError)
|
||||
} else {
|
||||
rowsAffected, err1 := sqlResult.RowsAffected()
|
||||
if err1 != nil {
|
||||
result.Err = model.NewAppError("SqlFileInfoStore.PermanentDeleteBatch", "store.sql_file_info.permanent_delete_batch.app_error", nil, ""+err.Error(), http.StatusInternalServerError)
|
||||
result.Data = int64(0)
|
||||
} else {
|
||||
result.Data = rowsAffected
|
||||
}
|
||||
}
|
||||
|
||||
storeChannel <- result
|
||||
close(storeChannel)
|
||||
}()
|
||||
|
||||
return storeChannel
|
||||
}
|
||||
|
||||
@@ -256,3 +256,44 @@ func TestFileInfoPermanentDelete(t *testing.T) {
|
||||
t.Fatal(result.Err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFileInfoPermanentDeleteBatch(t *testing.T) {
|
||||
Setup()
|
||||
|
||||
postId := model.NewId()
|
||||
|
||||
Must(store.FileInfo().Save(&model.FileInfo{
|
||||
PostId: postId,
|
||||
CreatorId: model.NewId(),
|
||||
Path: "file.txt",
|
||||
CreateAt: 1000,
|
||||
}))
|
||||
|
||||
Must(store.FileInfo().Save(&model.FileInfo{
|
||||
PostId: postId,
|
||||
CreatorId: model.NewId(),
|
||||
Path: "file.txt",
|
||||
CreateAt: 1200,
|
||||
}))
|
||||
|
||||
Must(store.FileInfo().Save(&model.FileInfo{
|
||||
PostId: postId,
|
||||
CreatorId: model.NewId(),
|
||||
Path: "file.txt",
|
||||
CreateAt: 2000,
|
||||
}))
|
||||
|
||||
if result := <-store.FileInfo().GetForPost(postId, true, false); result.Err != nil {
|
||||
t.Fatal(result.Err)
|
||||
} else if len(result.Data.([]*model.FileInfo)) != 3 {
|
||||
t.Fatal("Expected 3 fileInfos")
|
||||
}
|
||||
|
||||
Must(store.FileInfo().PermanentDeleteBatch(1500, 1000))
|
||||
|
||||
if result := <-store.FileInfo().GetForPost(postId, true, false); result.Err != nil {
|
||||
t.Fatal(result.Err)
|
||||
} else if len(result.Data.([]*model.FileInfo)) != 1 {
|
||||
t.Fatal("Expected 3 fileInfos")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1356,3 +1356,36 @@ func (s SqlPostStore) GetPostsBatchForIndexing(startTime int64, limit int) Store
|
||||
|
||||
return storeChannel
|
||||
}
|
||||
|
||||
func (s SqlPostStore) PermanentDeleteBatch(endTime int64, limit int64) StoreChannel {
|
||||
storeChannel := make(StoreChannel, 1)
|
||||
|
||||
go func() {
|
||||
result := StoreResult{}
|
||||
|
||||
var query string
|
||||
if *utils.Cfg.SqlSettings.DriverName == "postgres" {
|
||||
query = "DELETE from Posts WHERE Id = any (array (SELECT Id FROM Posts WHERE CreateAt < :EndTime LIMIT :Limit))"
|
||||
} else {
|
||||
query = "DELETE from Posts WHERE CreateAt < :EndTime LIMIT :Limit"
|
||||
}
|
||||
|
||||
sqlResult, err := s.GetMaster().Exec(query, map[string]interface{}{"EndTime": endTime, "Limit": limit})
|
||||
if err != nil {
|
||||
result.Err = model.NewAppError("SqlPostStore.PermanentDeleteBatch", "store.sql_post.permanent_delete_batch.app_error", nil, ""+err.Error(), http.StatusInternalServerError)
|
||||
} else {
|
||||
rowsAffected, err1 := sqlResult.RowsAffected()
|
||||
if err1 != nil {
|
||||
result.Err = model.NewAppError("SqlPostStore.PermanentDeleteBatch", "store.sql_post.permanent_delete_batch.app_error", nil, ""+err.Error(), http.StatusInternalServerError)
|
||||
result.Data = int64(0)
|
||||
} else {
|
||||
result.Data = rowsAffected
|
||||
}
|
||||
}
|
||||
|
||||
storeChannel <- result
|
||||
close(storeChannel)
|
||||
}()
|
||||
|
||||
return storeChannel
|
||||
}
|
||||
|
||||
@@ -1661,3 +1661,42 @@ func TestPostStoreGetPostsBatchForIndexing(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestPostStorePermanentDeleteBatch(t *testing.T) {
|
||||
Setup()
|
||||
|
||||
o1 := &model.Post{}
|
||||
o1.ChannelId = model.NewId()
|
||||
o1.UserId = model.NewId()
|
||||
o1.Message = "zz" + model.NewId() + "AAAAAAAAAAA"
|
||||
o1.CreateAt = 1000
|
||||
o1 = (<-store.Post().Save(o1)).Data.(*model.Post)
|
||||
|
||||
o2 := &model.Post{}
|
||||
o2.ChannelId = model.NewId()
|
||||
o2.UserId = model.NewId()
|
||||
o2.Message = "zz" + model.NewId() + "AAAAAAAAAAA"
|
||||
o2.CreateAt = 1000
|
||||
o2 = (<-store.Post().Save(o2)).Data.(*model.Post)
|
||||
|
||||
o3 := &model.Post{}
|
||||
o3.ChannelId = model.NewId()
|
||||
o3.UserId = model.NewId()
|
||||
o3.Message = "zz" + model.NewId() + "AAAAAAAAAAA"
|
||||
o3.CreateAt = 100000
|
||||
o3 = (<-store.Post().Save(o3)).Data.(*model.Post)
|
||||
|
||||
Must(store.Post().PermanentDeleteBatch(2000, 1000))
|
||||
|
||||
if p := <-store.Post().Get(o1.Id); p.Err == nil {
|
||||
t.Fatalf("Should have not found post 1 after purge")
|
||||
}
|
||||
|
||||
if p := <-store.Post().Get(o2.Id); p.Err == nil {
|
||||
t.Fatalf("Should have not found post 2 after purge")
|
||||
}
|
||||
|
||||
if p := <-store.Post().Get(o3.Id); p.Err != nil {
|
||||
t.Fatalf("Should have found post 3 after purge")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -294,3 +294,58 @@ func TestReactionDeleteAllWithEmojiName(t *testing.T) {
|
||||
t.Fatal("post shouldn't have reactions any more")
|
||||
}
|
||||
}
|
||||
|
||||
func TestReactionStorePermanentDeleteBatch(t *testing.T) {
|
||||
Setup()
|
||||
|
||||
post := Must(store.Post().Save(&model.Post{
|
||||
ChannelId: model.NewId(),
|
||||
UserId: model.NewId(),
|
||||
})).(*model.Post)
|
||||
|
||||
reactions := []*model.Reaction{
|
||||
{
|
||||
UserId: model.NewId(),
|
||||
PostId: post.Id,
|
||||
EmojiName: "sad",
|
||||
CreateAt: 1000,
|
||||
},
|
||||
{
|
||||
UserId: model.NewId(),
|
||||
PostId: post.Id,
|
||||
EmojiName: "sad",
|
||||
CreateAt: 1500,
|
||||
},
|
||||
{
|
||||
UserId: model.NewId(),
|
||||
PostId: post.Id,
|
||||
EmojiName: "sad",
|
||||
CreateAt: 2000,
|
||||
},
|
||||
{
|
||||
UserId: model.NewId(),
|
||||
PostId: post.Id,
|
||||
EmojiName: "sad",
|
||||
CreateAt: 2000,
|
||||
},
|
||||
}
|
||||
|
||||
// Need to hang on to a reaction to delete later in order to clear the cache, as "allowFromCache" isn't honoured any more.
|
||||
var lastReaction *model.Reaction
|
||||
for _, reaction := range reactions {
|
||||
lastReaction = Must(store.Reaction().Save(reaction)).(*model.Reaction)
|
||||
}
|
||||
|
||||
if returned := Must(store.Reaction().GetForPost(post.Id, false)).([]*model.Reaction); len(returned) != 4 {
|
||||
t.Fatal("expected 4 reactions")
|
||||
}
|
||||
|
||||
Must(store.Reaction().PermanentDeleteBatch(1800, 1000))
|
||||
|
||||
// This is to force a clear of the cache.
|
||||
Must(store.Reaction().Delete(lastReaction))
|
||||
|
||||
if returned := Must(store.Reaction().GetForPost(post.Id, false)).([]*model.Reaction); len(returned) != 1 {
|
||||
t.Fatalf("expected 1 reaction. Got: %v", len(returned))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,8 +5,10 @@ package store
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
|
||||
l4g "github.com/alecthomas/log4go"
|
||||
|
||||
"github.com/mattermost/gorp"
|
||||
"github.com/mattermost/mattermost-server/model"
|
||||
"github.com/mattermost/mattermost-server/utils"
|
||||
@@ -140,6 +142,32 @@ func (s *SqlSupplier) ReactionDeleteAllWithEmojiName(ctx context.Context, emojiN
|
||||
return result
|
||||
}
|
||||
|
||||
func (s *SqlSupplier) ReactionPermanentDeleteBatch(ctx context.Context, endTime int64, limit int64, hints ...LayeredStoreHint) *LayeredStoreSupplierResult {
|
||||
result := NewSupplierResult()
|
||||
|
||||
var query string
|
||||
if *utils.Cfg.SqlSettings.DriverName == "postgres" {
|
||||
query = "DELETE from Reactions WHERE Id = any (array (SELECT Id FROM Reactions WHERE CreateAt < :EndTime LIMIT :Limit))"
|
||||
} else {
|
||||
query = "DELETE from Reactions WHERE CreateAt < :EndTime LIMIT :Limit"
|
||||
}
|
||||
|
||||
sqlResult, err := s.GetMaster().Exec(query, map[string]interface{}{"EndTime": endTime, "Limit": limit})
|
||||
if err != nil {
|
||||
result.Err = model.NewAppError("SqlReactionStore.PermanentDeleteBatch", "store.sql_reaction.permanent_delete_batch.app_error", nil, ""+err.Error(), http.StatusInternalServerError)
|
||||
} else {
|
||||
rowsAffected, err1 := sqlResult.RowsAffected()
|
||||
if err1 != nil {
|
||||
result.Err = model.NewAppError("SqlReactionStore.PermanentDeleteBatch", "store.sql_reaction.permanent_delete_batch.app_error", nil, ""+err.Error(), http.StatusInternalServerError)
|
||||
result.Data = int64(0)
|
||||
} else {
|
||||
result.Data = rowsAffected
|
||||
}
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
func saveReactionAndUpdatePost(transaction *gorp.Transaction, reaction *model.Reaction) error {
|
||||
if err := transaction.Insert(reaction); err != nil {
|
||||
return err
|
||||
|
||||
@@ -171,6 +171,7 @@ type PostStore interface {
|
||||
Overwrite(post *model.Post) StoreChannel
|
||||
GetPostsByIds(postIds []string) StoreChannel
|
||||
GetPostsBatchForIndexing(startTime int64, limit int) StoreChannel
|
||||
PermanentDeleteBatch(endTime int64, limit int64) StoreChannel
|
||||
}
|
||||
|
||||
type UserStore interface {
|
||||
@@ -242,6 +243,7 @@ type AuditStore interface {
|
||||
Save(audit *model.Audit) StoreChannel
|
||||
Get(user_id string, offset int, limit int) StoreChannel
|
||||
PermanentDeleteByUser(userId string) StoreChannel
|
||||
PermanentDeleteBatch(endTime int64, limit int64) StoreChannel
|
||||
}
|
||||
|
||||
type ClusterDiscoveryStore interface {
|
||||
@@ -387,6 +389,7 @@ type FileInfoStore interface {
|
||||
AttachToPost(fileId string, postId string) StoreChannel
|
||||
DeleteForPost(postId string) StoreChannel
|
||||
PermanentDelete(fileId string) StoreChannel
|
||||
PermanentDeleteBatch(endTime int64, limit int64) StoreChannel
|
||||
}
|
||||
|
||||
type ReactionStore interface {
|
||||
@@ -394,6 +397,7 @@ type ReactionStore interface {
|
||||
Delete(reaction *model.Reaction) StoreChannel
|
||||
GetForPost(postId string, allowFromCache bool) StoreChannel
|
||||
DeleteAllWithEmojiName(emojiName string) StoreChannel
|
||||
PermanentDeleteBatch(endTime int64, limit int64) StoreChannel
|
||||
}
|
||||
|
||||
type JobStore interface {
|
||||
|
||||
Reference in New Issue
Block a user