From 89492a6a464d6da19e41a157d862b0cd4f1ddf0d Mon Sep 17 00:00:00 2001 From: Devin Binnie <52460000+devinbinnie@users.noreply.github.com> Date: Thu, 12 Oct 2023 10:52:10 -0400 Subject: [PATCH] [MM-53428] Delete empty drafts on upsert (#24046) * [MM-53428] Delete empty drafts on upsert * Add migrations to fix existing drafts * Fix CI * Delete empty drafts entirely from the DB * Fix lint * Implement batch migration for deleting drafts * Missing store layers * Add updated mock * Remove unnecessary test * PR feedback * Add check for cluster migration * Fix MySQL * Don't check for len<2 * Bit of PR feedback * Use query builder for parameters * PR feedback * More PR feedback * Merge'd * unit test GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration * simplified builder interface * fix DeleteEmptyDraftsByCreateAtAndUserId for MySQL * rework as batch migration worker * fix typo * log ip address on version mismatches too * simplify reset semantics * remove trace log in favour of low spam * document parameters for clarity --------- Co-authored-by: Mattermost Build Co-authored-by: Jesse Hallam --- server/channels/app/draft.go | 9 + server/channels/app/migrations.go | 22 ++ server/channels/app/server.go | 6 + .../channels/jobs/batch_migration_worker.go | 236 +++++++++++++ .../jobs/batch_migration_worker_test.go | 310 ++++++++++++++++++ .../delete_empty_drafts_migration.go | 85 +++++ .../delete_empty_drafts_migration_test.go | 180 ++++++++++ server/channels/jobs/helper_test.go | 223 +++++++++++++ server/channels/jobs/main_test.go | 24 ++ .../opentracinglayer/opentracinglayer.go | 36 ++ .../channels/store/retrylayer/retrylayer.go | 42 +++ server/channels/store/sqlstore/draft_store.go | 92 +++++- server/channels/store/store.go | 2 + .../channels/store/storetest/draft_store.go | 244 +++++++++++++- .../store/storetest/mocks/DraftStore.go | 45 +++ .../channels/store/timerlayer/timerlayer.go | 32 ++ server/channels/testlib/store.go | 1 + server/go.mod | 2 +- server/go.sum | 4 +- server/public/model/draft.go | 2 +- server/public/model/job.go | 1 + server/public/model/migration.go | 1 + 22 files changed, 1573 insertions(+), 26 deletions(-) create mode 100644 server/channels/jobs/batch_migration_worker.go create mode 100644 server/channels/jobs/batch_migration_worker_test.go create mode 100644 server/channels/jobs/delete_empty_drafts_migration/delete_empty_drafts_migration.go create mode 100644 server/channels/jobs/delete_empty_drafts_migration/delete_empty_drafts_migration_test.go create mode 100644 server/channels/jobs/helper_test.go create mode 100644 server/channels/jobs/main_test.go diff --git a/server/channels/app/draft.go b/server/channels/app/draft.go index b50c910904..30c1fe843e 100644 --- a/server/channels/app/draft.go +++ b/server/channels/app/draft.go @@ -56,6 +56,15 @@ func (a *App) UpsertDraft(c *request.Context, draft *model.Draft, connectionID s return nil, model.NewAppError("CreateDraft", "app.user.get.app_error", nil, nErr.Error(), http.StatusInternalServerError) } + // If the draft is empty, just delete it + if draft.Message == "" { + deleteErr := a.Srv().Store().Draft().Delete(draft.UserId, draft.ChannelId, draft.RootId) + if deleteErr != nil { + return nil, model.NewAppError("CreateDraft", "app.draft.save.app_error", nil, deleteErr.Error(), http.StatusInternalServerError) + } + return nil, nil + } + dt, nErr := a.Srv().Store().Draft().Upsert(draft) if nErr != nil { return nil, model.NewAppError("CreateDraft", "app.draft.save.app_error", nil, nErr.Error(), http.StatusInternalServerError) diff --git a/server/channels/app/migrations.go b/server/channels/app/migrations.go index 40605f85b9..53612d35ea 100644 --- a/server/channels/app/migrations.go +++ b/server/channels/app/migrations.go @@ -600,6 +600,27 @@ func (s *Server) doCloudS3PathMigrations(c *request.Context) { } } +func (s *Server) doDeleteEmptyDraftsMigration(c *request.Context) { + // If the migration is already marked as completed, don't do it again. + if _, err := s.Store().System().GetByName(model.MigrationKeyDeleteEmptyDrafts); err == nil { + return + } + + jobs, err := s.Store().Job().GetAllByTypeAndStatus(c, model.JobTypeDeleteEmptyDraftsMigration, model.JobStatusPending) + if err != nil { + mlog.Fatal("failed to get jobs by type and status", mlog.Err(err)) + return + } + if len(jobs) > 0 { + return + } + + if _, appErr := s.Jobs.CreateJobOnce(c, model.JobTypeDeleteEmptyDraftsMigration, nil); appErr != nil { + mlog.Fatal("failed to start job for deleting empty drafts", mlog.Err(appErr)) + return + } +} + func (a *App) DoAppMigrations() { a.Srv().doAppMigrations() } @@ -625,4 +646,5 @@ func (s *Server) doAppMigrations() { s.doPostPriorityConfigDefaultTrueMigration() s.doElasticsearchFixChannelIndex(c) s.doCloudS3PathMigrations(c) + s.doDeleteEmptyDraftsMigration(c) } diff --git a/server/channels/app/server.go b/server/channels/app/server.go index 3aebc7b7f5..6d4b43e07c 100644 --- a/server/channels/app/server.go +++ b/server/channels/app/server.go @@ -40,6 +40,7 @@ import ( "github.com/mattermost/mattermost/server/v8/channels/jobs" "github.com/mattermost/mattermost/server/v8/channels/jobs/active_users" "github.com/mattermost/mattermost/server/v8/channels/jobs/cleanup_desktop_tokens" + "github.com/mattermost/mattermost/server/v8/channels/jobs/delete_empty_drafts_migration" "github.com/mattermost/mattermost/server/v8/channels/jobs/expirynotify" "github.com/mattermost/mattermost/server/v8/channels/jobs/export_delete" "github.com/mattermost/mattermost/server/v8/channels/jobs/export_process" @@ -1573,6 +1574,11 @@ func (s *Server) initJobs() { s3_path_migration.MakeWorker(s.Jobs, s.Store(), s.FileBackend()), nil) + s.Jobs.RegisterJobType( + model.JobTypeDeleteEmptyDraftsMigration, + delete_empty_drafts_migration.MakeWorker(s.Jobs, s.Store(), New(ServerConnector(s.Channels()))), + nil) + s.Jobs.RegisterJobType( model.JobTypeExportDelete, export_delete.MakeWorker(s.Jobs, New(ServerConnector(s.Channels()))), diff --git a/server/channels/jobs/batch_migration_worker.go b/server/channels/jobs/batch_migration_worker.go new file mode 100644 index 0000000000..464170fa8e --- /dev/null +++ b/server/channels/jobs/batch_migration_worker.go @@ -0,0 +1,236 @@ +// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved. +// See LICENSE.txt for license information. + +package jobs + +import ( + "net/http" + "time" + + "github.com/mattermost/mattermost/server/public/model" + "github.com/mattermost/mattermost/server/public/shared/mlog" + "github.com/mattermost/mattermost/server/public/shared/request" + "github.com/mattermost/mattermost/server/v8/channels/store" +) + +type BatchMigrationWorkerAppIFace interface { + GetClusterStatus() []*model.ClusterInfo +} + +// BatchMigrationWorker processes database migration jobs in batches to help avoid table locks. +// +// It uses the jobs infrastructure to ensure only one node in the cluster runs the migration at +// any given time, avoids running the migration until the cluster is uniform, and automatically +// resets the migration if the cluster version diverges after starting. +// +// In principle, the job infrastructure is overkill for this kind of work, as there's a worker +// created per migration. There's also complication with edge cases, like having to restart the +// server in order to retry a failed migration job. Refactoring the job infrastructure is left as +// a future exercise. +type BatchMigrationWorker struct { + jobServer *JobServer + logger mlog.LoggerIFace + store store.Store + app BatchMigrationWorkerAppIFace + + stop chan bool + stopped chan bool + jobs chan model.Job + + migrationKey string + timeBetweenBatches time.Duration + doMigrationBatch func(data model.StringMap, store store.Store) (model.StringMap, bool, error) +} + +// MakeBatchMigrationWorker creates a worker to process the given migration batch function. +func MakeBatchMigrationWorker(jobServer *JobServer, store store.Store, app BatchMigrationWorkerAppIFace, migrationKey string, timeBetweenBatches time.Duration, doMigrationBatch func(data model.StringMap, store store.Store) (model.StringMap, bool, error)) model.Worker { + worker := &BatchMigrationWorker{ + jobServer: jobServer, + logger: jobServer.Logger().With(mlog.String("worker_name", migrationKey)), + store: store, + app: app, + stop: make(chan bool, 1), + stopped: make(chan bool, 1), + jobs: make(chan model.Job), + migrationKey: migrationKey, + timeBetweenBatches: timeBetweenBatches, + doMigrationBatch: doMigrationBatch, + } + return worker +} + +// Run starts the worker dedicated to the unique migration batch job it will be given to process. +func (worker *BatchMigrationWorker) Run() { + worker.logger.Debug("Worker started") + // We have to re-assign the stop channel again, because + // it might happen that the job was restarted due to a config change. + worker.stop = make(chan bool, 1) + + defer func() { + worker.logger.Debug("Worker finished") + worker.stopped <- true + }() + + for { + select { + case <-worker.stop: + worker.logger.Debug("Worker received stop signal") + return + case job := <-worker.jobs: + worker.DoJob(&job) + } + } +} + +// Stop interrupts the worker even if the migration has not yet completed. +func (worker *BatchMigrationWorker) Stop() { + worker.logger.Debug("Worker stopping") + close(worker.stop) + <-worker.stopped +} + +// JobChannel is the means by which the jobs infrastructure provides the worker the job to execute. +func (worker *BatchMigrationWorker) JobChannel() chan<- model.Job { + return worker.jobs +} + +// IsEnabled is always true for batch migrations. +func (worker *BatchMigrationWorker) IsEnabled(_ *model.Config) bool { + return true +} + +// checkIsClusterInSync returns true if all nodes in the cluster are running the same version, +// logging a warning on the first mismatch found. +func (worker *BatchMigrationWorker) checkIsClusterInSync() bool { + clusterStatus := worker.app.GetClusterStatus() + for i := 1; i < len(clusterStatus); i++ { + if clusterStatus[i].SchemaVersion != clusterStatus[0].SchemaVersion { + worker.logger.Warn( + "Worker: cluster not in sync", + mlog.String("schema_version_a", clusterStatus[0].SchemaVersion), + mlog.String("schema_version_b", clusterStatus[1].SchemaVersion), + mlog.String("server_ip_a", clusterStatus[0].IPAddress), + mlog.String("server_ip_b", clusterStatus[1].IPAddress), + ) + return false + } + } + + return true +} + +// DoJob executes the job picked up through the job channel. +// +// Note that this is a lot of distracting machinery here to claim the job, then double check the +// status, and keep the status up to date in line with job infrastrcuture semantics. Unless an +// error occurs, this worker should hold onto the job until its completed. +func (worker *BatchMigrationWorker) DoJob(job *model.Job) { + logger := worker.logger.With(mlog.Any("job", job)) + logger.Debug("Worker received a new candidate job.") + defer worker.jobServer.HandleJobPanic(logger, job) + + if claimed, err := worker.jobServer.ClaimJob(job); err != nil { + logger.Warn("Worker experienced an error while trying to claim job", mlog.Err(err)) + return + } else if !claimed { + return + } + + c := request.EmptyContext(logger) + var appErr *model.AppError + + // We get the job again because ClaimJob changes the job status. + job, appErr = worker.jobServer.GetJob(c, job.Id) + if appErr != nil { + worker.logger.Error("Worker: job execution error", mlog.Err(appErr)) + worker.setJobError(logger, job, appErr) + return + } + + if job.Data == nil { + job.Data = make(model.StringMap) + } + + for { + select { + case <-worker.stop: + logger.Info("Worker: Migration has been canceled via Worker Stop. Setting the job back to pending.") + if err := worker.jobServer.SetJobPending(job); err != nil { + worker.logger.Error("Worker: Failed to mark job as pending", mlog.Err(err)) + } + return + case <-time.After(worker.timeBetweenBatches): + // Ensure the cluster remains in sync, otherwise we restart the job to + // ensure a complete migration. Technically, the cluster could go out of + // sync briefly within a batch, but we accept that risk. + if !worker.checkIsClusterInSync() { + worker.logger.Warn("Worker: Resetting job") + worker.resetJob(logger, job) + return + } + + nextData, done, err := worker.doMigrationBatch(job.Data, worker.store) + if err != nil { + worker.logger.Error("Worker: Failed to do migration batch. Exiting", mlog.Err(err)) + worker.setJobError(logger, job, model.NewAppError("doMigrationBatch", model.NoTranslation, nil, "", http.StatusInternalServerError).Wrap(err)) + return + } else if done { + logger.Info("Worker: Job is complete") + worker.setJobSuccess(logger, job) + worker.markAsComplete() + return + } + + job.Data = nextData + + // Migrations currently don't support reporting meaningful progress. + worker.jobServer.SetJobProgress(job, 0) + } + } +} + +// resetJob erases the data tracking the next batch to execute and returns the job status to +// pending to allow the job infrastructure to requeue it. +func (worker *BatchMigrationWorker) resetJob(logger mlog.LoggerIFace, job *model.Job) { + job.Data = nil + job.Progress = 0 + job.Status = model.JobStatusPending + + if _, err := worker.store.Job().UpdateOptimistically(job, model.JobStatusInProgress); err != nil { + worker.logger.Error("Worker: Failed to reset job data. May resume instead of restarting.", mlog.Err(err)) + } +} + +// setJobSuccess records the job as successful. +func (worker *BatchMigrationWorker) setJobSuccess(logger mlog.LoggerIFace, job *model.Job) { + if err := worker.jobServer.SetJobProgress(job, 100); err != nil { + logger.Error("Worker: Failed to update progress for job", mlog.Err(err)) + worker.setJobError(logger, job, err) + } + + if err := worker.jobServer.SetJobSuccess(job); err != nil { + logger.Error("Worker: Failed to set success for job", mlog.Err(err)) + worker.setJobError(logger, job, err) + } +} + +// setJobError puts the job into an error state, preventing the job from running again. +func (worker *BatchMigrationWorker) setJobError(logger mlog.LoggerIFace, job *model.Job, appError *model.AppError) { + if err := worker.jobServer.SetJobError(job, appError); err != nil { + logger.Error("Worker: Failed to set job error", mlog.Err(err)) + } +} + +// markAsComplete records a discrete migration key to prevent this job from ever running again. +func (worker *BatchMigrationWorker) markAsComplete() { + system := model.System{ + Name: worker.migrationKey, + Value: "true", + } + + // Note that if this fails, then the job would have still succeeded. We will spuriously + // run the job again in the future, but as migrations are idempotent it won't be an issue. + if err := worker.jobServer.Store.System().Save(&system); err != nil { + worker.logger.Error("Worker: Failed to mark migration as completed in the systems table.", mlog.String("migration_key", worker.migrationKey), mlog.Err(err)) + } +} diff --git a/server/channels/jobs/batch_migration_worker_test.go b/server/channels/jobs/batch_migration_worker_test.go new file mode 100644 index 0000000000..d569ca85cf --- /dev/null +++ b/server/channels/jobs/batch_migration_worker_test.go @@ -0,0 +1,310 @@ +// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved. +// See LICENSE.txt for license information. + +package jobs_test + +import ( + "strconv" + "testing" + "time" + + "github.com/mattermost/mattermost/server/public/model" + "github.com/mattermost/mattermost/server/public/shared/mlog" + "github.com/mattermost/mattermost/server/public/shared/request" + "github.com/mattermost/mattermost/server/v8/channels/jobs" + "github.com/mattermost/mattermost/server/v8/channels/store" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type MockApp struct { + clusterInfo []*model.ClusterInfo +} + +func (ma MockApp) GetClusterStatus() []*model.ClusterInfo { + return ma.clusterInfo +} + +func (ma *MockApp) SetInSync() { + ma.clusterInfo = nil + ma.clusterInfo = append(ma.clusterInfo, &model.ClusterInfo{ + SchemaVersion: "a", + }) + ma.clusterInfo = append(ma.clusterInfo, &model.ClusterInfo{ + SchemaVersion: "a", + }) +} + +func (ma *MockApp) SetOutOfSync() { + ma.clusterInfo = nil + ma.clusterInfo = append(ma.clusterInfo, &model.ClusterInfo{ + SchemaVersion: "a", + }) + ma.clusterInfo = append(ma.clusterInfo, &model.ClusterInfo{ + SchemaVersion: "b", + }) +} + +func TestBatchMigrationWorker(t *testing.T) { + waitDone := func(t *testing.T, done chan bool, msg string) { + t.Helper() + + require.Eventually(t, func() bool { + select { + case <-done: + return true + default: + return false + } + }, 5*time.Second, 100*time.Millisecond, msg) + } + + setupBatchWorker := func(t *testing.T, th *TestHelper, mockApp *MockApp, doMigrationBatch func(model.StringMap, store.Store) (model.StringMap, bool, error)) (model.Worker, *model.Job) { + t.Helper() + + migrationKey := model.NewId() + timeBetweenBatches := 1 * time.Second + + worker := jobs.MakeBatchMigrationWorker( + th.Server.Jobs, + th.Server.Store(), + mockApp, + migrationKey, + timeBetweenBatches, + doMigrationBatch, + ) + th.Server.Jobs.RegisterJobType(migrationKey, worker, nil) + + ctx := request.EmptyContext(mlog.CreateConsoleTestLogger(t)) + job, appErr := th.Server.Jobs.CreateJob(ctx, migrationKey, nil) + require.Nil(t, appErr) + + done := make(chan bool) + go func() { + defer close(done) + worker.Run() + }() + + // When ending the test, ensure we wait for the worker to finish. + t.Cleanup(func() { + waitDone(t, done, "worker did not stop running") + }) + + // Give the worker time to start running + time.Sleep(500 * time.Millisecond) + + return worker, job + } + + stopWorker := func(t *testing.T, worker model.Worker) { + t.Helper() + + stopped := make(chan bool, 1) + go func() { + worker.Stop() + close(stopped) + }() + + waitDone(t, stopped, "worker did not stop") + } + + waitForJobStatus := func(t *testing.T, th *TestHelper, job *model.Job, status string) { + t.Helper() + + require.Eventuallyf(t, func() bool { + ctx := request.EmptyContext(mlog.CreateConsoleTestLogger(t)) + actualJob, appErr := th.Server.Jobs.GetJob(ctx, job.Id) + require.Nil(t, appErr) + require.Equal(t, job.Id, actualJob.Id) + + return actualJob.Status == status + }, 5*time.Second, 250*time.Millisecond, "job never transitioned to %s", status) + } + + assertJobReset := func(t *testing.T, th *TestHelper, job *model.Job) { + ctx := request.EmptyContext(mlog.CreateConsoleTestLogger(t)) + actualJob, appErr := th.Server.Jobs.GetJob(ctx, job.Id) + require.Nil(t, appErr) + assert.Empty(t, actualJob.Progress) + assert.Empty(t, actualJob.Data) + } + + getBatchNumberFromData := func(t *testing.T, data model.StringMap) int { + t.Helper() + + if data["batch_number"] == "" { + data["batch_number"] = "1" + } + batchNumber, err := strconv.Atoi(data["batch_number"]) + require.NoError(t, err) + + return batchNumber + } + + getDataFromBatchNumber := func(batchNumber int) model.StringMap { + data := make(model.StringMap) + data["batch_number"] = strconv.Itoa(batchNumber) + + return data + } + + t.Run("clusters not in sync before first batch", func(t *testing.T) { + th := Setup(t).InitBasic() + defer th.TearDown() + + mockApp := &MockApp{} + mockApp.SetOutOfSync() + + var worker model.Worker + var job *model.Job + worker, job = setupBatchWorker(t, th, mockApp, func(model.StringMap, store.Store) (model.StringMap, bool, error) { + require.Fail(t, "migration batch should never run while clusters not in sync") + + return nil, false, nil + }) + + // Give the worker time to start running + time.Sleep(500 * time.Millisecond) + + // Queue the work to be done + worker.JobChannel() <- *job + + waitForJobStatus(t, th, job, model.JobStatusPending) + assertJobReset(t, th, job) + + stopWorker(t, worker) + }) + + t.Run("stop after first batch", func(t *testing.T) { + th := Setup(t).InitBasic() + defer th.TearDown() + + mockApp := &MockApp{} + + var worker model.Worker + var job *model.Job + worker, job = setupBatchWorker(t, th, mockApp, func(data model.StringMap, s store.Store) (model.StringMap, bool, error) { + batchNumber := getBatchNumberFromData(t, data) + + require.Equal(t, 1, batchNumber, "only batch 1 should have run") + + // Shut down the worker after the first batch to prevent subsequent ones. + go worker.Stop() + + batchNumber++ + + return getDataFromBatchNumber(batchNumber), false, nil + }) + + // Queue the work to be done + worker.JobChannel() <- *job + + waitForJobStatus(t, th, job, model.JobStatusPending) + }) + + t.Run("stop after second batch", func(t *testing.T) { + th := Setup(t).InitBasic() + defer th.TearDown() + + mockApp := &MockApp{} + + var worker model.Worker + var job *model.Job + worker, job = setupBatchWorker(t, th, mockApp, func(data model.StringMap, s store.Store) (model.StringMap, bool, error) { + batchNumber := getBatchNumberFromData(t, data) + + require.LessOrEqual(t, batchNumber, 2, "only batches 1 and 2 should have run") + + // Shut down the worker after the first batch to prevent subsequent ones. + go worker.Stop() + batchNumber++ + + return getDataFromBatchNumber(batchNumber), false, nil + }) + + // Queue the work to be done + worker.JobChannel() <- *job + + waitForJobStatus(t, th, job, model.JobStatusPending) + }) + + t.Run("clusters not in sync after first batch", func(t *testing.T) { + th := Setup(t).InitBasic() + defer th.TearDown() + + mockApp := &MockApp{} + + var worker model.Worker + var job *model.Job + worker, job = setupBatchWorker(t, th, mockApp, func(data model.StringMap, s store.Store) (model.StringMap, bool, error) { + batchNumber := getBatchNumberFromData(t, data) + require.Equal(t, 1, batchNumber, "only batch 1 should have run") + + mockApp.SetOutOfSync() + batchNumber++ + + return getDataFromBatchNumber(batchNumber), false, nil + }) + + // Give the worker time to start running + time.Sleep(500 * time.Millisecond) + + // Queue the work to be done + worker.JobChannel() <- *job + + waitForJobStatus(t, th, job, model.JobStatusPending) + assertJobReset(t, th, job) + + stopWorker(t, worker) + }) + + t.Run("done after first batch", func(t *testing.T) { + th := Setup(t).InitBasic() + defer th.TearDown() + + mockApp := &MockApp{} + + var worker model.Worker + var job *model.Job + worker, job = setupBatchWorker(t, th, mockApp, func(data model.StringMap, s store.Store) (model.StringMap, bool, error) { + batchNumber := getBatchNumberFromData(t, data) + require.Equal(t, 1, batchNumber, "only batch 1 should have run") + + // Shut down the worker after the first batch to prevent subsequent ones. + go worker.Stop() + batchNumber++ + + return getDataFromBatchNumber(batchNumber), true, nil + }) + + // Queue the work to be done + worker.JobChannel() <- *job + + waitForJobStatus(t, th, job, model.JobStatusSuccess) + }) + + t.Run("done after three batches", func(t *testing.T) { + th := Setup(t).InitBasic() + defer th.TearDown() + + mockApp := &MockApp{} + + var worker model.Worker + var job *model.Job + worker, job = setupBatchWorker(t, th, mockApp, func(data model.StringMap, s store.Store) (model.StringMap, bool, error) { + batchNumber := getBatchNumberFromData(t, data) + require.LessOrEqual(t, batchNumber, 3, "only 3 batches should have run") + + // Shut down the worker after the first batch to prevent subsequent ones. + go worker.Stop() + batchNumber++ + + return getDataFromBatchNumber(batchNumber), true, nil + }) + + // Queue the work to be done + worker.JobChannel() <- *job + + waitForJobStatus(t, th, job, model.JobStatusSuccess) + }) +} diff --git a/server/channels/jobs/delete_empty_drafts_migration/delete_empty_drafts_migration.go b/server/channels/jobs/delete_empty_drafts_migration/delete_empty_drafts_migration.go new file mode 100644 index 0000000000..74d6fc0c47 --- /dev/null +++ b/server/channels/jobs/delete_empty_drafts_migration/delete_empty_drafts_migration.go @@ -0,0 +1,85 @@ +// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved. +// See LICENSE.txt for license information. + +package delete_empty_drafts_migration + +import ( + "strconv" + "time" + + "github.com/mattermost/mattermost/server/public/model" + "github.com/mattermost/mattermost/server/v8/channels/jobs" + "github.com/mattermost/mattermost/server/v8/channels/store" + "github.com/pkg/errors" +) + +const ( + timeBetweenBatches = 1 * time.Second +) + +// MakeWorker creates a batch migration worker to delete empty drafts. +func MakeWorker(jobServer *jobs.JobServer, store store.Store, app jobs.BatchMigrationWorkerAppIFace) model.Worker { + return jobs.MakeBatchMigrationWorker( + jobServer, + store, + app, + model.MigrationKeyDeleteEmptyDrafts, + timeBetweenBatches, + doDeleteEmptyDraftsMigrationBatch, + ) +} + +// parseJobMetadata parses the opaque job metadata to return the information needed to decide which +// batch to process next. +func parseJobMetadata(data model.StringMap) (int64, string, error) { + createAt := int64(0) + if data["create_at"] != "" { + parsedCreateAt, parseErr := strconv.ParseInt(data["create_at"], 10, 64) + if parseErr != nil { + return 0, "", errors.Wrap(parseErr, "failed to parse create_at") + } + createAt = parsedCreateAt + } + + userID := data["user_id"] + + return createAt, userID, nil +} + +// makeJobMetadata encodes the information needed to decide which batch to process next back into +// the opaque job metadata. +func makeJobMetadata(createAt int64, userID string) model.StringMap { + data := make(model.StringMap) + data["create_at"] = strconv.FormatInt(createAt, 10) + data["user_id"] = userID + + return data +} + +// doDeleteEmptyDraftsMigrationBatch iterates through all drafts, deleting empty drafts within each +// batch keyed by the compound primary key (createAt, userID) +func doDeleteEmptyDraftsMigrationBatch(data model.StringMap, store store.Store) (model.StringMap, bool, error) { + createAt, userID, err := parseJobMetadata(data) + if err != nil { + return nil, false, errors.Wrap(err, "failed to parse job metadata") + } + + // Determine the /next/ (createAt, userId) by finding the last record in the batch we're + // about to delete. + nextCreateAt, nextUserID, err := store.Draft().GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration(createAt, userID) + if err != nil { + return nil, false, errors.Wrapf(err, "failed to get the next batch (create_at=%v, user_id=%v)", createAt, userID) + } + + // If we get the nil values, it means the batch was empty and we're done. + if nextCreateAt == 0 && nextUserID == "" { + return nil, true, nil + } + + err = store.Draft().DeleteEmptyDraftsByCreateAtAndUserId(createAt, userID) + if err != nil { + return nil, false, errors.Wrapf(err, "failed to delete empty drafts (create_at=%v, user_id=%v)", createAt, userID) + } + + return makeJobMetadata(nextCreateAt, nextUserID), false, nil +} diff --git a/server/channels/jobs/delete_empty_drafts_migration/delete_empty_drafts_migration_test.go b/server/channels/jobs/delete_empty_drafts_migration/delete_empty_drafts_migration_test.go new file mode 100644 index 0000000000..021e615b1b --- /dev/null +++ b/server/channels/jobs/delete_empty_drafts_migration/delete_empty_drafts_migration_test.go @@ -0,0 +1,180 @@ +// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved. +// See LICENSE.txt for license information. + +package delete_empty_drafts_migration + +import ( + "errors" + "testing" + + "github.com/mattermost/mattermost/server/public/model" + "github.com/mattermost/mattermost/server/v8/channels/store/storetest" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestJobMetadata(t *testing.T) { + t.Run("parse nil data", func(t *testing.T) { + var data model.StringMap + createAt, userID, err := parseJobMetadata(data) + require.NoError(t, err) + assert.Empty(t, createAt) + assert.Empty(t, userID) + }) + + t.Run("parse invalid create_at", func(t *testing.T) { + data := make(model.StringMap) + data["user_id"] = "user_id" + data["create_at"] = "invalid" + _, _, err := parseJobMetadata(data) + require.Error(t, err) + }) + + t.Run("parse valid", func(t *testing.T) { + data := make(model.StringMap) + data["user_id"] = "user_id" + data["create_at"] = "1695918431" + + createAt, userID, err := parseJobMetadata(data) + require.NoError(t, err) + assert.EqualValues(t, 1695918431, createAt) + assert.Equal(t, "user_id", userID) + }) + + t.Run("parse/make", func(t *testing.T) { + data := makeJobMetadata(1695918431, "user_id") + assert.Equal(t, "1695918431", data["create_at"]) + assert.Equal(t, "user_id", data["user_id"]) + + createAt, userID, err := parseJobMetadata(data) + require.NoError(t, err) + assert.EqualValues(t, 1695918431, createAt) + assert.Equal(t, "user_id", userID) + }) +} + +func TestDoDeleteEmptyDraftsMigrationBatch(t *testing.T) { + t.Run("invalid job metadata", func(t *testing.T) { + mockStore := &storetest.Store{} + t.Cleanup(func() { + mockStore.AssertExpectations(t) + }) + + data := make(model.StringMap) + data["user_id"] = "user_id" + data["create_at"] = "invalid" + data, done, err := doDeleteEmptyDraftsMigrationBatch(data, mockStore) + require.Error(t, err) + assert.False(t, done) + assert.Nil(t, data) + }) + + t.Run("failure getting next offset", func(t *testing.T) { + mockStore := &storetest.Store{} + t.Cleanup(func() { + mockStore.AssertExpectations(t) + }) + + createAt, userID := int64(1695920000), "user_id_1" + nextCreateAt, nextUserID := int64(0), "" + + mockStore.DraftStore.On("GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration", createAt, userID).Return(nextCreateAt, nextUserID, errors.New("failure")) + + data, done, err := doDeleteEmptyDraftsMigrationBatch(makeJobMetadata(createAt, userID), mockStore) + require.EqualError(t, err, "failed to get the next batch (create_at=1695920000, user_id=user_id_1): failure") + assert.False(t, done) + assert.Nil(t, data) + }) + + t.Run("failure deleting batch", func(t *testing.T) { + mockStore := &storetest.Store{} + t.Cleanup(func() { + mockStore.AssertExpectations(t) + }) + + createAt, userID := int64(1695920000), "user_id_1" + nextCreateAt, nextUserID := int64(1695922034), "user_id_2" + + mockStore.DraftStore.On("GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration", createAt, userID).Return(nextCreateAt, nextUserID, nil) + mockStore.DraftStore.On("DeleteEmptyDraftsByCreateAtAndUserId", createAt, userID).Return(errors.New("failure")) + + data, done, err := doDeleteEmptyDraftsMigrationBatch(makeJobMetadata(createAt, userID), mockStore) + require.EqualError(t, err, "failed to delete empty drafts (create_at=1695920000, user_id=user_id_1): failure") + assert.False(t, done) + assert.Nil(t, data) + }) + + t.Run("do first batch (nil job metadata)", func(t *testing.T) { + mockStore := &storetest.Store{} + t.Cleanup(func() { + mockStore.AssertExpectations(t) + }) + + createAt, userID := int64(0), "" + nextCreateAt, nextUserID := int64(1695922034), "user_id_2" + + mockStore.DraftStore.On("GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration", createAt, userID).Return(nextCreateAt, nextUserID, nil) + mockStore.DraftStore.On("DeleteEmptyDraftsByCreateAtAndUserId", createAt, userID).Return(nil) + + data, done, err := doDeleteEmptyDraftsMigrationBatch(nil, mockStore) + require.NoError(t, err) + assert.False(t, done) + assert.Equal(t, model.StringMap{ + "create_at": "1695922034", + "user_id": "user_id_2", + }, data) + }) + + t.Run("do first batch (empty job metadata)", func(t *testing.T) { + mockStore := &storetest.Store{} + t.Cleanup(func() { + mockStore.AssertExpectations(t) + }) + + createAt, userID := int64(0), "" + nextCreateAt, nextUserID := int64(1695922034), "user_id_2" + + mockStore.DraftStore.On("GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration", createAt, userID).Return(nextCreateAt, nextUserID, nil) + mockStore.DraftStore.On("DeleteEmptyDraftsByCreateAtAndUserId", createAt, userID).Return(nil) + + data, done, err := doDeleteEmptyDraftsMigrationBatch(model.StringMap{}, mockStore) + require.NoError(t, err) + assert.False(t, done) + assert.Equal(t, makeJobMetadata(nextCreateAt, nextUserID), data) + }) + + t.Run("do batch", func(t *testing.T) { + mockStore := &storetest.Store{} + t.Cleanup(func() { + mockStore.AssertExpectations(t) + }) + + createAt, userID := int64(1695922000), "user_id_1" + nextCreateAt, nextUserID := int64(1695922034), "user_id_2" + + mockStore.DraftStore.On("GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration", createAt, userID).Return(nextCreateAt, nextUserID, nil) + mockStore.DraftStore.On("DeleteEmptyDraftsByCreateAtAndUserId", createAt, userID).Return(nil) + + data, done, err := doDeleteEmptyDraftsMigrationBatch(makeJobMetadata(createAt, userID), mockStore) + require.NoError(t, err) + assert.False(t, done) + assert.Equal(t, makeJobMetadata(nextCreateAt, nextUserID), data) + }) + + t.Run("done batches", func(t *testing.T) { + mockStore := &storetest.Store{} + t.Cleanup(func() { + mockStore.AssertExpectations(t) + }) + + createAt, userID := int64(1695922000), "user_id_1" + nextCreateAt, nextUserID := int64(0), "" + + mockStore.DraftStore.On("GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration", createAt, userID).Return(nextCreateAt, nextUserID, nil) + + data, done, err := doDeleteEmptyDraftsMigrationBatch(makeJobMetadata(createAt, userID), mockStore) + require.NoError(t, err) + assert.True(t, done) + assert.Nil(t, data) + }) +} diff --git a/server/channels/jobs/helper_test.go b/server/channels/jobs/helper_test.go new file mode 100644 index 0000000000..fb2867ab9d --- /dev/null +++ b/server/channels/jobs/helper_test.go @@ -0,0 +1,223 @@ +// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved. +// See LICENSE.txt for license information. + +package jobs_test + +import ( + "os" + "path/filepath" + "sync" + "testing" + "time" + + "github.com/mattermost/mattermost/server/public/model" + "github.com/mattermost/mattermost/server/public/shared/mlog" + "github.com/mattermost/mattermost/server/public/shared/request" + "github.com/mattermost/mattermost/server/v8/channels/app" + "github.com/mattermost/mattermost/server/v8/channels/store" + "github.com/mattermost/mattermost/server/v8/config" +) + +type TestHelper struct { + App *app.App + Context *request.Context + Server *app.Server + BasicTeam *model.Team + BasicUser *model.User + BasicUser2 *model.User + + SystemAdminUser *model.User + LogBuffer *mlog.Buffer + TestLogger *mlog.Logger + IncludeCacheLayer bool + ConfigStore *config.Store + + tempWorkspace string +} + +func setupTestHelper(dbStore store.Store, enterprise bool, includeCacheLayer bool, options []app.Option, tb testing.TB) *TestHelper { + tempWorkspace, err := os.MkdirTemp("", "jobstest") + if err != nil { + panic(err) + } + + configStore := config.NewTestMemoryStore() + memoryConfig := configStore.Get() + memoryConfig.SqlSettings = *mainHelper.GetSQLSettings() + *memoryConfig.PluginSettings.Directory = filepath.Join(tempWorkspace, "plugins") + *memoryConfig.PluginSettings.ClientDirectory = filepath.Join(tempWorkspace, "webapp") + *memoryConfig.PluginSettings.AutomaticPrepackagedPlugins = false + *memoryConfig.LogSettings.EnableSentry = false // disable error reporting during tests + *memoryConfig.AnnouncementSettings.AdminNoticesEnabled = false + *memoryConfig.AnnouncementSettings.UserNoticesEnabled = false + configStore.Set(memoryConfig) + + buffer := &mlog.Buffer{} + + options = append(options, app.ConfigStore(configStore)) + if includeCacheLayer { + // Adds the cache layer to the test store + options = append(options, app.StoreOverrideWithCache(dbStore)) + } else { + options = append(options, app.StoreOverride(dbStore)) + } + + testLogger, _ := mlog.NewLogger() + logCfg, _ := config.MloggerConfigFromLoggerConfig(&memoryConfig.LogSettings, nil, config.GetLogFileLocation) + if errCfg := testLogger.ConfigureTargets(logCfg, nil); errCfg != nil { + panic("failed to configure test logger: " + errCfg.Error()) + } + if errW := mlog.AddWriterTarget(testLogger, buffer, true, mlog.StdAll...); errW != nil { + panic("failed to add writer target to test logger: " + errW.Error()) + } + // lock logger config so server init cannot override it during testing. + testLogger.LockConfiguration() + options = append(options, app.SetLogger(testLogger)) + + s, err := app.NewServer(options...) + if err != nil { + panic(err) + } + + th := &TestHelper{ + App: app.New(app.ServerConnector(s.Channels())), + Context: request.EmptyContext(testLogger), + Server: s, + LogBuffer: buffer, + TestLogger: testLogger, + IncludeCacheLayer: includeCacheLayer, + ConfigStore: configStore, + } + th.Context.SetLogger(testLogger) + + prevListenAddress := *th.App.Config().ServiceSettings.ListenAddress + th.App.UpdateConfig(func(cfg *model.Config) { *cfg.ServiceSettings.ListenAddress = "localhost:0" }) + serverErr := th.Server.Start() + if serverErr != nil { + panic(serverErr) + } + + th.App.UpdateConfig(func(cfg *model.Config) { *cfg.ServiceSettings.ListenAddress = prevListenAddress }) + + th.App.Srv().Store().MarkSystemRanUnitTests() + + return th +} + +func Setup(tb testing.TB, options ...app.Option) *TestHelper { + if testing.Short() { + tb.SkipNow() + } + dbStore := mainHelper.GetStore() + dbStore.DropAllTables() + dbStore.MarkSystemRanUnitTests() + mainHelper.PreloadMigrations() + + return setupTestHelper(dbStore, false, true, options, tb) +} + +var initBasicOnce sync.Once +var userCache struct { + SystemAdminUser *model.User + BasicUser *model.User + BasicUser2 *model.User +} + +func (th *TestHelper) InitBasic() *TestHelper { + // create users once and cache them because password hashing is slow + initBasicOnce.Do(func() { + th.SystemAdminUser = th.CreateUser() + th.App.UpdateUserRoles(th.Context, th.SystemAdminUser.Id, model.SystemUserRoleId+" "+model.SystemAdminRoleId, false) + th.SystemAdminUser, _ = th.App.GetUser(th.SystemAdminUser.Id) + userCache.SystemAdminUser = th.SystemAdminUser.DeepCopy() + + th.BasicUser = th.CreateUser() + th.BasicUser, _ = th.App.GetUser(th.BasicUser.Id) + userCache.BasicUser = th.BasicUser.DeepCopy() + + th.BasicUser2 = th.CreateUser() + th.BasicUser2, _ = th.App.GetUser(th.BasicUser2.Id) + userCache.BasicUser2 = th.BasicUser2.DeepCopy() + }) + // restore cached users + th.SystemAdminUser = userCache.SystemAdminUser.DeepCopy() + th.BasicUser = userCache.BasicUser.DeepCopy() + th.BasicUser2 = userCache.BasicUser2.DeepCopy() + + users := []*model.User{th.SystemAdminUser, th.BasicUser, th.BasicUser2} + mainHelper.GetSQLStore().User().InsertUsers(users) + + th.BasicTeam = th.CreateTeam() + return th +} + +func (th *TestHelper) CreateTeam() *model.Team { + id := model.NewId() + team := &model.Team{ + DisplayName: "dn_" + id, + Name: "name" + id, + Email: "success+" + id + "@simulator.amazonses.com", + Type: model.TeamOpen, + } + + var err *model.AppError + if team, err = th.App.CreateTeam(th.Context, team); err != nil { + panic(err) + } + return team +} + +func (th *TestHelper) CreateUser() *model.User { + return th.CreateUserOrGuest(false) +} + +func (th *TestHelper) CreateUserOrGuest(guest bool) *model.User { + id := model.NewId() + + user := &model.User{ + Email: "success+" + id + "@simulator.amazonses.com", + Username: "un_" + id, + Nickname: "nn_" + id, + Password: "Password1", + EmailVerified: true, + } + + var err *model.AppError + if guest { + if user, err = th.App.CreateGuest(th.Context, user); err != nil { + panic(err) + } + } else { + if user, err = th.App.CreateUser(th.Context, user); err != nil { + panic(err) + } + } + return user +} + +func (th *TestHelper) ShutdownApp() { + done := make(chan bool) + go func() { + th.Server.Shutdown() + close(done) + }() + + select { + case <-done: + case <-time.After(30 * time.Second): + // panic instead of fatal to terminate all tests in this package, otherwise the + // still running App could spuriously fail subsequent tests. + panic("failed to shutdown App within 30 seconds") + } +} + +func (th *TestHelper) TearDown() { + if th.IncludeCacheLayer { + // Clean all the caches + th.App.Srv().InvalidateAllCaches() + } + th.ShutdownApp() + if th.tempWorkspace != "" { + os.RemoveAll(th.tempWorkspace) + } +} diff --git a/server/channels/jobs/main_test.go b/server/channels/jobs/main_test.go new file mode 100644 index 0000000000..943d307f6d --- /dev/null +++ b/server/channels/jobs/main_test.go @@ -0,0 +1,24 @@ +// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved. +// See LICENSE.txt for license information. + +package jobs_test + +import ( + "testing" + + "github.com/mattermost/mattermost/server/v8/channels/testlib" +) + +var mainHelper *testlib.MainHelper + +func TestMain(m *testing.M) { + var options = testlib.HelperOptions{ + EnableStore: true, + EnableResources: true, + } + + mainHelper = testlib.NewMainHelperWithOptions(&options) + defer mainHelper.Close() + + mainHelper.Main(m) +} diff --git a/server/channels/store/opentracinglayer/opentracinglayer.go b/server/channels/store/opentracinglayer/opentracinglayer.go index fd86e98e62..014374d2ed 100644 --- a/server/channels/store/opentracinglayer/opentracinglayer.go +++ b/server/channels/store/opentracinglayer/opentracinglayer.go @@ -3335,6 +3335,24 @@ func (s *OpenTracingLayerDraftStore) Delete(userID string, channelID string, roo return err } +func (s *OpenTracingLayerDraftStore) DeleteEmptyDraftsByCreateAtAndUserId(createAt int64, userId string) error { + origCtx := s.Root.Store.Context() + span, newCtx := tracing.StartSpanWithParentByContext(s.Root.Store.Context(), "DraftStore.DeleteEmptyDraftsByCreateAtAndUserId") + s.Root.Store.SetContext(newCtx) + defer func() { + s.Root.Store.SetContext(origCtx) + }() + + defer span.Finish() + err := s.DraftStore.DeleteEmptyDraftsByCreateAtAndUserId(createAt, userId) + if err != nil { + span.LogFields(spanlog.Error(err)) + ext.Error.Set(span, true) + } + + return err +} + func (s *OpenTracingLayerDraftStore) Get(userID string, channelID string, rootID string, includeDeleted bool) (*model.Draft, error) { origCtx := s.Root.Store.Context() span, newCtx := tracing.StartSpanWithParentByContext(s.Root.Store.Context(), "DraftStore.Get") @@ -3371,6 +3389,24 @@ func (s *OpenTracingLayerDraftStore) GetDraftsForUser(userID string, teamID stri return result, err } +func (s *OpenTracingLayerDraftStore) GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration(createAt int64, userId string) (int64, string, error) { + origCtx := s.Root.Store.Context() + span, newCtx := tracing.StartSpanWithParentByContext(s.Root.Store.Context(), "DraftStore.GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration") + s.Root.Store.SetContext(newCtx) + defer func() { + s.Root.Store.SetContext(origCtx) + }() + + defer span.Finish() + result, resultVar1, err := s.DraftStore.GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration(createAt, userId) + if err != nil { + span.LogFields(spanlog.Error(err)) + ext.Error.Set(span, true) + } + + return result, resultVar1, err +} + func (s *OpenTracingLayerDraftStore) Upsert(d *model.Draft) (*model.Draft, error) { origCtx := s.Root.Store.Context() span, newCtx := tracing.StartSpanWithParentByContext(s.Root.Store.Context(), "DraftStore.Upsert") diff --git a/server/channels/store/retrylayer/retrylayer.go b/server/channels/store/retrylayer/retrylayer.go index a71aba933e..22edcdc277 100644 --- a/server/channels/store/retrylayer/retrylayer.go +++ b/server/channels/store/retrylayer/retrylayer.go @@ -3724,6 +3724,27 @@ func (s *RetryLayerDraftStore) Delete(userID string, channelID string, rootID st } +func (s *RetryLayerDraftStore) DeleteEmptyDraftsByCreateAtAndUserId(createAt int64, userId string) error { + + tries := 0 + for { + err := s.DraftStore.DeleteEmptyDraftsByCreateAtAndUserId(createAt, userId) + if err == nil { + return nil + } + if !isRepeatableError(err) { + return err + } + tries++ + if tries >= 3 { + err = errors.Wrap(err, "giving up after 3 consecutive repeatable transaction failures") + return err + } + timepkg.Sleep(100 * timepkg.Millisecond) + } + +} + func (s *RetryLayerDraftStore) Get(userID string, channelID string, rootID string, includeDeleted bool) (*model.Draft, error) { tries := 0 @@ -3766,6 +3787,27 @@ func (s *RetryLayerDraftStore) GetDraftsForUser(userID string, teamID string) ([ } +func (s *RetryLayerDraftStore) GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration(createAt int64, userId string) (int64, string, error) { + + tries := 0 + for { + result, resultVar1, err := s.DraftStore.GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration(createAt, userId) + if err == nil { + return result, resultVar1, nil + } + if !isRepeatableError(err) { + return result, resultVar1, err + } + tries++ + if tries >= 3 { + err = errors.Wrap(err, "giving up after 3 consecutive repeatable transaction failures") + return result, resultVar1, err + } + timepkg.Sleep(100 * timepkg.Millisecond) + } + +} + func (s *RetryLayerDraftStore) Upsert(d *model.Draft) (*model.Draft, error) { tries := 0 diff --git a/server/channels/store/sqlstore/draft_store.go b/server/channels/store/sqlstore/draft_store.go index 0996b60b89..6e12d3ea61 100644 --- a/server/channels/store/sqlstore/draft_store.go +++ b/server/channels/store/sqlstore/draft_store.go @@ -159,11 +159,8 @@ func (s *SqlDraftStore) GetDraftsForUser(userID, teamID string) ([]*model.Draft, } func (s *SqlDraftStore) Delete(userID, channelID, rootID string) error { - time := model.GetMillis() query := s.getQueryBuilder(). - Update("Drafts"). - Set("UpdateAt", time). - Set("DeleteAt", time). + Delete("Drafts"). Where(sq.Eq{ "UserId": userID, "ChannelId": channelID, @@ -236,3 +233,90 @@ func (s *SqlDraftStore) determineMaxDraftSize() int { return maxDraftSize } + +func (s *SqlDraftStore) GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration(createAt int64, userId string) (int64, string, error) { + var drafts []struct { + CreateAt int64 + UserId string + } + + query := s.getQueryBuilder(). + Select("CreateAt", "UserId"). + From("Drafts"). + Where(sq.Or{ + sq.Gt{"CreateAt": createAt}, + sq.And{ + sq.Eq{"CreateAt": createAt}, + sq.Gt{"UserId": userId}, + }, + }). + OrderBy("CreateAt", "UserId ASC"). + Limit(100) + + err := s.GetReplicaX().SelectBuilder(&drafts, query) + if err != nil { + return 0, "", errors.Wrap(err, "failed to get the list of drafts") + } + + if len(drafts) == 0 { + return 0, "", nil + } + + lastElement := drafts[len(drafts)-1] + return lastElement.CreateAt, lastElement.UserId, nil +} + +func (s *SqlDraftStore) DeleteEmptyDraftsByCreateAtAndUserId(createAt int64, userId string) error { + var builder Builder + if s.DriverName() == model.DatabaseDriverPostgres { + builder = s.getQueryBuilder(). + Delete("Drafts d"). + PrefixExpr(s.getQueryBuilder().Select(). + Prefix("WITH dd AS ("). + Columns("UserId", "ChannelId", "RootId"). + From("Drafts"). + Where(sq.Or{ + sq.Gt{"CreateAt": createAt}, + sq.And{ + sq.Eq{"CreateAt": createAt}, + sq.Gt{"UserId": userId}, + }, + }). + OrderBy("CreateAt", "UserId"). + Limit(100). + Suffix(")"), + ). + Using("dd"). + Where("d.UserId = dd.UserId"). + Where("d.ChannelId = dd.ChannelId"). + Where("d.RootId = dd.RootId"). + Where("d.Message = ''") + } else if s.DriverName() == model.DatabaseDriverMysql { + builder = s.getQueryBuilder(). + Delete("Drafts d"). + What("d.*"). + JoinClause(s.getQueryBuilder().Select(). + Prefix("INNER JOIN ("). + Columns("UserId, ChannelId, RootId"). + From("Drafts"). + Where(sq.And{ + sq.Or{ + sq.Gt{"CreateAt": createAt}, + sq.And{ + sq.Eq{"CreateAt": createAt}, + sq.Gt{"UserId": userId}, + }, + }, + }). + OrderBy("CreateAt", "UserId"). + Limit(100). + Suffix(") dj ON (d.UserId = dj.UserId AND d.ChannelId = dj.ChannelId AND d.RootId = dj.RootId)"), + ).Where(sq.Eq{"Message": ""}) + } + + if _, err := s.GetMasterX().ExecBuilder(builder); err != nil { + return errors.Wrapf(err, "failed to delete empty drafts") + } + + return nil +} diff --git a/server/channels/store/store.go b/server/channels/store/store.go index 1b60015ba5..9886c3a2c5 100644 --- a/server/channels/store/store.go +++ b/server/channels/store/store.go @@ -986,6 +986,8 @@ type DraftStore interface { Get(userID, channelID, rootID string, includeDeleted bool) (*model.Draft, error) Delete(userID, channelID, rootID string) error GetDraftsForUser(userID, teamID string) ([]*model.Draft, error) + GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration(createAt int64, userId string) (int64, string, error) + DeleteEmptyDraftsByCreateAtAndUserId(createAt int64, userId string) error } type PostAcknowledgementStore interface { diff --git a/server/channels/store/storetest/draft_store.go b/server/channels/store/storetest/draft_store.go index de449f228f..7ccc387107 100644 --- a/server/channels/store/storetest/draft_store.go +++ b/server/channels/store/storetest/draft_store.go @@ -5,6 +5,7 @@ package storetest import ( "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -19,6 +20,8 @@ func TestDraftStore(t *testing.T, ss store.Store, s SqlStore) { t.Run("DeleteDraft", func(t *testing.T) { testDeleteDraft(t, ss) }) t.Run("GetDraft", func(t *testing.T) { testGetDraft(t, ss) }) t.Run("GetDraftsForUser", func(t *testing.T) { testGetDraftsForUser(t, ss) }) + t.Run("GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration", func(t *testing.T) { testGetLastCreateAtAndUserIdValuesForEmptyDraftsMigration(t, ss) }) + t.Run("DeleteEmptyDraftsByCreateAtAndUserId", func(t *testing.T) { testDeleteEmptyDraftsByCreateAtAndUserId(t, ss) }) } func testSaveDraft(t *testing.T, ss store.Store) { @@ -275,24 +278,6 @@ func testGetDraft(t *testing.T, ss store.Store) { assert.Equal(t, draft2.Message, draftResp.Message) assert.Equal(t, draft2.ChannelId, draftResp.ChannelId) }) - - t.Run("get draft including deleted", func(t *testing.T) { - draftResp, err := ss.Draft().Get(user.Id, channel.Id, "", false) - assert.NoError(t, err) - assert.Equal(t, draft1.Message, draftResp.Message) - assert.Equal(t, draft1.ChannelId, draftResp.ChannelId) - - err = ss.Draft().Delete(user.Id, channel.Id, "") - assert.NoError(t, err) - _, err = ss.Draft().Get(user.Id, channel.Id, "", false) - assert.Error(t, err) - assert.IsType(t, &store.ErrNotFound{}, err) - - draftResp, err = ss.Draft().Get(user.Id, channel.Id, "", true) - assert.NoError(t, err) - assert.Equal(t, draft1.Message, draftResp.Message) - assert.Equal(t, draft1.ChannelId, draftResp.ChannelId) - }) } func testGetDraftsForUser(t *testing.T, ss store.Store) { @@ -351,3 +336,226 @@ func testGetDraftsForUser(t *testing.T, ss store.Store) { assert.ElementsMatch(t, []*model.Draft{draft1, draft2}, draftResp) }) } + +func clearDrafts(t *testing.T, ss store.Store) { + t.Helper() + + _, err := ss.GetInternalMasterDB().Exec("DELETE FROM Drafts") + require.NoError(t, err) +} + +func makeDrafts(t *testing.T, ss store.Store, count int, message string) { + t.Helper() + + var delay time.Duration + if count > 100 { + // When creating more than one page of drafts, improve the odds we get + // some results with different CreateAt timetsamps. + delay = 5 * time.Millisecond + } + + for i := 1; i <= count; i++ { + _, err := ss.Draft().Upsert(&model.Draft{ + CreateAt: model.GetMillis(), + UpdateAt: model.GetMillis(), + UserId: model.NewId(), + ChannelId: model.NewId(), + Message: message, + }) + require.NoError(t, err) + + if delay > 0 { + time.Sleep(delay) + } + } +} + +func countDraftPages(t *testing.T, ss store.Store) int { + t.Helper() + + pages := 0 + createAt := int64(0) + userId := "" + + for { + nextCreateAt, nextUserId, err := ss.Draft().GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration(createAt, userId) + require.NoError(t, err) + + if nextCreateAt == 0 && nextUserId == "" { + break + } + + // Ensure we're always making progress. + if nextCreateAt == createAt { + require.Greater(t, nextUserId, userId) + } else { + require.Greater(t, nextCreateAt, createAt) + } + + pages++ + createAt = nextCreateAt + userId = nextUserId + } + + return pages +} + +func testGetLastCreateAtAndUserIdValuesForEmptyDraftsMigration(t *testing.T, ss store.Store) { + t.Run("no drafts", func(t *testing.T) { + clearDrafts(t, ss) + + createAt, userId, err := ss.Draft().GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration(0, "") + require.NoError(t, err) + assert.EqualValues(t, 0, createAt) + assert.Equal(t, "", userId) + + assert.Equal(t, 0, countDraftPages(t, ss), "incorrect number of pages") + }) + + t.Run("single page", func(t *testing.T) { + clearDrafts(t, ss) + + makeDrafts(t, ss, 100, model.NewRandomString(16)) + assert.Equal(t, 1, countDraftPages(t, ss), "incorrect number of pages") + }) + + t.Run("multiple pages", func(t *testing.T) { + clearDrafts(t, ss) + + makeDrafts(t, ss, 300, model.NewRandomString(16)) + assert.Equal(t, 3, countDraftPages(t, ss), "incorrect number of pages") + }) +} + +func testDeleteEmptyDraftsByCreateAtAndUserId(t *testing.T, ss store.Store) { + t.Run("nil parameters", func(t *testing.T) { + clearDrafts(t, ss) + + err := ss.Draft().DeleteEmptyDraftsByCreateAtAndUserId(0, "") + require.NoError(t, err) + }) + + t.Run("delete single page, all empty", func(t *testing.T) { + clearDrafts(t, ss) + makeDrafts(t, ss, 100, "") + + createAt, userId := int64(0), "" + nextCreateAt, nextUserId, err := ss.Draft().GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration(createAt, userId) + require.NoError(t, err) + err = ss.Draft().DeleteEmptyDraftsByCreateAtAndUserId(createAt, userId) + require.NoError(t, err) + createAt, userId = nextCreateAt, nextUserId + + assert.Equal(t, 0, countDraftPages(t, ss), "incorrect number of pages") + + nextCreateAt, nextUserId, err = ss.Draft().GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration(createAt, userId) + require.NoError(t, err) + assert.EqualValues(t, 0, nextCreateAt, "should have finished iterating through drafts") + assert.Equal(t, "", nextUserId, "should have finished iterating through drafts") + }) + + t.Run("delete multiple pages, all empty", func(t *testing.T) { + clearDrafts(t, ss) + makeDrafts(t, ss, 300, "") + + createAt, userId := int64(0), "" + nextCreateAt, nextUserId, err := ss.Draft().GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration(createAt, userId) + require.NoError(t, err) + err = ss.Draft().DeleteEmptyDraftsByCreateAtAndUserId(createAt, userId) + require.NoError(t, err) + createAt, userId = nextCreateAt, nextUserId + + assert.Equal(t, 2, countDraftPages(t, ss), "incorrect number of pages") + + nextCreateAt, nextUserId, err = ss.Draft().GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration(createAt, userId) + require.NoError(t, err) + err = ss.Draft().DeleteEmptyDraftsByCreateAtAndUserId(createAt, userId) + require.NoError(t, err) + createAt, userId = nextCreateAt, nextUserId + + assert.Equal(t, 1, countDraftPages(t, ss), "incorrect number of pages") + + nextCreateAt, nextUserId, err = ss.Draft().GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration(createAt, userId) + require.NoError(t, err) + err = ss.Draft().DeleteEmptyDraftsByCreateAtAndUserId(createAt, userId) + require.NoError(t, err) + createAt, userId = nextCreateAt, nextUserId + + assert.Equal(t, 0, countDraftPages(t, ss), "incorrect number of pages") + + nextCreateAt, nextUserId, err = ss.Draft().GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration(createAt, userId) + require.NoError(t, err) + assert.EqualValues(t, 0, nextCreateAt, "should have finished iterating through drafts") + assert.Equal(t, "", nextUserId, "should have finished iterating through drafts") + }) + + t.Run("delete multiple pages, some empty", func(t *testing.T) { + clearDrafts(t, ss) + makeDrafts(t, ss, 50, "") + makeDrafts(t, ss, 50, "message") + makeDrafts(t, ss, 50, "") + makeDrafts(t, ss, 50, "message") + makeDrafts(t, ss, 50, "") + makeDrafts(t, ss, 50, "message") + makeDrafts(t, ss, 50, "") + makeDrafts(t, ss, 50, "message") + makeDrafts(t, ss, 50, "message") + makeDrafts(t, ss, 50, "message") + + // Verify initially 5 pages + assert.Equal(t, 5, countDraftPages(t, ss), "incorrect number of pages") + + createAt, userId := int64(0), "" + + nextCreateAt, nextUserId, err := ss.Draft().GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration(createAt, userId) + require.NoError(t, err) + err = ss.Draft().DeleteEmptyDraftsByCreateAtAndUserId(createAt, userId) + require.NoError(t, err) + createAt, userId = nextCreateAt, nextUserId + + // Only deleted 50, so still 5 pages + assert.Equal(t, 5, countDraftPages(t, ss), "incorrect number of pages") + + nextCreateAt, nextUserId, err = ss.Draft().GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration(createAt, userId) + require.NoError(t, err) + err = ss.Draft().DeleteEmptyDraftsByCreateAtAndUserId(createAt, userId) + require.NoError(t, err) + createAt, userId = nextCreateAt, nextUserId + + // Now deleted 100, so down to 4 pages + assert.Equal(t, 4, countDraftPages(t, ss), "incorrect number of pages") + + nextCreateAt, nextUserId, err = ss.Draft().GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration(createAt, userId) + require.NoError(t, err) + err = ss.Draft().DeleteEmptyDraftsByCreateAtAndUserId(createAt, userId) + require.NoError(t, err) + createAt, userId = nextCreateAt, nextUserId + + // Only deleted 150 now, so still 4 pages + assert.Equal(t, 4, countDraftPages(t, ss), "incorrect number of pages") + + nextCreateAt, nextUserId, err = ss.Draft().GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration(createAt, userId) + require.NoError(t, err) + err = ss.Draft().DeleteEmptyDraftsByCreateAtAndUserId(createAt, userId) + require.NoError(t, err) + createAt, userId = nextCreateAt, nextUserId + + // Now deleted all 200 empty messages, so down to 3 pages + assert.Equal(t, 3, countDraftPages(t, ss), "incorrect number of pages") + + // Keep going through all pages to verify nothing else gets deleted. + + nextCreateAt, nextUserId, err = ss.Draft().GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration(createAt, userId) + require.NoError(t, err) + err = ss.Draft().DeleteEmptyDraftsByCreateAtAndUserId(createAt, userId) + require.NoError(t, err) + createAt, userId = nextCreateAt, nextUserId + + // Verify we're done iterating + + nextCreateAt, nextUserId, err = ss.Draft().GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration(createAt, userId) + require.NoError(t, err) + assert.EqualValues(t, 0, nextCreateAt, "should have finished iterating through drafts") + assert.Equal(t, "", nextUserId, "should have finished iterating through drafts") + }) +} diff --git a/server/channels/store/storetest/mocks/DraftStore.go b/server/channels/store/storetest/mocks/DraftStore.go index 5194142777..cd9fda065c 100644 --- a/server/channels/store/storetest/mocks/DraftStore.go +++ b/server/channels/store/storetest/mocks/DraftStore.go @@ -28,6 +28,20 @@ func (_m *DraftStore) Delete(userID string, channelID string, rootID string) err return r0 } +// DeleteEmptyDraftsByCreateAtAndUserId provides a mock function with given fields: createAt, userId +func (_m *DraftStore) DeleteEmptyDraftsByCreateAtAndUserId(createAt int64, userId string) error { + ret := _m.Called(createAt, userId) + + var r0 error + if rf, ok := ret.Get(0).(func(int64, string) error); ok { + r0 = rf(createAt, userId) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // Get provides a mock function with given fields: userID, channelID, rootID, includeDeleted func (_m *DraftStore) Get(userID string, channelID string, rootID string, includeDeleted bool) (*model.Draft, error) { ret := _m.Called(userID, channelID, rootID, includeDeleted) @@ -80,6 +94,37 @@ func (_m *DraftStore) GetDraftsForUser(userID string, teamID string) ([]*model.D return r0, r1 } +// GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration provides a mock function with given fields: createAt, userId +func (_m *DraftStore) GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration(createAt int64, userId string) (int64, string, error) { + ret := _m.Called(createAt, userId) + + var r0 int64 + var r1 string + var r2 error + if rf, ok := ret.Get(0).(func(int64, string) (int64, string, error)); ok { + return rf(createAt, userId) + } + if rf, ok := ret.Get(0).(func(int64, string) int64); ok { + r0 = rf(createAt, userId) + } else { + r0 = ret.Get(0).(int64) + } + + if rf, ok := ret.Get(1).(func(int64, string) string); ok { + r1 = rf(createAt, userId) + } else { + r1 = ret.Get(1).(string) + } + + if rf, ok := ret.Get(2).(func(int64, string) error); ok { + r2 = rf(createAt, userId) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + // Upsert provides a mock function with given fields: d func (_m *DraftStore) Upsert(d *model.Draft) (*model.Draft, error) { ret := _m.Called(d) diff --git a/server/channels/store/timerlayer/timerlayer.go b/server/channels/store/timerlayer/timerlayer.go index ab90339dac..b59768e2d9 100644 --- a/server/channels/store/timerlayer/timerlayer.go +++ b/server/channels/store/timerlayer/timerlayer.go @@ -3059,6 +3059,22 @@ func (s *TimerLayerDraftStore) Delete(userID string, channelID string, rootID st return err } +func (s *TimerLayerDraftStore) DeleteEmptyDraftsByCreateAtAndUserId(createAt int64, userId string) error { + start := time.Now() + + err := s.DraftStore.DeleteEmptyDraftsByCreateAtAndUserId(createAt, userId) + + elapsed := float64(time.Since(start)) / float64(time.Second) + if s.Root.Metrics != nil { + success := "false" + if err == nil { + success = "true" + } + s.Root.Metrics.ObserveStoreMethodDuration("DraftStore.DeleteEmptyDraftsByCreateAtAndUserId", success, elapsed) + } + return err +} + func (s *TimerLayerDraftStore) Get(userID string, channelID string, rootID string, includeDeleted bool) (*model.Draft, error) { start := time.Now() @@ -3091,6 +3107,22 @@ func (s *TimerLayerDraftStore) GetDraftsForUser(userID string, teamID string) ([ return result, err } +func (s *TimerLayerDraftStore) GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration(createAt int64, userId string) (int64, string, error) { + start := time.Now() + + result, resultVar1, err := s.DraftStore.GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration(createAt, userId) + + elapsed := float64(time.Since(start)) / float64(time.Second) + if s.Root.Metrics != nil { + success := "false" + if err == nil { + success = "true" + } + s.Root.Metrics.ObserveStoreMethodDuration("DraftStore.GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration", success, elapsed) + } + return result, resultVar1, err +} + func (s *TimerLayerDraftStore) Upsert(d *model.Draft) (*model.Draft, error) { start := time.Now() diff --git a/server/channels/testlib/store.go b/server/channels/testlib/store.go index 6937f0456b..f6a28a6490 100644 --- a/server/channels/testlib/store.go +++ b/server/channels/testlib/store.go @@ -71,6 +71,7 @@ func GetMockStoreForSetupFunctions() *mocks.Store { systemStore.On("GetByName", model.MigrationKeyAddPlayboosksManageRolesPermissions).Return(&model.System{Name: model.MigrationKeyAddPlayboosksManageRolesPermissions, Value: "true"}, nil) systemStore.On("GetByName", model.MigrationKeyAddCustomUserGroupsPermissionRestore).Return(&model.System{Name: model.MigrationKeyAddCustomUserGroupsPermissionRestore, Value: "true"}, nil) systemStore.On("GetByName", model.MigrationKeyAddReadChannelContentPermissions).Return(&model.System{Name: model.MigrationKeyAddReadChannelContentPermissions, Value: "true"}, nil) + systemStore.On("GetByName", model.MigrationKeyDeleteEmptyDrafts).Return(&model.System{Name: model.MigrationKeyDeleteEmptyDrafts, Value: "true"}, nil) systemStore.On("GetByName", "CustomGroupAdminRoleCreationMigrationComplete").Return(&model.System{Name: model.MigrationKeyAddPlayboosksManageRolesPermissions, Value: "true"}, nil) systemStore.On("GetByName", "products_boards").Return(&model.System{Name: "products_boards", Value: "true"}, nil) systemStore.On("GetByName", "elasticsearch_fix_channel_index_migration").Return(&model.System{Name: "elasticsearch_fix_channel_index_migration", Value: "true"}, nil) diff --git a/server/go.mod b/server/go.mod index 4a84441e28..67db70c55a 100644 --- a/server/go.mod +++ b/server/go.mod @@ -39,7 +39,7 @@ require ( github.com/mattermost/mattermost/server/public v0.0.9 github.com/mattermost/morph v1.0.5-0.20230511171014-e76e25978d56 github.com/mattermost/rsc v0.0.0-20160330161541-bbaefb05eaa0 - github.com/mattermost/squirrel v0.2.0 + github.com/mattermost/squirrel v0.4.0 github.com/mholt/archiver/v3 v3.5.1 github.com/microcosm-cc/bluemonday v1.0.24 github.com/minio/minio-go/v7 v7.0.59 diff --git a/server/go.sum b/server/go.sum index 6b5f0aec20..3ce9bbc093 100644 --- a/server/go.sum +++ b/server/go.sum @@ -446,8 +446,8 @@ github.com/mattermost/morph v1.0.5-0.20230511171014-e76e25978d56 h1:SjFYbWvmuf73 github.com/mattermost/morph v1.0.5-0.20230511171014-e76e25978d56/go.mod h1:gD+EaqX2UMyyuzmF4PFh4r33XneQ8Nzi+0E8nXjMa3A= github.com/mattermost/rsc v0.0.0-20160330161541-bbaefb05eaa0 h1:G9tL6JXRBMzjuD1kkBtcnd42kUiT6QDwxfFYu7adM6o= github.com/mattermost/rsc v0.0.0-20160330161541-bbaefb05eaa0/go.mod h1:nV5bfVpT//+B1RPD2JvRnxbkLmJEYXmRaaVl15fsXjs= -github.com/mattermost/squirrel v0.2.0 h1:8ZWeyf+MWQ2cL7hu9REZgLtz2IJi51qqZEovI3T3TT8= -github.com/mattermost/squirrel v0.2.0/go.mod h1:NPPtk+CdpWre4GxMGoOpzEVFVc0ZoEFyJBZGCtn9nSU= +github.com/mattermost/squirrel v0.4.0 h1:azf9LZ+8JUTAvwt/njB1utkPqWQ6e7Rje2ya5N0P2i4= +github.com/mattermost/squirrel v0.4.0/go.mod h1:NPPtk+CdpWre4GxMGoOpzEVFVc0ZoEFyJBZGCtn9nSU= github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4= diff --git a/server/public/model/draft.go b/server/public/model/draft.go index 73d1e69f98..e4b6c6c319 100644 --- a/server/public/model/draft.go +++ b/server/public/model/draft.go @@ -12,7 +12,7 @@ import ( type Draft struct { CreateAt int64 `json:"create_at"` UpdateAt int64 `json:"update_at"` - DeleteAt int64 `json:"delete_at"` + DeleteAt int64 `json:"delete_at"` // Deprecated, we now just hard delete the rows UserId string `json:"user_id"` ChannelId string `json:"channel_id"` RootId string `json:"root_id"` diff --git a/server/public/model/job.go b/server/public/model/job.go index 393a20c20b..0092d95970 100644 --- a/server/public/model/job.go +++ b/server/public/model/job.go @@ -36,6 +36,7 @@ const ( JobTypeHostedPurchaseScreening = "hosted_purchase_screening" JobTypeS3PathMigration = "s3_path_migration" JobTypeCleanupDesktopTokens = "cleanup_desktop_tokens" + JobTypeDeleteEmptyDraftsMigration = "delete_empty_drafts_migration" JobStatusPending = "pending" JobStatusInProgress = "in_progress" diff --git a/server/public/model/migration.go b/server/public/model/migration.go index 480080b872..faa33a0670 100644 --- a/server/public/model/migration.go +++ b/server/public/model/migration.go @@ -43,4 +43,5 @@ const ( MigrationKeyAddReadChannelContentPermissions = "read_channel_content_permissions" MigrationKeyElasticsearchFixChannelIndex = "elasticsearch_fix_channel_index_migration" MigrationKeyS3Path = "s3_path_migration" + MigrationKeyDeleteEmptyDrafts = "delete_empty_drafts_migration" )