[MM-53291] Data retention improvements (#24253)

* adding new migration for RetentionIdsForDeletion, changing logic for deleting orphaned reactions. Updating delete user and channel endpoints to remove respective reactions
This commit is contained in:
Ben Cooke 2023-09-06 08:25:27 -04:00 committed by GitHub
parent d13429aa92
commit 791ee40568
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 820 additions and 275 deletions

View File

@ -1700,6 +1700,10 @@ func (a *App) PermanentDeleteUser(c *request.Context, user *model.User) *model.A
return model.NewAppError("PermanentDeleteUser", "app.post.permanent_delete_by_user.app_error", nil, "", http.StatusInternalServerError).Wrap(err)
}
if err := a.Srv().Store().Reaction().PermanentDeleteByUser(user.Id); err != nil {
return model.NewAppError("PermanentDeleteUser", "app.reaction.permanent_delete_by_user.app_error", nil, "", http.StatusInternalServerError).Wrap(err)
}
if err := a.Srv().Store().Bot().PermanentDelete(user.Id); err != nil {
var invErr *store.ErrInvalidInput
switch {

View File

@ -222,6 +222,8 @@ channels/db/migrations/mysql/000111_update_vacuuming.down.sql
channels/db/migrations/mysql/000111_update_vacuuming.up.sql
channels/db/migrations/mysql/000112_rework_desktop_tokens.down.sql
channels/db/migrations/mysql/000112_rework_desktop_tokens.up.sql
channels/db/migrations/mysql/000113_create_retentionidsfordeletion_table.down.sql
channels/db/migrations/mysql/000113_create_retentionidsfordeletion_table.up.sql
channels/db/migrations/postgres/000001_create_teams.down.sql
channels/db/migrations/postgres/000001_create_teams.up.sql
channels/db/migrations/postgres/000002_create_team_members.down.sql
@ -444,3 +446,5 @@ channels/db/migrations/postgres/000111_update_vacuuming.down.sql
channels/db/migrations/postgres/000111_update_vacuuming.up.sql
channels/db/migrations/postgres/000112_rework_desktop_tokens.down.sql
channels/db/migrations/postgres/000112_rework_desktop_tokens.up.sql
channels/db/migrations/postgres/000113_create_retentionidsfordeletion_table.down.sql
channels/db/migrations/postgres/000113_create_retentionidsfordeletion_table.up.sql

View File

@ -0,0 +1 @@
DROP TABLE IF EXISTS RetentionIdsForDeletion;

View File

@ -0,0 +1,7 @@
CREATE TABLE IF NOT EXISTS RetentionIdsForDeletion (
Id varchar(26) NOT NULL,
TableName varchar(64),
Ids json,
PRIMARY KEY (Id),
KEY idx_retentionidsfordeletion_tablename (TableName)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

View File

@ -0,0 +1,3 @@
DROP INDEX IF EXISTS idx_retentionidsfordeletion_tablename;
DROP TABLE IF EXISTS retentionidsfordeletion;

View File

@ -0,0 +1,7 @@
CREATE TABLE IF NOT EXISTS retentionidsfordeletion (
id varchar(26) PRIMARY KEY,
tablename varchar(64),
ids varchar(26)[]
);
CREATE INDEX IF NOT EXISTS idx_retentionidsfordeletion_tablename ON retentionidsfordeletion (tablename);

View File

@ -6001,24 +6001,6 @@ func (s *OpenTracingLayerPostStore) Delete(postID string, timestamp int64, delet
return err
}
func (s *OpenTracingLayerPostStore) DeleteOrphanedRows(limit int) (int64, error) {
origCtx := s.Root.Store.Context()
span, newCtx := tracing.StartSpanWithParentByContext(s.Root.Store.Context(), "PostStore.DeleteOrphanedRows")
s.Root.Store.SetContext(newCtx)
defer func() {
s.Root.Store.SetContext(origCtx)
}()
defer span.Finish()
result, err := s.PostStore.DeleteOrphanedRows(limit)
if err != nil {
span.LogFields(spanlog.Error(err))
ext.Error.Set(span, true)
}
return result, err
}
func (s *OpenTracingLayerPostStore) Get(ctx context.Context, id string, opts model.GetPostsOptions, userID string, sanitizeOptions map[string]bool) (*model.PostList, error) {
origCtx := s.Root.Store.Context()
span, newCtx := tracing.StartSpanWithParentByContext(s.Root.Store.Context(), "PostStore.Get")
@ -7344,22 +7326,22 @@ func (s *OpenTracingLayerReactionStore) DeleteAllWithEmojiName(emojiName string)
return err
}
func (s *OpenTracingLayerReactionStore) DeleteOrphanedRows(limit int) (int64, error) {
func (s *OpenTracingLayerReactionStore) DeleteOrphanedRowsByIds(r *model.RetentionIdsForDeletion) error {
origCtx := s.Root.Store.Context()
span, newCtx := tracing.StartSpanWithParentByContext(s.Root.Store.Context(), "ReactionStore.DeleteOrphanedRows")
span, newCtx := tracing.StartSpanWithParentByContext(s.Root.Store.Context(), "ReactionStore.DeleteOrphanedRowsByIds")
s.Root.Store.SetContext(newCtx)
defer func() {
s.Root.Store.SetContext(origCtx)
}()
defer span.Finish()
result, err := s.ReactionStore.DeleteOrphanedRows(limit)
err := s.ReactionStore.DeleteOrphanedRowsByIds(r)
if err != nil {
span.LogFields(spanlog.Error(err))
ext.Error.Set(span, true)
}
return result, err
return err
}
func (s *OpenTracingLayerReactionStore) GetForPost(postID string, allowFromCache bool) ([]*model.Reaction, error) {
@ -7416,6 +7398,24 @@ func (s *OpenTracingLayerReactionStore) PermanentDeleteBatch(endTime int64, limi
return result, err
}
func (s *OpenTracingLayerReactionStore) PermanentDeleteByUser(userID string) error {
origCtx := s.Root.Store.Context()
span, newCtx := tracing.StartSpanWithParentByContext(s.Root.Store.Context(), "ReactionStore.PermanentDeleteByUser")
s.Root.Store.SetContext(newCtx)
defer func() {
s.Root.Store.SetContext(origCtx)
}()
defer span.Finish()
err := s.ReactionStore.PermanentDeleteByUser(userID)
if err != nil {
span.LogFields(spanlog.Error(err))
ext.Error.Set(span, true)
}
return err
}
func (s *OpenTracingLayerReactionStore) Save(reaction *model.Reaction) (*model.Reaction, error) {
origCtx := s.Root.Store.Context()
span, newCtx := tracing.StartSpanWithParentByContext(s.Root.Store.Context(), "ReactionStore.Save")
@ -7758,6 +7758,24 @@ func (s *OpenTracingLayerRetentionPolicyStore) GetCount() (int64, error) {
return result, err
}
func (s *OpenTracingLayerRetentionPolicyStore) GetIdsForDeletionByTableName(tableName string, limit int) ([]*model.RetentionIdsForDeletion, error) {
origCtx := s.Root.Store.Context()
span, newCtx := tracing.StartSpanWithParentByContext(s.Root.Store.Context(), "RetentionPolicyStore.GetIdsForDeletionByTableName")
s.Root.Store.SetContext(newCtx)
defer func() {
s.Root.Store.SetContext(origCtx)
}()
defer span.Finish()
result, err := s.RetentionPolicyStore.GetIdsForDeletionByTableName(tableName, limit)
if err != nil {
span.LogFields(spanlog.Error(err))
ext.Error.Set(span, true)
}
return result, err
}
func (s *OpenTracingLayerRetentionPolicyStore) GetTeamPoliciesCountForUser(userID string) (int64, error) {
origCtx := s.Root.Store.Context()
span, newCtx := tracing.StartSpanWithParentByContext(s.Root.Store.Context(), "RetentionPolicyStore.GetTeamPoliciesCountForUser")

View File

@ -6806,27 +6806,6 @@ func (s *RetryLayerPostStore) Delete(postID string, timestamp int64, deleteByID
}
func (s *RetryLayerPostStore) DeleteOrphanedRows(limit int) (int64, error) {
tries := 0
for {
result, err := s.PostStore.DeleteOrphanedRows(limit)
if err == nil {
return result, nil
}
if !isRepeatableError(err) {
return result, err
}
tries++
if tries >= 3 {
err = errors.Wrap(err, "giving up after 3 consecutive repeatable transaction failures")
return result, err
}
timepkg.Sleep(100 * timepkg.Millisecond)
}
}
func (s *RetryLayerPostStore) Get(ctx context.Context, id string, opts model.GetPostsOptions, userID string, sanitizeOptions map[string]bool) (*model.PostList, error) {
tries := 0
@ -8336,21 +8315,21 @@ func (s *RetryLayerReactionStore) DeleteAllWithEmojiName(emojiName string) error
}
func (s *RetryLayerReactionStore) DeleteOrphanedRows(limit int) (int64, error) {
func (s *RetryLayerReactionStore) DeleteOrphanedRowsByIds(r *model.RetentionIdsForDeletion) error {
tries := 0
for {
result, err := s.ReactionStore.DeleteOrphanedRows(limit)
err := s.ReactionStore.DeleteOrphanedRowsByIds(r)
if err == nil {
return result, nil
return nil
}
if !isRepeatableError(err) {
return result, err
return err
}
tries++
if tries >= 3 {
err = errors.Wrap(err, "giving up after 3 consecutive repeatable transaction failures")
return result, err
return err
}
timepkg.Sleep(100 * timepkg.Millisecond)
}
@ -8420,6 +8399,27 @@ func (s *RetryLayerReactionStore) PermanentDeleteBatch(endTime int64, limit int6
}
func (s *RetryLayerReactionStore) PermanentDeleteByUser(userID string) error {
tries := 0
for {
err := s.ReactionStore.PermanentDeleteByUser(userID)
if err == nil {
return nil
}
if !isRepeatableError(err) {
return err
}
tries++
if tries >= 3 {
err = errors.Wrap(err, "giving up after 3 consecutive repeatable transaction failures")
return err
}
timepkg.Sleep(100 * timepkg.Millisecond)
}
}
func (s *RetryLayerReactionStore) Save(reaction *model.Reaction) (*model.Reaction, error) {
tries := 0
@ -8819,6 +8819,27 @@ func (s *RetryLayerRetentionPolicyStore) GetCount() (int64, error) {
}
func (s *RetryLayerRetentionPolicyStore) GetIdsForDeletionByTableName(tableName string, limit int) ([]*model.RetentionIdsForDeletion, error) {
tries := 0
for {
result, err := s.RetentionPolicyStore.GetIdsForDeletionByTableName(tableName, limit)
if err == nil {
return result, nil
}
if !isRepeatableError(err) {
return result, err
}
tries++
if tries >= 3 {
err = errors.Wrap(err, "giving up after 3 consecutive repeatable transaction failures")
return result, err
}
timepkg.Sleep(100 * timepkg.Millisecond)
}
}
func (s *RetryLayerRetentionPolicyStore) GetTeamPoliciesCountForUser(userID string) (int64, error) {
tries := 0

View File

@ -175,6 +175,7 @@ func (s SqlChannelMemberHistoryStore) PermanentDeleteBatchForRetentionPolicies(n
NowMillis: now,
GlobalPolicyEndTime: globalPolicyEndTime,
Limit: limit,
StoreDeletedIds: false,
}, s.SqlStore, cursor)
}

View File

@ -11,6 +11,7 @@ import (
"regexp"
"strings"
"sync"
"time"
sq "github.com/mattermost/squirrel"
"github.com/pkg/errors"
@ -929,24 +930,31 @@ func (s *SqlPostStore) Delete(postID string, time int64, deleteByID string) (err
return nil
}
func (s *SqlPostStore) permanentDelete(postId string) (err error) {
var post model.Post
func (s *SqlPostStore) permanentDelete(postIds []string) (err error) {
transaction, err := s.GetMasterX().Beginx()
if err != nil {
return errors.Wrap(err, "begin_transaction")
}
defer finalizeTransactionX(transaction, &err)
err = transaction.Get(&post, "SELECT * FROM Posts WHERE Id = ?", postId)
if err != nil && err != sql.ErrNoRows {
return errors.Wrapf(err, "failed to get Post with id=%s", postId)
}
if err = s.permanentDeleteThreads(transaction, post.Id); err != nil {
return errors.Wrapf(err, "failed to cleanup threads for Post with id=%s", postId)
if err = s.permanentDeleteThreads(transaction, postIds); err != nil {
return err
}
if _, err = transaction.NamedExec("DELETE FROM Posts WHERE Id = :id OR RootId = :rootid", map[string]any{"id": postId, "rootid": postId}); err != nil {
return errors.Wrapf(err, "failed to delete Post with id=%s", postId)
if err = s.permanentDeleteReactions(transaction, postIds); err != nil {
return err
}
query := s.getQueryBuilder().
Delete("Posts").
Where(
sq.Or{
sq.Eq{"Id": postIds},
sq.Eq{"RootId": postIds},
},
)
if _, err = transaction.ExecBuilder(query); err != nil {
return errors.Wrap(err, "failed to delete Posts")
}
if err = transaction.Commit(); err != nil {
@ -980,10 +988,17 @@ func (s *SqlPostStore) permanentDeleteAllCommentByUser(userId string) (err error
return errors.Wrapf(err, "failed to delete Posts with userId=%s", userId)
}
postIds := []string{}
for _, ids := range results {
if err = s.updateThreadAfterReplyDeletion(transaction, ids.RootId, userId); err != nil {
return err
}
postIds = append(postIds, ids.Id)
}
// Delete all the reactions on the comments
if err = s.permanentDeleteReactions(transaction, postIds); err != nil {
return err
}
if err = transaction.Commit(); err != nil {
@ -1005,22 +1020,20 @@ func (s *SqlPostStore) PermanentDeleteByUser(userId string) error {
// Now attempt to delete all the root posts for a user. This will also
// delete all the comments for each post
found := true
count := 0
for found {
for {
var ids []string
err := s.GetMasterX().Select(&ids, "SELECT Id FROM Posts WHERE UserId = ? LIMIT 1000", userId)
if err != nil {
return errors.Wrapf(err, "failed to find Posts with userId=%s", userId)
}
found = false
for _, id := range ids {
found = true
if err = s.permanentDelete(id); err != nil {
return err
}
if len(ids) == 0 {
break
}
if err = s.permanentDelete(ids); err != nil {
return err
}
// This is a fail safe, give up if more than 10k messages
@ -1035,6 +1048,7 @@ func (s *SqlPostStore) PermanentDeleteByUser(userId string) error {
// Permanent deletes all channel root posts and comments,
// deletes all threads and thread memberships
// deletes all reactions
// no thread comment cleanup needed, since we are deleting threads and thread memberships
func (s *SqlPostStore) PermanentDeleteByChannel(channelId string) (err error) {
transaction, err := s.GetMasterX().Beginx()
@ -1043,20 +1057,39 @@ func (s *SqlPostStore) PermanentDeleteByChannel(channelId string) (err error) {
}
defer finalizeTransactionX(transaction, &err)
results := []postIds{}
err = transaction.Select(&results, "SELECT Id, RootId, UserId FROM Posts WHERE ChannelId = ?", channelId)
if err != nil {
return errors.Wrapf(err, "failed to fetch Posts with channelId=%s", channelId)
}
id := ""
for {
ids := []string{}
err = transaction.Select(&ids, "SELECT Id FROM Posts WHERE ChannelId = ? AND Id > ? ORDER BY Id ASC LIMIT 500", channelId, id)
if err != nil {
return errors.Wrapf(err, "failed to fetch Posts with channelId=%s", channelId)
}
for _, ids := range results {
if err = s.permanentDeleteThreads(transaction, ids.Id); err != nil {
if len(ids) == 0 {
break
}
id = ids[len(ids)-1]
if err = s.permanentDeleteThreads(transaction, ids); err != nil {
return err
}
}
time.Sleep(10 * time.Millisecond)
if _, err = transaction.Exec("DELETE FROM Posts WHERE ChannelId = ?", channelId); err != nil {
return errors.Wrapf(err, "failed to delete Posts with channelId=%s", channelId)
if err = s.permanentDeleteReactions(transaction, ids); err != nil {
return err
}
time.Sleep(10 * time.Millisecond)
query := s.getQueryBuilder().
Delete("Posts").
Where(
sq.Eq{"Id": ids},
)
if _, err = transaction.ExecBuilder(query); err != nil {
return errors.Wrap(err, "failed to delete Posts")
}
time.Sleep(10 * time.Millisecond)
}
if err = transaction.Commit(); err != nil {
@ -2419,48 +2452,10 @@ func (s *SqlPostStore) PermanentDeleteBatchForRetentionPolicies(now, globalPolic
NowMillis: now,
GlobalPolicyEndTime: globalPolicyEndTime,
Limit: limit,
StoreDeletedIds: true,
}, s.SqlStore, cursor)
}
// DeleteOrphanedRows removes entries from Posts when a corresponding channel no longer exists.
func (s *SqlPostStore) DeleteOrphanedRows(limit int) (deleted int64, err error) {
var query string
// We need the extra level of nesting to deal with MySQL's locking
if s.DriverName() == model.DatabaseDriverMysql {
// MySQL fails to do a proper antijoin if the selecting column
// and the joining column are different. In that case, doing a subquery
// leads to a faster plan because MySQL materializes the sub-query
// and does a covering index scan on Posts table. More details on the PR with
// this commit.
query = `
DELETE FROM Posts WHERE Id IN (
SELECT * FROM (
SELECT Posts.Id FROM Posts
WHERE Posts.ChannelId NOT IN (SELECT Id FROM Channels USE INDEX (PRIMARY))
LIMIT ?
) AS A
)`
} else {
query = `
DELETE FROM Posts WHERE Id IN (
SELECT * FROM (
SELECT Posts.Id FROM Posts
LEFT JOIN Channels ON Posts.ChannelId = Channels.Id
WHERE Channels.Id IS NULL
LIMIT ?
) AS A
)`
}
result, err := s.GetMasterX().Exec(query, limit)
if err != nil {
return
}
deleted, err = result.RowsAffected()
return
}
func (s *SqlPostStore) PermanentDeleteBatch(endTime int64, limit int64) (int64, error) {
var query string
if s.DriverName() == "postgres" {
@ -2779,16 +2774,40 @@ func (s *SqlPostStore) GetOldestEntityCreationTime() (int64, error) {
}
// Deletes a thread and a thread membership if the postId is a root post
func (s *SqlPostStore) permanentDeleteThreads(transaction *sqlxTxWrapper, postId string) error {
if _, err := transaction.Exec("DELETE FROM Threads WHERE PostId = ?", postId); err != nil {
func (s *SqlPostStore) permanentDeleteThreads(transaction *sqlxTxWrapper, postIds []string) error {
query := s.getQueryBuilder().
Delete("Threads").
Where(
sq.Eq{"PostId": postIds},
)
if _, err := transaction.ExecBuilder(query); err != nil {
return errors.Wrap(err, "failed to delete Threads")
}
if _, err := transaction.Exec("DELETE FROM ThreadMemberships WHERE PostId = ?", postId); err != nil {
query = s.getQueryBuilder().
Delete("ThreadMemberships").
Where(
sq.Eq{"PostId": postIds},
)
if _, err := transaction.ExecBuilder(query); err != nil {
return errors.Wrap(err, "failed to delete ThreadMemberships")
}
return nil
}
func (s *SqlPostStore) permanentDeleteReactions(transaction *sqlxTxWrapper, postIds []string) error {
query := s.getQueryBuilder().
Delete("Reactions").
Where(
sq.Eq{"PostId": postIds},
)
if _, err := transaction.ExecBuilder(query); err != nil {
return errors.Wrap(err, "failed to delete Reactions")
}
return nil
}
// deleteThread marks a thread as deleted at the given time.
func (s *SqlPostStore) deleteThread(transaction *sqlxTxWrapper, postId string, deleteAtTime int64) error {
queryString, args, err := s.getQueryBuilder().

View File

@ -4,6 +4,8 @@
package sqlstore
import (
"time"
sq "github.com/mattermost/squirrel"
"github.com/mattermost/mattermost/server/public/model"
@ -205,24 +207,91 @@ func (s *SqlReactionStore) DeleteAllWithEmojiName(emojiName string) error {
return nil
}
// DeleteOrphanedRows removes entries from Reactions when a corresponding post no longer exists.
func (s *SqlReactionStore) DeleteOrphanedRows(limit int) (deleted int64, err error) {
// We need the extra level of nesting to deal with MySQL's locking
const query = `
DELETE FROM Reactions WHERE PostId IN (
SELECT * FROM (
SELECT PostId FROM Reactions
LEFT JOIN Posts ON Reactions.PostId = Posts.Id
WHERE Posts.Id IS NULL
LIMIT ?
) AS A
)`
result, err := s.GetMasterX().Exec(query, limit)
func (s *SqlReactionStore) permanentDeleteReactions(userId string, postIds *[]string) error {
txn, err := s.GetMasterX().Beginx()
if err != nil {
return
return err
}
deleted, err = result.RowsAffected()
return
defer finalizeTransactionX(txn, &err)
err = txn.Select(postIds, "SELECT PostId FROM Reactions WHERE UserId = ?", userId)
if err != nil {
return errors.Wrapf(err, "failed to get Reactions with userId=%s", userId)
}
query := s.getQueryBuilder().
Delete("Reactions").
Where(sq.And{
sq.Eq{"PostId": postIds},
sq.Eq{"UserId": userId},
})
_, err = txn.ExecBuilder(query)
if err != nil {
return errors.Wrapf(err, "failed to delete reactions with userId=%s", userId)
}
if err = txn.Commit(); err != nil {
return err
}
return nil
}
func (s SqlReactionStore) PermanentDeleteByUser(userId string) error {
now := model.GetMillis()
postIds := []string{}
err := s.permanentDeleteReactions(userId, &postIds)
if err != nil {
return err
}
transaction, err := s.GetMasterX().Beginx()
if err != nil {
return err
}
defer finalizeTransactionX(transaction, &err)
for _, postId := range postIds {
_, err = transaction.Exec(UpdatePostHasReactionsOnDeleteQuery, now, postId, postId)
if err != nil {
mlog.Warn("Unable to update Post.HasReactions while removing reactions",
mlog.String("post_id", postId),
mlog.Err(err))
}
time.Sleep(10 * time.Millisecond)
}
if err = transaction.Commit(); err != nil {
return err
}
return nil
}
func (s *SqlReactionStore) DeleteOrphanedRowsByIds(r *model.RetentionIdsForDeletion) error {
txn, err := s.GetMasterX().Beginx()
if err != nil {
return err
}
defer finalizeTransactionX(txn, &err)
query := s.getQueryBuilder().
Delete("Reactions").
Where(
sq.Eq{"PostId": r.Ids},
)
_, err = txn.ExecBuilder(query)
if err != nil {
return errors.Wrapf(err, "failed to delete orphaned reactions with RetentionIdsForDeletion Id=%s", r.Id)
}
err = deleteFromRetentionIdsTx(txn, r.Id)
if err != nil {
return err
}
if err = txn.Commit(); err != nil {
return err
}
return nil
}
func (s *SqlReactionStore) PermanentDeleteBatch(endTime int64, limit int64) (int64, error) {

View File

@ -5,6 +5,7 @@ package sqlstore
import (
"database/sql"
"encoding/json"
"fmt"
"strconv"
"strings"
@ -817,6 +818,92 @@ func (s *SqlRetentionPolicyStore) GetChannelPoliciesCountForUser(userID string)
return count, nil
}
func scanRetentionIdsForDeletion(rows *sql.Rows, isPostgres bool) ([]*model.RetentionIdsForDeletion, error) {
idsForDeletion := []*model.RetentionIdsForDeletion{}
for rows.Next() {
var row model.RetentionIdsForDeletion
if isPostgres {
if err := rows.Scan(
&row.Id, &row.TableName, pq.Array(&row.Ids),
); err != nil {
return nil, errors.Wrap(err, "unable to scan columns")
}
} else {
var ids []byte
if err := rows.Scan(
&row.Id, &row.TableName, &ids,
); err != nil {
return nil, errors.Wrap(err, "unable to scan columns")
}
if err := json.Unmarshal(ids, &row.Ids); err != nil {
return nil, errors.Wrap(err, "failed to unmarshal ids")
}
}
idsForDeletion = append(idsForDeletion, &row)
}
if err := rows.Err(); err != nil {
return nil, errors.Wrap(err, "error while iterating over rows")
}
return idsForDeletion, nil
}
func (s *SqlRetentionPolicyStore) GetIdsForDeletionByTableName(tableName string, limit int) ([]*model.RetentionIdsForDeletion, error) {
query := s.getQueryBuilder().
Select("*").
From("RetentionIdsForDeletion").
Where(
sq.Eq{"TableName": tableName},
).
Limit(uint64(limit))
queryString, args, err := query.ToSql()
if err != nil {
return nil, errors.Wrap(err, "get_ids_for_deletion_tosql")
}
rows, err := s.GetReplicaX().DB.Query(queryString, args...)
if err != nil {
return nil, errors.Wrap(err, "failed to get ids for deletion")
}
defer rows.Close()
isPostgres := s.DriverName() == model.DatabaseDriverPostgres
idsForDeletion, err := scanRetentionIdsForDeletion(rows, isPostgres)
if err != nil {
return nil, errors.Wrap(err, "failed to scan ids for deletion")
}
return idsForDeletion, nil
}
func insertRetentionIdsForDeletion(txn *sqlxTxWrapper, row *model.RetentionIdsForDeletion, s *SqlStore) error {
row.PreSave()
insertBuilder := s.getQueryBuilder().
Insert("RetentionIdsForDeletion").
Columns("Id", "TableName", "Ids")
if s.DriverName() == model.DatabaseDriverPostgres {
insertBuilder = insertBuilder.
Values(row.Id, row.TableName, pq.Array(row.Ids))
} else {
jsonIds, err := json.Marshal(row.Ids)
if err != nil {
return err
}
insertBuilder = insertBuilder.
Values(row.Id, row.TableName, jsonIds)
}
insertQuery, insertArgs, err := insertBuilder.ToSql()
if err != nil {
return err
}
if _, err = txn.Exec(insertQuery, insertArgs...); err != nil {
return err
}
return nil
}
// RetentionPolicyBatchDeletionInfo gives information on how to delete records
// under a retention policy; see `genericPermanentDeleteBatchForRetentionPolicies`.
//
@ -845,6 +932,7 @@ type RetentionPolicyBatchDeletionInfo struct {
NowMillis int64
GlobalPolicyEndTime int64
Limit int64
StoreDeletedIds bool
}
// genericPermanentDeleteBatchForRetentionPolicies is a helper function for tables
@ -958,31 +1046,115 @@ func genericRetentionPoliciesDeletion(
if err != nil {
return 0, errors.Wrap(err, r.Table+"_tosql")
}
if s.DriverName() == model.DatabaseDriverPostgres {
primaryKeysStr := "(" + strings.Join(r.PrimaryKeys, ",") + ")"
query = `
DELETE FROM ` + r.Table + ` WHERE ` + primaryKeysStr + ` IN (
` + query + `
)`
} else {
// MySQL does not support the LIMIT clause in a subquery with IN
clauses := make([]string, len(r.PrimaryKeys))
for i, key := range r.PrimaryKeys {
clauses[i] = r.Table + "." + key + " = A." + key
if r.StoreDeletedIds {
txn, err := s.GetMasterX().Beginx()
if err != nil {
return 0, err
}
defer finalizeTransactionX(txn, &err)
if s.DriverName() == model.DatabaseDriverPostgres {
primaryKeysStr := "(" + strings.Join(r.PrimaryKeys, ",") + ")"
query = fmt.Sprintf("DELETE FROM %s WHERE %s IN (%s) RETURNING %s.%s", r.Table, primaryKeysStr, query, r.Table, r.PrimaryKeys[0])
var rows *sql.Rows
rows, err = txn.Query(query, args...)
if err != nil {
return 0, errors.Wrap(err, "failed to delete "+r.Table)
}
defer rows.Close()
ids := []string{}
for rows.Next() {
var id string
if err = rows.Scan(&id); err != nil {
return 0, errors.Wrap(err, "unable to scan from rows")
}
ids = append(ids, id)
}
if err = rows.Err(); err != nil {
return 0, errors.Wrap(err, "failed while iterating over rows")
}
rowsAffected = int64(len(ids))
if len(ids) > 0 {
retentionIdsRow := model.RetentionIdsForDeletion{
TableName: r.Table,
Ids: ids,
}
err = insertRetentionIdsForDeletion(txn, &retentionIdsRow, s)
if err != nil {
return 0, err
}
}
} else {
retentionIdsRow := model.RetentionIdsForDeletion{
TableName: r.Table,
Ids: []string{},
}
// 1. Select rows that will be deleted
if err = txn.Select(&retentionIdsRow.Ids, query, args...); err != nil {
return 0, err
}
if len(retentionIdsRow.Ids) > 0 {
// 2. Insert selected ids into RetentionIdsForDeletion table
err = insertRetentionIdsForDeletion(txn, &retentionIdsRow, s)
if err != nil {
return 0, err
}
query = getDeleteQueriesForMySQL(r, query)
// 3. Delete from Parent table
var result sql.Result
result, err = txn.Exec(query, args...)
if err != nil {
return 0, errors.Wrap(err, "failed to delete "+r.Table)
}
rowsAffected, err = result.RowsAffected()
if err != nil {
return 0, errors.Wrap(err, "failed to get rows affected for "+r.Table)
}
}
}
if err = txn.Commit(); err != nil {
return 0, err
}
} else {
if s.DriverName() == model.DatabaseDriverPostgres {
primaryKeysStr := "(" + strings.Join(r.PrimaryKeys, ",") + ")"
query = fmt.Sprintf("DELETE FROM %s WHERE %s IN (%s)", r.Table, primaryKeysStr, query)
} else {
query = getDeleteQueriesForMySQL(r, query)
}
result, err := s.GetMasterX().Exec(query, args...)
if err != nil {
return 0, errors.Wrap(err, "failed to delete "+r.Table)
}
rowsAffected, err = result.RowsAffected()
if err != nil {
return 0, errors.Wrap(err, "failed to get rows affected for "+r.Table)
}
joinClause := strings.Join(clauses, " AND ")
query = `
DELETE ` + r.Table + ` FROM ` + r.Table + ` INNER JOIN (
` + query + `
) AS A ON ` + joinClause
}
result, err := s.GetMasterX().Exec(query, args...)
if err != nil {
return 0, errors.Wrap(err, "failed to delete "+r.Table)
}
rowsAffected, err = result.RowsAffected()
if err != nil {
return 0, errors.Wrap(err, "failed to get rows affected for "+r.Table)
}
return
}
func getDeleteQueriesForMySQL(r RetentionPolicyBatchDeletionInfo, query string) string {
// MySQL does not support the LIMIT clause in a subquery with IN
clauses := make([]string, len(r.PrimaryKeys))
for i, key := range r.PrimaryKeys {
clauses[i] = r.Table + "." + key + " = A." + key
}
joinClause := strings.Join(clauses, " AND ")
return fmt.Sprintf("DELETE %s FROM %s INNER JOIN (%s) AS A ON %s", r.Table, r.Table, query, joinClause)
}
func deleteFromRetentionIdsTx(txn *sqlxTxWrapper, id string) (err error) {
if _, err := txn.Exec("DELETE FROM RetentionIdsForDeletion WHERE Id = ?", id); err != nil {
return errors.Wrap(err, "Failed to delete from RetentionIdsForDeletion")
}
return nil
}

View File

@ -919,6 +919,7 @@ func (s *SqlThreadStore) PermanentDeleteBatchForRetentionPolicies(now, globalPol
NowMillis: now,
GlobalPolicyEndTime: globalPolicyEndTime,
Limit: limit,
StoreDeletedIds: false,
}, s.SqlStore, cursor)
}
@ -939,66 +940,32 @@ func (s *SqlThreadStore) PermanentDeleteBatchThreadMembershipsForRetentionPolici
NowMillis: now,
GlobalPolicyEndTime: globalPolicyEndTime,
Limit: limit,
StoreDeletedIds: false,
}, s.SqlStore, cursor)
}
// DeleteOrphanedRows removes orphaned rows from Threads and ThreadMemberships
func (s *SqlThreadStore) DeleteOrphanedRows(limit int) (deleted int64, err error) {
var threadsQuery string
// We need the extra level of nesting to deal with MySQL's locking
if s.DriverName() == model.DatabaseDriverMysql {
// MySQL fails to do a proper antijoin if the selecting column
// and the joining column are different. In that case, doing a subquery
// leads to a faster plan because MySQL materializes the sub-query
// and does a covering index scan on Threads table. More details on the PR with
// this commit.
threadsQuery = `
DELETE FROM Threads WHERE PostId IN (
SELECT * FROM (
SELECT Threads.PostId FROM Threads
WHERE Threads.ChannelId NOT IN (SELECT Id FROM Channels USE INDEX(PRIMARY))
LIMIT ?
) AS A
)`
} else {
threadsQuery = `
DELETE FROM Threads WHERE PostId IN (
SELECT * FROM (
SELECT Threads.PostId FROM Threads
LEFT JOIN Channels ON Threads.ChannelId = Channels.Id
WHERE Channels.Id IS NULL
LIMIT ?
) AS A
)`
}
// We only delete a thread membership if the entire thread no longer exists,
// not if the root post has been deleted
const threadMembershipsQuery = `
DELETE FROM ThreadMemberships WHERE PostId IN (
SELECT * FROM (
SELECT ThreadMemberships.PostId FROM ThreadMemberships
LEFT JOIN Threads ON ThreadMemberships.PostId = Threads.PostId
WHERE Threads.PostId IS NULL
LIMIT ?
) AS A
)`
result, err := s.GetMasterX().Exec(threadsQuery, limit)
DELETE FROM ThreadMemberships WHERE PostId IN (
SELECT * FROM (
SELECT ThreadMemberships.PostId FROM ThreadMemberships
LEFT JOIN Threads ON ThreadMemberships.PostId = Threads.PostId
WHERE Threads.PostId IS NULL
LIMIT ?
) AS A
)`
result, err := s.GetMasterX().Exec(threadMembershipsQuery, limit)
if err != nil {
return
}
rpcDeleted, err := result.RowsAffected()
deleted, err = result.RowsAffected()
if err != nil {
return
}
result, err = s.GetMasterX().Exec(threadMembershipsQuery, limit)
if err != nil {
return
}
rptDeleted, err := result.RowsAffected()
if err != nil {
return
}
deleted = rpcDeleted + rptDeleted
return
}

View File

@ -110,6 +110,7 @@ type RetentionPolicyStore interface {
GetTeamPoliciesCountForUser(userID string) (int64, error)
GetChannelPoliciesForUser(userID string, offset, limit int) ([]*model.RetentionPolicyForChannel, error)
GetChannelPoliciesCountForUser(userID string) (int64, error)
GetIdsForDeletionByTableName(tableName string, limit int) ([]*model.RetentionIdsForDeletion, error)
}
type TeamStore interface {
@ -377,7 +378,6 @@ type PostStore interface {
GetEditHistoryForPost(postId string) ([]*model.Post, error)
GetPostsBatchForIndexing(startTime int64, startPostID string, limit int) ([]*model.PostForIndexing, error)
PermanentDeleteBatchForRetentionPolicies(now, globalPolicyEndTime, limit int64, cursor model.RetentionPolicyCursor) (int64, model.RetentionPolicyCursor, error)
DeleteOrphanedRows(limit int) (deleted int64, err error)
PermanentDeleteBatch(endTime int64, limit int64) (int64, error)
GetOldest() (*model.Post, error)
GetMaxPostSize() int
@ -720,8 +720,9 @@ type ReactionStore interface {
GetForPostSince(postId string, since int64, excludeRemoteId string, inclDeleted bool) ([]*model.Reaction, error)
DeleteAllWithEmojiName(emojiName string) error
BulkGetForPosts(postIds []string) ([]*model.Reaction, error)
DeleteOrphanedRows(limit int) (int64, error)
DeleteOrphanedRowsByIds(r *model.RetentionIdsForDeletion) error
PermanentDeleteBatch(endTime int64, limit int64) (int64, error)
PermanentDeleteByUser(userID string) error
}
type JobStore interface {

View File

@ -389,6 +389,9 @@ func testPermanentDeleteBatchForRetentionPolicies(t *testing.T, ss store.Store)
result, err := ss.ChannelMemberHistory().GetUsersInChannelDuring(joinTime, leaveTime, channel.Id)
require.NoError(t, err)
require.Empty(t, result, "history should have been deleted by channel policy")
rows, err := ss.RetentionPolicy().GetIdsForDeletionByTableName("ChannelMemberHistory", 1000)
require.NoError(t, err)
require.Equal(t, 0, len(rows))
}
func testGetChannelsLeftSince(t *testing.T, ss store.Store) {

View File

@ -113,30 +113,6 @@ func (_m *PostStore) Delete(postID string, timestamp int64, deleteByID string) e
return r0
}
// DeleteOrphanedRows provides a mock function with given fields: limit
func (_m *PostStore) DeleteOrphanedRows(limit int) (int64, error) {
ret := _m.Called(limit)
var r0 int64
var r1 error
if rf, ok := ret.Get(0).(func(int) (int64, error)); ok {
return rf(limit)
}
if rf, ok := ret.Get(0).(func(int) int64); ok {
r0 = rf(limit)
} else {
r0 = ret.Get(0).(int64)
}
if rf, ok := ret.Get(1).(func(int) error); ok {
r1 = rf(limit)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// Get provides a mock function with given fields: ctx, id, opts, userID, sanitizeOptions
func (_m *PostStore) Get(ctx context.Context, id string, opts model.GetPostsOptions, userID string, sanitizeOptions map[string]bool) (*model.PostList, error) {
ret := _m.Called(ctx, id, opts, userID, sanitizeOptions)

View File

@ -80,28 +80,18 @@ func (_m *ReactionStore) DeleteAllWithEmojiName(emojiName string) error {
return r0
}
// DeleteOrphanedRows provides a mock function with given fields: limit
func (_m *ReactionStore) DeleteOrphanedRows(limit int) (int64, error) {
ret := _m.Called(limit)
// DeleteOrphanedRowsByIds provides a mock function with given fields: r
func (_m *ReactionStore) DeleteOrphanedRowsByIds(r *model.RetentionIdsForDeletion) error {
ret := _m.Called(r)
var r0 int64
var r1 error
if rf, ok := ret.Get(0).(func(int) (int64, error)); ok {
return rf(limit)
}
if rf, ok := ret.Get(0).(func(int) int64); ok {
r0 = rf(limit)
var r0 error
if rf, ok := ret.Get(0).(func(*model.RetentionIdsForDeletion) error); ok {
r0 = rf(r)
} else {
r0 = ret.Get(0).(int64)
r0 = ret.Error(0)
}
if rf, ok := ret.Get(1).(func(int) error); ok {
r1 = rf(limit)
} else {
r1 = ret.Error(1)
}
return r0, r1
return r0
}
// GetForPost provides a mock function with given fields: postID, allowFromCache
@ -180,6 +170,20 @@ func (_m *ReactionStore) PermanentDeleteBatch(endTime int64, limit int64) (int64
return r0, r1
}
// PermanentDeleteByUser provides a mock function with given fields: userID
func (_m *ReactionStore) PermanentDeleteByUser(userID string) error {
ret := _m.Called(userID)
var r0 error
if rf, ok := ret.Get(0).(func(string) error); ok {
r0 = rf(userID)
} else {
r0 = ret.Error(0)
}
return r0
}
// Save provides a mock function with given fields: reaction
func (_m *ReactionStore) Save(reaction *model.Reaction) (*model.Reaction, error) {
ret := _m.Called(reaction)

View File

@ -256,6 +256,32 @@ func (_m *RetentionPolicyStore) GetCount() (int64, error) {
return r0, r1
}
// GetIdsForDeletionByTableName provides a mock function with given fields: tableName, limit
func (_m *RetentionPolicyStore) GetIdsForDeletionByTableName(tableName string, limit int) ([]*model.RetentionIdsForDeletion, error) {
ret := _m.Called(tableName, limit)
var r0 []*model.RetentionIdsForDeletion
var r1 error
if rf, ok := ret.Get(0).(func(string, int) ([]*model.RetentionIdsForDeletion, error)); ok {
return rf(tableName, limit)
}
if rf, ok := ret.Get(0).(func(string, int) []*model.RetentionIdsForDeletion); ok {
r0 = rf(tableName, limit)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]*model.RetentionIdsForDeletion)
}
}
if rf, ok := ret.Get(1).(func(string, int) error); ok {
r1 = rf(tableName, limit)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// GetTeamPoliciesCountForUser provides a mock function with given fields: userID
func (_m *RetentionPolicyStore) GetTeamPoliciesCountForUser(userID string) (int64, error) {
ret := _m.Called(userID)

View File

@ -1411,6 +1411,30 @@ func testPostStorePermDelete1Level(t *testing.T, ss store.Store) {
o2, err = ss.Post().Save(o2)
require.NoError(t, err)
r1 := &model.Reaction{}
r1.ChannelId = o1.ChannelId
r1.UserId = o2.UserId
r1.PostId = o1.Id
r1.EmojiName = "smile"
r1, err = ss.Reaction().Save(r1)
require.NoError(t, err)
r2 := &model.Reaction{}
r2.ChannelId = o1.ChannelId
r2.UserId = o1.UserId
r2.PostId = o2.Id
r2.EmojiName = "wave"
_, err = ss.Reaction().Save(r2)
require.NoError(t, err)
r3 := &model.Reaction{}
r3.ChannelId = o1.ChannelId
r3.UserId = model.NewId()
r3.PostId = o1.Id
r3.EmojiName = "sad"
r3, err = ss.Reaction().Save(r3)
require.NoError(t, err)
channel2, err := ss.Channel().Save(&model.Channel{
TeamId: teamId,
DisplayName: "DisplayName2",
@ -1425,6 +1449,14 @@ func testPostStorePermDelete1Level(t *testing.T, ss store.Store) {
o3, err = ss.Post().Save(o3)
require.NoError(t, err)
r4 := &model.Reaction{}
r4.ChannelId = channel2.Id
r4.UserId = model.NewId()
r4.PostId = o3.Id
r4.EmojiName = "angry"
_, err = ss.Reaction().Save(r4)
require.NoError(t, err)
channel3, err := ss.Channel().Save(&model.Channel{
TeamId: teamId,
DisplayName: "DisplayName3",
@ -1475,9 +1507,21 @@ func testPostStorePermDelete1Level(t *testing.T, ss store.Store) {
_, err = ss.Post().Get(context.Background(), o1.Id, model.GetPostsOptions{}, "", map[string]bool{})
require.NoError(t, err, "Deleted id shouldn't have failed")
reactions, err := ss.Reaction().GetForPost(o1.Id, false)
require.NoError(t, err, "Reactions should exist")
require.Equal(t, 2, len(reactions))
emojis := []string{r1.EmojiName, r3.EmojiName}
for _, reaction := range reactions {
require.Contains(t, emojis, reaction.EmojiName)
}
_, err = ss.Post().Get(context.Background(), o2.Id, model.GetPostsOptions{}, "", map[string]bool{})
require.Error(t, err, "Deleted id should have failed")
reactions, err = ss.Reaction().GetForPost(o2.Id, false)
require.NoError(t, err, "No error for not found")
require.Equal(t, 0, len(reactions))
thread, err = ss.Thread().Get(o5.Id)
require.NoError(t, err)
require.NotEmpty(t, thread)
@ -1489,6 +1533,17 @@ func testPostStorePermDelete1Level(t *testing.T, ss store.Store) {
require.NoError(t, err)
require.Nil(t, thread)
reactions, err = ss.Reaction().GetForPost(o3.Id, false)
require.NoError(t, err, "No error for not found")
require.Equal(t, 0, len(reactions))
reactions, err = ss.Reaction().GetForPost(o1.Id, false)
require.NoError(t, err, "Reactions should exist")
require.Equal(t, 2, len(reactions))
for _, reaction := range reactions {
require.Contains(t, emojis, reaction.EmojiName)
}
_, err = ss.Post().Get(context.Background(), o3.Id, model.GetPostsOptions{}, "", map[string]bool{})
require.Error(t, err, "Deleted id should have failed")
@ -3926,8 +3981,9 @@ func testPostStorePermanentDeleteBatch(t *testing.T, ss store.Store) {
o3, err = ss.Post().Save(o3)
require.NoError(t, err)
_, _, err = ss.Post().PermanentDeleteBatchForRetentionPolicies(0, 2000, 1000, model.RetentionPolicyCursor{})
deleted, _, err := ss.Post().PermanentDeleteBatchForRetentionPolicies(0, 2000, 1000, model.RetentionPolicyCursor{})
require.NoError(t, err)
require.Equal(t, int64(2), deleted)
_, err = ss.Post().Get(context.Background(), o1.Id, model.GetPostsOptions{}, "", map[string]bool{})
require.Error(t, err, "Should have not found post 1 after purge")
@ -3938,6 +3994,14 @@ func testPostStorePermanentDeleteBatch(t *testing.T, ss store.Store) {
_, err = ss.Post().Get(context.Background(), o3.Id, model.GetPostsOptions{}, "", map[string]bool{})
require.NoError(t, err, "Should have found post 3 after purge")
rows, err := ss.RetentionPolicy().GetIdsForDeletionByTableName("Posts", 1000)
require.NoError(t, err)
require.Equal(t, 1, len(rows))
require.Equal(t, 2, len(rows[0].Ids))
// Clean up retention ids table
err = ss.Reaction().DeleteOrphanedRowsByIds(rows[0])
require.NoError(t, err)
t.Run("with pagination", func(t *testing.T) {
for i := 0; i < 3; i++ {
_, err = ss.Post().Save(&model.Post{
@ -3950,13 +4014,31 @@ func testPostStorePermanentDeleteBatch(t *testing.T, ss store.Store) {
}
cursor := model.RetentionPolicyCursor{}
deleted, cursor, err := ss.Post().PermanentDeleteBatchForRetentionPolicies(0, 2, 2, cursor)
deleted, cursor, err = ss.Post().PermanentDeleteBatchForRetentionPolicies(0, 2, 2, cursor)
require.NoError(t, err)
require.Equal(t, int64(2), deleted)
rows, err = ss.RetentionPolicy().GetIdsForDeletionByTableName("Posts", 1000)
require.NoError(t, err)
require.Equal(t, 1, len(rows))
require.Equal(t, 2, len(rows[0].Ids))
// Clean up retention ids table
err = ss.Reaction().DeleteOrphanedRowsByIds(rows[0])
require.NoError(t, err)
deleted, _, err = ss.Post().PermanentDeleteBatchForRetentionPolicies(0, 2, 2, cursor)
require.NoError(t, err)
require.Equal(t, int64(1), deleted)
rows, err = ss.RetentionPolicy().GetIdsForDeletionByTableName("Posts", 1000)
require.NoError(t, err)
require.Equal(t, 1, len(rows))
require.Equal(t, 1, len(rows[0].Ids))
// Clean up retention ids table
err = ss.Reaction().DeleteOrphanedRowsByIds(rows[0])
require.NoError(t, err)
})
t.Run("with data retention policies", func(t *testing.T) {
@ -4024,6 +4106,14 @@ func testPostStorePermanentDeleteBatch(t *testing.T, ss store.Store) {
err2 = ss.RetentionPolicy().Delete(teamPolicy.ID)
require.NoError(t, err2)
// Clean up retention ids table
rows, err = ss.RetentionPolicy().GetIdsForDeletionByTableName("Posts", 1000)
require.NoError(t, err)
for _, row := range rows {
err = ss.Reaction().DeleteOrphanedRowsByIds(row)
require.NoError(t, err)
}
})
t.Run("with channel, team and global policies", func(t *testing.T) {
@ -4089,6 +4179,17 @@ func testPostStorePermanentDeleteBatch(t *testing.T, ss store.Store) {
deleted, _, err2 := ss.Post().PermanentDeleteBatchForRetentionPolicies(nowMillis, 2, 1000, model.RetentionPolicyCursor{})
require.NoError(t, err2)
require.Equal(t, int64(3), deleted)
rows, err = ss.RetentionPolicy().GetIdsForDeletionByTableName("Posts", 1000)
require.NoError(t, err)
// Each policy would generate it's own row
require.Equal(t, 3, len(rows))
// Clean up retention ids table
for _, row := range rows {
err = ss.Reaction().DeleteOrphanedRowsByIds(row)
require.NoError(t, err)
}
})
}

View File

@ -384,6 +384,14 @@ func testPreferenceDeleteOrphanedRows(t *testing.T, ss store.Store) {
_, _, nErr = ss.Post().PermanentDeleteBatchForRetentionPolicies(0, 2000, limit, model.RetentionPolicyCursor{})
assert.NoError(t, nErr)
rows, err := ss.RetentionPolicy().GetIdsForDeletionByTableName("Posts", 1000)
require.NoError(t, err)
require.Equal(t, 1, len(rows))
// Clean up retention ids table
err = ss.Reaction().DeleteOrphanedRowsByIds(rows[0])
require.NoError(t, err)
_, nErr = ss.Preference().DeleteOrphanedRows(limit)
assert.NoError(t, nErr)

View File

@ -24,6 +24,7 @@ func TestReactionStore(t *testing.T, ss store.Store, s SqlStore) {
t.Run("ReactionGetForPost", func(t *testing.T) { testReactionGetForPost(t, ss) })
t.Run("ReactionGetForPostSince", func(t *testing.T) { testReactionGetForPostSince(t, ss, s) })
t.Run("ReactionDeleteAllWithEmojiName", func(t *testing.T) { testReactionDeleteAllWithEmojiName(t, ss, s) })
t.Run("PermanentDeleteByUser", func(t *testing.T) { testPermanentDeleteByUser(t, ss) })
t.Run("PermanentDeleteBatch", func(t *testing.T) { testReactionStorePermanentDeleteBatch(t, ss) })
t.Run("ReactionBulkGetForPosts", func(t *testing.T) { testReactionBulkGetForPosts(t, ss) })
t.Run("ReactionDeadlock", func(t *testing.T) { testReactionDeadlock(t, ss) })
@ -563,6 +564,91 @@ func testReactionDeleteAllWithEmojiName(t *testing.T, ss store.Store, s SqlStore
}
func testPermanentDeleteByUser(t *testing.T, ss store.Store) {
userId := model.NewId()
post, err1 := ss.Post().Save(&model.Post{
ChannelId: model.NewId(),
UserId: model.NewId(),
})
require.NoError(t, err1)
post2, err2 := ss.Post().Save(&model.Post{
ChannelId: model.NewId(),
UserId: model.NewId(),
})
require.NoError(t, err2)
post3, err3 := ss.Post().Save(&model.Post{
ChannelId: model.NewId(),
UserId: model.NewId(),
})
require.NoError(t, err3)
reactions := []*model.Reaction{
{
UserId: userId,
PostId: post.Id,
EmojiName: "happy",
},
{
UserId: model.NewId(),
PostId: post.Id,
EmojiName: "smile",
},
{
UserId: model.NewId(),
PostId: post.Id,
EmojiName: "sad",
},
{
UserId: userId,
PostId: post2.Id,
EmojiName: "angry",
},
{
UserId: userId,
PostId: post3.Id,
EmojiName: "joy",
},
}
for _, reaction := range reactions {
_, err := ss.Reaction().Save(reaction)
require.NoError(t, err)
}
err := ss.Reaction().PermanentDeleteByUser(userId)
require.NoError(t, err)
// check that the reactions were deleted
returned, err := ss.Reaction().GetForPost(post.Id, false)
require.NoError(t, err)
require.Len(t, returned, 2, "should only have removed reaction for user")
for _, reaction := range returned {
assert.NotEqual(t, reaction.EmojiName, "happy", "should've removed reaction with emoji name")
}
returned, err = ss.Reaction().GetForPost(post2.Id, false)
require.NoError(t, err)
require.Len(t, returned, 0, "should have removed reaction for user")
returned, err = ss.Reaction().GetForPost(post3.Id, false)
require.NoError(t, err)
require.Len(t, returned, 0, "should remove reaction for user")
// check that the posts are updated
postList, err := ss.Post().Get(context.Background(), post.Id, model.GetPostsOptions{}, "", map[string]bool{})
require.NoError(t, err)
assert.True(t, postList.Posts[post.Id].HasReactions, "post should still have reactions")
postList, err = ss.Post().Get(context.Background(), post2.Id, model.GetPostsOptions{}, "", map[string]bool{})
require.NoError(t, err)
assert.False(t, postList.Posts[post2.Id].HasReactions, "post shouldn't have reactions any more")
postList, err = ss.Post().Get(context.Background(), post3.Id, model.GetPostsOptions{}, "", map[string]bool{})
require.NoError(t, err)
assert.False(t, postList.Posts[post3.Id].HasReactions, "post shouldn't have reactions any more")
}
func testReactionStorePermanentDeleteBatch(t *testing.T, ss store.Store) {
const limit = 1000
team, err := ss.Team().Save(&model.Team{
@ -620,8 +706,20 @@ func testReactionStorePermanentDeleteBatch(t *testing.T, ss store.Store) {
_, _, err = ss.Post().PermanentDeleteBatchForRetentionPolicies(0, 2000, limit, model.RetentionPolicyCursor{})
require.NoError(t, err)
_, err = ss.Reaction().DeleteOrphanedRows(limit)
rows, err := ss.RetentionPolicy().GetIdsForDeletionByTableName("Posts", 1000)
require.NoError(t, err)
require.Equal(t, 1, len(rows))
require.Equal(t, 1, len(rows[0].Ids))
require.Contains(t, rows[0].Ids, olderPost.Id)
for _, row := range rows {
err = ss.Reaction().DeleteOrphanedRowsByIds(row)
require.NoError(t, err)
}
rows, err = ss.RetentionPolicy().GetIdsForDeletionByTableName("Posts", 1000)
require.NoError(t, err)
require.Equal(t, 0, len(rows))
returned, err := ss.Reaction().GetForPost(olderPost.Id, false)
require.NoError(t, err)

View File

@ -5439,22 +5439,6 @@ func (s *TimerLayerPostStore) Delete(postID string, timestamp int64, deleteByID
return err
}
func (s *TimerLayerPostStore) DeleteOrphanedRows(limit int) (int64, error) {
start := time.Now()
result, err := s.PostStore.DeleteOrphanedRows(limit)
elapsed := float64(time.Since(start)) / float64(time.Second)
if s.Root.Metrics != nil {
success := "false"
if err == nil {
success = "true"
}
s.Root.Metrics.ObserveStoreMethodDuration("PostStore.DeleteOrphanedRows", success, elapsed)
}
return result, err
}
func (s *TimerLayerPostStore) Get(ctx context.Context, id string, opts model.GetPostsOptions, userID string, sanitizeOptions map[string]bool) (*model.PostList, error) {
start := time.Now()
@ -6638,10 +6622,10 @@ func (s *TimerLayerReactionStore) DeleteAllWithEmojiName(emojiName string) error
return err
}
func (s *TimerLayerReactionStore) DeleteOrphanedRows(limit int) (int64, error) {
func (s *TimerLayerReactionStore) DeleteOrphanedRowsByIds(r *model.RetentionIdsForDeletion) error {
start := time.Now()
result, err := s.ReactionStore.DeleteOrphanedRows(limit)
err := s.ReactionStore.DeleteOrphanedRowsByIds(r)
elapsed := float64(time.Since(start)) / float64(time.Second)
if s.Root.Metrics != nil {
@ -6649,9 +6633,9 @@ func (s *TimerLayerReactionStore) DeleteOrphanedRows(limit int) (int64, error) {
if err == nil {
success = "true"
}
s.Root.Metrics.ObserveStoreMethodDuration("ReactionStore.DeleteOrphanedRows", success, elapsed)
s.Root.Metrics.ObserveStoreMethodDuration("ReactionStore.DeleteOrphanedRowsByIds", success, elapsed)
}
return result, err
return err
}
func (s *TimerLayerReactionStore) GetForPost(postID string, allowFromCache bool) ([]*model.Reaction, error) {
@ -6702,6 +6686,22 @@ func (s *TimerLayerReactionStore) PermanentDeleteBatch(endTime int64, limit int6
return result, err
}
func (s *TimerLayerReactionStore) PermanentDeleteByUser(userID string) error {
start := time.Now()
err := s.ReactionStore.PermanentDeleteByUser(userID)
elapsed := float64(time.Since(start)) / float64(time.Second)
if s.Root.Metrics != nil {
success := "false"
if err == nil {
success = "true"
}
s.Root.Metrics.ObserveStoreMethodDuration("ReactionStore.PermanentDeleteByUser", success, elapsed)
}
return err
}
func (s *TimerLayerReactionStore) Save(reaction *model.Reaction) (*model.Reaction, error) {
start := time.Now()
@ -7006,6 +7006,22 @@ func (s *TimerLayerRetentionPolicyStore) GetCount() (int64, error) {
return result, err
}
func (s *TimerLayerRetentionPolicyStore) GetIdsForDeletionByTableName(tableName string, limit int) ([]*model.RetentionIdsForDeletion, error) {
start := time.Now()
result, err := s.RetentionPolicyStore.GetIdsForDeletionByTableName(tableName, limit)
elapsed := float64(time.Since(start)) / float64(time.Second)
if s.Root.Metrics != nil {
success := "false"
if err == nil {
success = "true"
}
s.Root.Metrics.ObserveStoreMethodDuration("RetentionPolicyStore.GetIdsForDeletionByTableName", success, elapsed)
}
return result, err
}
func (s *TimerLayerRetentionPolicyStore) GetTeamPoliciesCountForUser(userID string) (int64, error) {
start := time.Now()

View File

@ -6407,6 +6407,10 @@
"id": "app.reaction.get_for_post.app_error",
"translation": "Unable to get reactions for post."
},
{
"id": "app.reaction.permanent_delete_by_user.app_error",
"translation": "Unable to delete reactions for user."
},
{
"id": "app.reaction.save.save.app_error",
"translation": "Unable to save reaction."

View File

@ -832,6 +832,7 @@ func (ts *TelemetryService) trackConfig() {
"deletion_job_start_time": *cfg.DataRetentionSettings.DeletionJobStartTime,
"batch_size": *cfg.DataRetentionSettings.BatchSize,
"time_between_batches": *cfg.DataRetentionSettings.TimeBetweenBatchesMilliseconds,
"retention_ids_batch_size": *cfg.DataRetentionSettings.RetentionIdsBatchSize,
"cleanup_jobs_threshold_days": *cfg.JobSettings.CleanupJobsThresholdDays,
"cleanup_config_threshold_days": *cfg.JobSettings.CleanupConfigThresholdDays,
})

View File

@ -210,6 +210,7 @@ const (
DataRetentionSettingsDefaultDeletionJobStartTime = "02:00"
DataRetentionSettingsDefaultBatchSize = 3000
DataRetentionSettingsDefaultTimeBetweenBatchesMilliseconds = 100
DataRetentionSettingsDefaultRetentionIdsBatchSize = 100
PluginSettingsDefaultDirectory = "./plugins"
PluginSettingsDefaultClientDirectory = "./client/plugins"
@ -2868,6 +2869,7 @@ type DataRetentionSettings struct {
DeletionJobStartTime *string `access:"compliance_data_retention_policy"`
BatchSize *int `access:"compliance_data_retention_policy"`
TimeBetweenBatchesMilliseconds *int `access:"compliance_data_retention_policy"`
RetentionIdsBatchSize *int `access:"compliance_data_retention_policy"`
}
func (s *DataRetentionSettings) SetDefaults() {
@ -2906,6 +2908,9 @@ func (s *DataRetentionSettings) SetDefaults() {
if s.TimeBetweenBatchesMilliseconds == nil {
s.TimeBetweenBatchesMilliseconds = NewInt(DataRetentionSettingsDefaultTimeBetweenBatchesMilliseconds)
}
if s.RetentionIdsBatchSize == nil {
s.RetentionIdsBatchSize = NewInt(DataRetentionSettingsDefaultRetentionIdsBatchSize)
}
}
type JobSettings struct {

View File

@ -84,3 +84,15 @@ type RetentionPolicyCursor struct {
TeamPoliciesDone bool
GlobalPoliciesDone bool
}
type RetentionIdsForDeletion struct {
Id string
TableName string
Ids []string
}
func (r *RetentionIdsForDeletion) PreSave() {
if r.Id == "" {
r.Id = NewId()
}
}

View File

@ -45,8 +45,6 @@ type FeatureFlags struct {
EnableExportDirectDownload bool
DataRetentionConcurrencyEnabled bool
StreamlinedMarketplace bool
}
@ -63,7 +61,6 @@ func (f *FeatureFlags) SetDefaults() {
f.OnboardingTourTips = true
f.CloudReverseTrial = false
f.EnableExportDirectDownload = false
f.DataRetentionConcurrencyEnabled = true
f.StreamlinedMarketplace = true
}