mirror of
https://github.com/mattermost/mattermost.git
synced 2025-02-25 18:55:24 -06:00
[MM-53428] Delete empty drafts on upsert (#24046)
* [MM-53428] Delete empty drafts on upsert * Add migrations to fix existing drafts * Fix CI * Delete empty drafts entirely from the DB * Fix lint * Implement batch migration for deleting drafts * Missing store layers * Add updated mock * Remove unnecessary test * PR feedback * Add check for cluster migration * Fix MySQL * Don't check for len<2 * Bit of PR feedback * Use query builder for parameters * PR feedback * More PR feedback * Merge'd * unit test GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration * simplified builder interface * fix DeleteEmptyDraftsByCreateAtAndUserId for MySQL * rework as batch migration worker * fix typo * log ip address on version mismatches too * simplify reset semantics * remove trace log in favour of low spam * document parameters for clarity --------- Co-authored-by: Mattermost Build <build@mattermost.com> Co-authored-by: Jesse Hallam <jesse.hallam@gmail.com>
This commit is contained in:
@@ -56,6 +56,15 @@ func (a *App) UpsertDraft(c *request.Context, draft *model.Draft, connectionID s
|
||||
return nil, model.NewAppError("CreateDraft", "app.user.get.app_error", nil, nErr.Error(), http.StatusInternalServerError)
|
||||
}
|
||||
|
||||
// If the draft is empty, just delete it
|
||||
if draft.Message == "" {
|
||||
deleteErr := a.Srv().Store().Draft().Delete(draft.UserId, draft.ChannelId, draft.RootId)
|
||||
if deleteErr != nil {
|
||||
return nil, model.NewAppError("CreateDraft", "app.draft.save.app_error", nil, deleteErr.Error(), http.StatusInternalServerError)
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
dt, nErr := a.Srv().Store().Draft().Upsert(draft)
|
||||
if nErr != nil {
|
||||
return nil, model.NewAppError("CreateDraft", "app.draft.save.app_error", nil, nErr.Error(), http.StatusInternalServerError)
|
||||
|
||||
@@ -600,6 +600,27 @@ func (s *Server) doCloudS3PathMigrations(c *request.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) doDeleteEmptyDraftsMigration(c *request.Context) {
|
||||
// If the migration is already marked as completed, don't do it again.
|
||||
if _, err := s.Store().System().GetByName(model.MigrationKeyDeleteEmptyDrafts); err == nil {
|
||||
return
|
||||
}
|
||||
|
||||
jobs, err := s.Store().Job().GetAllByTypeAndStatus(c, model.JobTypeDeleteEmptyDraftsMigration, model.JobStatusPending)
|
||||
if err != nil {
|
||||
mlog.Fatal("failed to get jobs by type and status", mlog.Err(err))
|
||||
return
|
||||
}
|
||||
if len(jobs) > 0 {
|
||||
return
|
||||
}
|
||||
|
||||
if _, appErr := s.Jobs.CreateJobOnce(c, model.JobTypeDeleteEmptyDraftsMigration, nil); appErr != nil {
|
||||
mlog.Fatal("failed to start job for deleting empty drafts", mlog.Err(appErr))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (a *App) DoAppMigrations() {
|
||||
a.Srv().doAppMigrations()
|
||||
}
|
||||
@@ -625,4 +646,5 @@ func (s *Server) doAppMigrations() {
|
||||
s.doPostPriorityConfigDefaultTrueMigration()
|
||||
s.doElasticsearchFixChannelIndex(c)
|
||||
s.doCloudS3PathMigrations(c)
|
||||
s.doDeleteEmptyDraftsMigration(c)
|
||||
}
|
||||
|
||||
@@ -40,6 +40,7 @@ import (
|
||||
"github.com/mattermost/mattermost/server/v8/channels/jobs"
|
||||
"github.com/mattermost/mattermost/server/v8/channels/jobs/active_users"
|
||||
"github.com/mattermost/mattermost/server/v8/channels/jobs/cleanup_desktop_tokens"
|
||||
"github.com/mattermost/mattermost/server/v8/channels/jobs/delete_empty_drafts_migration"
|
||||
"github.com/mattermost/mattermost/server/v8/channels/jobs/expirynotify"
|
||||
"github.com/mattermost/mattermost/server/v8/channels/jobs/export_delete"
|
||||
"github.com/mattermost/mattermost/server/v8/channels/jobs/export_process"
|
||||
@@ -1573,6 +1574,11 @@ func (s *Server) initJobs() {
|
||||
s3_path_migration.MakeWorker(s.Jobs, s.Store(), s.FileBackend()),
|
||||
nil)
|
||||
|
||||
s.Jobs.RegisterJobType(
|
||||
model.JobTypeDeleteEmptyDraftsMigration,
|
||||
delete_empty_drafts_migration.MakeWorker(s.Jobs, s.Store(), New(ServerConnector(s.Channels()))),
|
||||
nil)
|
||||
|
||||
s.Jobs.RegisterJobType(
|
||||
model.JobTypeExportDelete,
|
||||
export_delete.MakeWorker(s.Jobs, New(ServerConnector(s.Channels()))),
|
||||
|
||||
236
server/channels/jobs/batch_migration_worker.go
Normal file
236
server/channels/jobs/batch_migration_worker.go
Normal file
@@ -0,0 +1,236 @@
|
||||
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
|
||||
// See LICENSE.txt for license information.
|
||||
|
||||
package jobs
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/mattermost/mattermost/server/public/model"
|
||||
"github.com/mattermost/mattermost/server/public/shared/mlog"
|
||||
"github.com/mattermost/mattermost/server/public/shared/request"
|
||||
"github.com/mattermost/mattermost/server/v8/channels/store"
|
||||
)
|
||||
|
||||
type BatchMigrationWorkerAppIFace interface {
|
||||
GetClusterStatus() []*model.ClusterInfo
|
||||
}
|
||||
|
||||
// BatchMigrationWorker processes database migration jobs in batches to help avoid table locks.
|
||||
//
|
||||
// It uses the jobs infrastructure to ensure only one node in the cluster runs the migration at
|
||||
// any given time, avoids running the migration until the cluster is uniform, and automatically
|
||||
// resets the migration if the cluster version diverges after starting.
|
||||
//
|
||||
// In principle, the job infrastructure is overkill for this kind of work, as there's a worker
|
||||
// created per migration. There's also complication with edge cases, like having to restart the
|
||||
// server in order to retry a failed migration job. Refactoring the job infrastructure is left as
|
||||
// a future exercise.
|
||||
type BatchMigrationWorker struct {
|
||||
jobServer *JobServer
|
||||
logger mlog.LoggerIFace
|
||||
store store.Store
|
||||
app BatchMigrationWorkerAppIFace
|
||||
|
||||
stop chan bool
|
||||
stopped chan bool
|
||||
jobs chan model.Job
|
||||
|
||||
migrationKey string
|
||||
timeBetweenBatches time.Duration
|
||||
doMigrationBatch func(data model.StringMap, store store.Store) (model.StringMap, bool, error)
|
||||
}
|
||||
|
||||
// MakeBatchMigrationWorker creates a worker to process the given migration batch function.
|
||||
func MakeBatchMigrationWorker(jobServer *JobServer, store store.Store, app BatchMigrationWorkerAppIFace, migrationKey string, timeBetweenBatches time.Duration, doMigrationBatch func(data model.StringMap, store store.Store) (model.StringMap, bool, error)) model.Worker {
|
||||
worker := &BatchMigrationWorker{
|
||||
jobServer: jobServer,
|
||||
logger: jobServer.Logger().With(mlog.String("worker_name", migrationKey)),
|
||||
store: store,
|
||||
app: app,
|
||||
stop: make(chan bool, 1),
|
||||
stopped: make(chan bool, 1),
|
||||
jobs: make(chan model.Job),
|
||||
migrationKey: migrationKey,
|
||||
timeBetweenBatches: timeBetweenBatches,
|
||||
doMigrationBatch: doMigrationBatch,
|
||||
}
|
||||
return worker
|
||||
}
|
||||
|
||||
// Run starts the worker dedicated to the unique migration batch job it will be given to process.
|
||||
func (worker *BatchMigrationWorker) Run() {
|
||||
worker.logger.Debug("Worker started")
|
||||
// We have to re-assign the stop channel again, because
|
||||
// it might happen that the job was restarted due to a config change.
|
||||
worker.stop = make(chan bool, 1)
|
||||
|
||||
defer func() {
|
||||
worker.logger.Debug("Worker finished")
|
||||
worker.stopped <- true
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-worker.stop:
|
||||
worker.logger.Debug("Worker received stop signal")
|
||||
return
|
||||
case job := <-worker.jobs:
|
||||
worker.DoJob(&job)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Stop interrupts the worker even if the migration has not yet completed.
|
||||
func (worker *BatchMigrationWorker) Stop() {
|
||||
worker.logger.Debug("Worker stopping")
|
||||
close(worker.stop)
|
||||
<-worker.stopped
|
||||
}
|
||||
|
||||
// JobChannel is the means by which the jobs infrastructure provides the worker the job to execute.
|
||||
func (worker *BatchMigrationWorker) JobChannel() chan<- model.Job {
|
||||
return worker.jobs
|
||||
}
|
||||
|
||||
// IsEnabled is always true for batch migrations.
|
||||
func (worker *BatchMigrationWorker) IsEnabled(_ *model.Config) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// checkIsClusterInSync returns true if all nodes in the cluster are running the same version,
|
||||
// logging a warning on the first mismatch found.
|
||||
func (worker *BatchMigrationWorker) checkIsClusterInSync() bool {
|
||||
clusterStatus := worker.app.GetClusterStatus()
|
||||
for i := 1; i < len(clusterStatus); i++ {
|
||||
if clusterStatus[i].SchemaVersion != clusterStatus[0].SchemaVersion {
|
||||
worker.logger.Warn(
|
||||
"Worker: cluster not in sync",
|
||||
mlog.String("schema_version_a", clusterStatus[0].SchemaVersion),
|
||||
mlog.String("schema_version_b", clusterStatus[1].SchemaVersion),
|
||||
mlog.String("server_ip_a", clusterStatus[0].IPAddress),
|
||||
mlog.String("server_ip_b", clusterStatus[1].IPAddress),
|
||||
)
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// DoJob executes the job picked up through the job channel.
|
||||
//
|
||||
// Note that this is a lot of distracting machinery here to claim the job, then double check the
|
||||
// status, and keep the status up to date in line with job infrastrcuture semantics. Unless an
|
||||
// error occurs, this worker should hold onto the job until its completed.
|
||||
func (worker *BatchMigrationWorker) DoJob(job *model.Job) {
|
||||
logger := worker.logger.With(mlog.Any("job", job))
|
||||
logger.Debug("Worker received a new candidate job.")
|
||||
defer worker.jobServer.HandleJobPanic(logger, job)
|
||||
|
||||
if claimed, err := worker.jobServer.ClaimJob(job); err != nil {
|
||||
logger.Warn("Worker experienced an error while trying to claim job", mlog.Err(err))
|
||||
return
|
||||
} else if !claimed {
|
||||
return
|
||||
}
|
||||
|
||||
c := request.EmptyContext(logger)
|
||||
var appErr *model.AppError
|
||||
|
||||
// We get the job again because ClaimJob changes the job status.
|
||||
job, appErr = worker.jobServer.GetJob(c, job.Id)
|
||||
if appErr != nil {
|
||||
worker.logger.Error("Worker: job execution error", mlog.Err(appErr))
|
||||
worker.setJobError(logger, job, appErr)
|
||||
return
|
||||
}
|
||||
|
||||
if job.Data == nil {
|
||||
job.Data = make(model.StringMap)
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-worker.stop:
|
||||
logger.Info("Worker: Migration has been canceled via Worker Stop. Setting the job back to pending.")
|
||||
if err := worker.jobServer.SetJobPending(job); err != nil {
|
||||
worker.logger.Error("Worker: Failed to mark job as pending", mlog.Err(err))
|
||||
}
|
||||
return
|
||||
case <-time.After(worker.timeBetweenBatches):
|
||||
// Ensure the cluster remains in sync, otherwise we restart the job to
|
||||
// ensure a complete migration. Technically, the cluster could go out of
|
||||
// sync briefly within a batch, but we accept that risk.
|
||||
if !worker.checkIsClusterInSync() {
|
||||
worker.logger.Warn("Worker: Resetting job")
|
||||
worker.resetJob(logger, job)
|
||||
return
|
||||
}
|
||||
|
||||
nextData, done, err := worker.doMigrationBatch(job.Data, worker.store)
|
||||
if err != nil {
|
||||
worker.logger.Error("Worker: Failed to do migration batch. Exiting", mlog.Err(err))
|
||||
worker.setJobError(logger, job, model.NewAppError("doMigrationBatch", model.NoTranslation, nil, "", http.StatusInternalServerError).Wrap(err))
|
||||
return
|
||||
} else if done {
|
||||
logger.Info("Worker: Job is complete")
|
||||
worker.setJobSuccess(logger, job)
|
||||
worker.markAsComplete()
|
||||
return
|
||||
}
|
||||
|
||||
job.Data = nextData
|
||||
|
||||
// Migrations currently don't support reporting meaningful progress.
|
||||
worker.jobServer.SetJobProgress(job, 0)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// resetJob erases the data tracking the next batch to execute and returns the job status to
|
||||
// pending to allow the job infrastructure to requeue it.
|
||||
func (worker *BatchMigrationWorker) resetJob(logger mlog.LoggerIFace, job *model.Job) {
|
||||
job.Data = nil
|
||||
job.Progress = 0
|
||||
job.Status = model.JobStatusPending
|
||||
|
||||
if _, err := worker.store.Job().UpdateOptimistically(job, model.JobStatusInProgress); err != nil {
|
||||
worker.logger.Error("Worker: Failed to reset job data. May resume instead of restarting.", mlog.Err(err))
|
||||
}
|
||||
}
|
||||
|
||||
// setJobSuccess records the job as successful.
|
||||
func (worker *BatchMigrationWorker) setJobSuccess(logger mlog.LoggerIFace, job *model.Job) {
|
||||
if err := worker.jobServer.SetJobProgress(job, 100); err != nil {
|
||||
logger.Error("Worker: Failed to update progress for job", mlog.Err(err))
|
||||
worker.setJobError(logger, job, err)
|
||||
}
|
||||
|
||||
if err := worker.jobServer.SetJobSuccess(job); err != nil {
|
||||
logger.Error("Worker: Failed to set success for job", mlog.Err(err))
|
||||
worker.setJobError(logger, job, err)
|
||||
}
|
||||
}
|
||||
|
||||
// setJobError puts the job into an error state, preventing the job from running again.
|
||||
func (worker *BatchMigrationWorker) setJobError(logger mlog.LoggerIFace, job *model.Job, appError *model.AppError) {
|
||||
if err := worker.jobServer.SetJobError(job, appError); err != nil {
|
||||
logger.Error("Worker: Failed to set job error", mlog.Err(err))
|
||||
}
|
||||
}
|
||||
|
||||
// markAsComplete records a discrete migration key to prevent this job from ever running again.
|
||||
func (worker *BatchMigrationWorker) markAsComplete() {
|
||||
system := model.System{
|
||||
Name: worker.migrationKey,
|
||||
Value: "true",
|
||||
}
|
||||
|
||||
// Note that if this fails, then the job would have still succeeded. We will spuriously
|
||||
// run the job again in the future, but as migrations are idempotent it won't be an issue.
|
||||
if err := worker.jobServer.Store.System().Save(&system); err != nil {
|
||||
worker.logger.Error("Worker: Failed to mark migration as completed in the systems table.", mlog.String("migration_key", worker.migrationKey), mlog.Err(err))
|
||||
}
|
||||
}
|
||||
310
server/channels/jobs/batch_migration_worker_test.go
Normal file
310
server/channels/jobs/batch_migration_worker_test.go
Normal file
@@ -0,0 +1,310 @@
|
||||
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
|
||||
// See LICENSE.txt for license information.
|
||||
|
||||
package jobs_test
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/mattermost/mattermost/server/public/model"
|
||||
"github.com/mattermost/mattermost/server/public/shared/mlog"
|
||||
"github.com/mattermost/mattermost/server/public/shared/request"
|
||||
"github.com/mattermost/mattermost/server/v8/channels/jobs"
|
||||
"github.com/mattermost/mattermost/server/v8/channels/store"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
type MockApp struct {
|
||||
clusterInfo []*model.ClusterInfo
|
||||
}
|
||||
|
||||
func (ma MockApp) GetClusterStatus() []*model.ClusterInfo {
|
||||
return ma.clusterInfo
|
||||
}
|
||||
|
||||
func (ma *MockApp) SetInSync() {
|
||||
ma.clusterInfo = nil
|
||||
ma.clusterInfo = append(ma.clusterInfo, &model.ClusterInfo{
|
||||
SchemaVersion: "a",
|
||||
})
|
||||
ma.clusterInfo = append(ma.clusterInfo, &model.ClusterInfo{
|
||||
SchemaVersion: "a",
|
||||
})
|
||||
}
|
||||
|
||||
func (ma *MockApp) SetOutOfSync() {
|
||||
ma.clusterInfo = nil
|
||||
ma.clusterInfo = append(ma.clusterInfo, &model.ClusterInfo{
|
||||
SchemaVersion: "a",
|
||||
})
|
||||
ma.clusterInfo = append(ma.clusterInfo, &model.ClusterInfo{
|
||||
SchemaVersion: "b",
|
||||
})
|
||||
}
|
||||
|
||||
func TestBatchMigrationWorker(t *testing.T) {
|
||||
waitDone := func(t *testing.T, done chan bool, msg string) {
|
||||
t.Helper()
|
||||
|
||||
require.Eventually(t, func() bool {
|
||||
select {
|
||||
case <-done:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}, 5*time.Second, 100*time.Millisecond, msg)
|
||||
}
|
||||
|
||||
setupBatchWorker := func(t *testing.T, th *TestHelper, mockApp *MockApp, doMigrationBatch func(model.StringMap, store.Store) (model.StringMap, bool, error)) (model.Worker, *model.Job) {
|
||||
t.Helper()
|
||||
|
||||
migrationKey := model.NewId()
|
||||
timeBetweenBatches := 1 * time.Second
|
||||
|
||||
worker := jobs.MakeBatchMigrationWorker(
|
||||
th.Server.Jobs,
|
||||
th.Server.Store(),
|
||||
mockApp,
|
||||
migrationKey,
|
||||
timeBetweenBatches,
|
||||
doMigrationBatch,
|
||||
)
|
||||
th.Server.Jobs.RegisterJobType(migrationKey, worker, nil)
|
||||
|
||||
ctx := request.EmptyContext(mlog.CreateConsoleTestLogger(t))
|
||||
job, appErr := th.Server.Jobs.CreateJob(ctx, migrationKey, nil)
|
||||
require.Nil(t, appErr)
|
||||
|
||||
done := make(chan bool)
|
||||
go func() {
|
||||
defer close(done)
|
||||
worker.Run()
|
||||
}()
|
||||
|
||||
// When ending the test, ensure we wait for the worker to finish.
|
||||
t.Cleanup(func() {
|
||||
waitDone(t, done, "worker did not stop running")
|
||||
})
|
||||
|
||||
// Give the worker time to start running
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
|
||||
return worker, job
|
||||
}
|
||||
|
||||
stopWorker := func(t *testing.T, worker model.Worker) {
|
||||
t.Helper()
|
||||
|
||||
stopped := make(chan bool, 1)
|
||||
go func() {
|
||||
worker.Stop()
|
||||
close(stopped)
|
||||
}()
|
||||
|
||||
waitDone(t, stopped, "worker did not stop")
|
||||
}
|
||||
|
||||
waitForJobStatus := func(t *testing.T, th *TestHelper, job *model.Job, status string) {
|
||||
t.Helper()
|
||||
|
||||
require.Eventuallyf(t, func() bool {
|
||||
ctx := request.EmptyContext(mlog.CreateConsoleTestLogger(t))
|
||||
actualJob, appErr := th.Server.Jobs.GetJob(ctx, job.Id)
|
||||
require.Nil(t, appErr)
|
||||
require.Equal(t, job.Id, actualJob.Id)
|
||||
|
||||
return actualJob.Status == status
|
||||
}, 5*time.Second, 250*time.Millisecond, "job never transitioned to %s", status)
|
||||
}
|
||||
|
||||
assertJobReset := func(t *testing.T, th *TestHelper, job *model.Job) {
|
||||
ctx := request.EmptyContext(mlog.CreateConsoleTestLogger(t))
|
||||
actualJob, appErr := th.Server.Jobs.GetJob(ctx, job.Id)
|
||||
require.Nil(t, appErr)
|
||||
assert.Empty(t, actualJob.Progress)
|
||||
assert.Empty(t, actualJob.Data)
|
||||
}
|
||||
|
||||
getBatchNumberFromData := func(t *testing.T, data model.StringMap) int {
|
||||
t.Helper()
|
||||
|
||||
if data["batch_number"] == "" {
|
||||
data["batch_number"] = "1"
|
||||
}
|
||||
batchNumber, err := strconv.Atoi(data["batch_number"])
|
||||
require.NoError(t, err)
|
||||
|
||||
return batchNumber
|
||||
}
|
||||
|
||||
getDataFromBatchNumber := func(batchNumber int) model.StringMap {
|
||||
data := make(model.StringMap)
|
||||
data["batch_number"] = strconv.Itoa(batchNumber)
|
||||
|
||||
return data
|
||||
}
|
||||
|
||||
t.Run("clusters not in sync before first batch", func(t *testing.T) {
|
||||
th := Setup(t).InitBasic()
|
||||
defer th.TearDown()
|
||||
|
||||
mockApp := &MockApp{}
|
||||
mockApp.SetOutOfSync()
|
||||
|
||||
var worker model.Worker
|
||||
var job *model.Job
|
||||
worker, job = setupBatchWorker(t, th, mockApp, func(model.StringMap, store.Store) (model.StringMap, bool, error) {
|
||||
require.Fail(t, "migration batch should never run while clusters not in sync")
|
||||
|
||||
return nil, false, nil
|
||||
})
|
||||
|
||||
// Give the worker time to start running
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
|
||||
// Queue the work to be done
|
||||
worker.JobChannel() <- *job
|
||||
|
||||
waitForJobStatus(t, th, job, model.JobStatusPending)
|
||||
assertJobReset(t, th, job)
|
||||
|
||||
stopWorker(t, worker)
|
||||
})
|
||||
|
||||
t.Run("stop after first batch", func(t *testing.T) {
|
||||
th := Setup(t).InitBasic()
|
||||
defer th.TearDown()
|
||||
|
||||
mockApp := &MockApp{}
|
||||
|
||||
var worker model.Worker
|
||||
var job *model.Job
|
||||
worker, job = setupBatchWorker(t, th, mockApp, func(data model.StringMap, s store.Store) (model.StringMap, bool, error) {
|
||||
batchNumber := getBatchNumberFromData(t, data)
|
||||
|
||||
require.Equal(t, 1, batchNumber, "only batch 1 should have run")
|
||||
|
||||
// Shut down the worker after the first batch to prevent subsequent ones.
|
||||
go worker.Stop()
|
||||
|
||||
batchNumber++
|
||||
|
||||
return getDataFromBatchNumber(batchNumber), false, nil
|
||||
})
|
||||
|
||||
// Queue the work to be done
|
||||
worker.JobChannel() <- *job
|
||||
|
||||
waitForJobStatus(t, th, job, model.JobStatusPending)
|
||||
})
|
||||
|
||||
t.Run("stop after second batch", func(t *testing.T) {
|
||||
th := Setup(t).InitBasic()
|
||||
defer th.TearDown()
|
||||
|
||||
mockApp := &MockApp{}
|
||||
|
||||
var worker model.Worker
|
||||
var job *model.Job
|
||||
worker, job = setupBatchWorker(t, th, mockApp, func(data model.StringMap, s store.Store) (model.StringMap, bool, error) {
|
||||
batchNumber := getBatchNumberFromData(t, data)
|
||||
|
||||
require.LessOrEqual(t, batchNumber, 2, "only batches 1 and 2 should have run")
|
||||
|
||||
// Shut down the worker after the first batch to prevent subsequent ones.
|
||||
go worker.Stop()
|
||||
batchNumber++
|
||||
|
||||
return getDataFromBatchNumber(batchNumber), false, nil
|
||||
})
|
||||
|
||||
// Queue the work to be done
|
||||
worker.JobChannel() <- *job
|
||||
|
||||
waitForJobStatus(t, th, job, model.JobStatusPending)
|
||||
})
|
||||
|
||||
t.Run("clusters not in sync after first batch", func(t *testing.T) {
|
||||
th := Setup(t).InitBasic()
|
||||
defer th.TearDown()
|
||||
|
||||
mockApp := &MockApp{}
|
||||
|
||||
var worker model.Worker
|
||||
var job *model.Job
|
||||
worker, job = setupBatchWorker(t, th, mockApp, func(data model.StringMap, s store.Store) (model.StringMap, bool, error) {
|
||||
batchNumber := getBatchNumberFromData(t, data)
|
||||
require.Equal(t, 1, batchNumber, "only batch 1 should have run")
|
||||
|
||||
mockApp.SetOutOfSync()
|
||||
batchNumber++
|
||||
|
||||
return getDataFromBatchNumber(batchNumber), false, nil
|
||||
})
|
||||
|
||||
// Give the worker time to start running
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
|
||||
// Queue the work to be done
|
||||
worker.JobChannel() <- *job
|
||||
|
||||
waitForJobStatus(t, th, job, model.JobStatusPending)
|
||||
assertJobReset(t, th, job)
|
||||
|
||||
stopWorker(t, worker)
|
||||
})
|
||||
|
||||
t.Run("done after first batch", func(t *testing.T) {
|
||||
th := Setup(t).InitBasic()
|
||||
defer th.TearDown()
|
||||
|
||||
mockApp := &MockApp{}
|
||||
|
||||
var worker model.Worker
|
||||
var job *model.Job
|
||||
worker, job = setupBatchWorker(t, th, mockApp, func(data model.StringMap, s store.Store) (model.StringMap, bool, error) {
|
||||
batchNumber := getBatchNumberFromData(t, data)
|
||||
require.Equal(t, 1, batchNumber, "only batch 1 should have run")
|
||||
|
||||
// Shut down the worker after the first batch to prevent subsequent ones.
|
||||
go worker.Stop()
|
||||
batchNumber++
|
||||
|
||||
return getDataFromBatchNumber(batchNumber), true, nil
|
||||
})
|
||||
|
||||
// Queue the work to be done
|
||||
worker.JobChannel() <- *job
|
||||
|
||||
waitForJobStatus(t, th, job, model.JobStatusSuccess)
|
||||
})
|
||||
|
||||
t.Run("done after three batches", func(t *testing.T) {
|
||||
th := Setup(t).InitBasic()
|
||||
defer th.TearDown()
|
||||
|
||||
mockApp := &MockApp{}
|
||||
|
||||
var worker model.Worker
|
||||
var job *model.Job
|
||||
worker, job = setupBatchWorker(t, th, mockApp, func(data model.StringMap, s store.Store) (model.StringMap, bool, error) {
|
||||
batchNumber := getBatchNumberFromData(t, data)
|
||||
require.LessOrEqual(t, batchNumber, 3, "only 3 batches should have run")
|
||||
|
||||
// Shut down the worker after the first batch to prevent subsequent ones.
|
||||
go worker.Stop()
|
||||
batchNumber++
|
||||
|
||||
return getDataFromBatchNumber(batchNumber), true, nil
|
||||
})
|
||||
|
||||
// Queue the work to be done
|
||||
worker.JobChannel() <- *job
|
||||
|
||||
waitForJobStatus(t, th, job, model.JobStatusSuccess)
|
||||
})
|
||||
}
|
||||
@@ -0,0 +1,85 @@
|
||||
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
|
||||
// See LICENSE.txt for license information.
|
||||
|
||||
package delete_empty_drafts_migration
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/mattermost/mattermost/server/public/model"
|
||||
"github.com/mattermost/mattermost/server/v8/channels/jobs"
|
||||
"github.com/mattermost/mattermost/server/v8/channels/store"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
const (
|
||||
timeBetweenBatches = 1 * time.Second
|
||||
)
|
||||
|
||||
// MakeWorker creates a batch migration worker to delete empty drafts.
|
||||
func MakeWorker(jobServer *jobs.JobServer, store store.Store, app jobs.BatchMigrationWorkerAppIFace) model.Worker {
|
||||
return jobs.MakeBatchMigrationWorker(
|
||||
jobServer,
|
||||
store,
|
||||
app,
|
||||
model.MigrationKeyDeleteEmptyDrafts,
|
||||
timeBetweenBatches,
|
||||
doDeleteEmptyDraftsMigrationBatch,
|
||||
)
|
||||
}
|
||||
|
||||
// parseJobMetadata parses the opaque job metadata to return the information needed to decide which
|
||||
// batch to process next.
|
||||
func parseJobMetadata(data model.StringMap) (int64, string, error) {
|
||||
createAt := int64(0)
|
||||
if data["create_at"] != "" {
|
||||
parsedCreateAt, parseErr := strconv.ParseInt(data["create_at"], 10, 64)
|
||||
if parseErr != nil {
|
||||
return 0, "", errors.Wrap(parseErr, "failed to parse create_at")
|
||||
}
|
||||
createAt = parsedCreateAt
|
||||
}
|
||||
|
||||
userID := data["user_id"]
|
||||
|
||||
return createAt, userID, nil
|
||||
}
|
||||
|
||||
// makeJobMetadata encodes the information needed to decide which batch to process next back into
|
||||
// the opaque job metadata.
|
||||
func makeJobMetadata(createAt int64, userID string) model.StringMap {
|
||||
data := make(model.StringMap)
|
||||
data["create_at"] = strconv.FormatInt(createAt, 10)
|
||||
data["user_id"] = userID
|
||||
|
||||
return data
|
||||
}
|
||||
|
||||
// doDeleteEmptyDraftsMigrationBatch iterates through all drafts, deleting empty drafts within each
|
||||
// batch keyed by the compound primary key (createAt, userID)
|
||||
func doDeleteEmptyDraftsMigrationBatch(data model.StringMap, store store.Store) (model.StringMap, bool, error) {
|
||||
createAt, userID, err := parseJobMetadata(data)
|
||||
if err != nil {
|
||||
return nil, false, errors.Wrap(err, "failed to parse job metadata")
|
||||
}
|
||||
|
||||
// Determine the /next/ (createAt, userId) by finding the last record in the batch we're
|
||||
// about to delete.
|
||||
nextCreateAt, nextUserID, err := store.Draft().GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration(createAt, userID)
|
||||
if err != nil {
|
||||
return nil, false, errors.Wrapf(err, "failed to get the next batch (create_at=%v, user_id=%v)", createAt, userID)
|
||||
}
|
||||
|
||||
// If we get the nil values, it means the batch was empty and we're done.
|
||||
if nextCreateAt == 0 && nextUserID == "" {
|
||||
return nil, true, nil
|
||||
}
|
||||
|
||||
err = store.Draft().DeleteEmptyDraftsByCreateAtAndUserId(createAt, userID)
|
||||
if err != nil {
|
||||
return nil, false, errors.Wrapf(err, "failed to delete empty drafts (create_at=%v, user_id=%v)", createAt, userID)
|
||||
}
|
||||
|
||||
return makeJobMetadata(nextCreateAt, nextUserID), false, nil
|
||||
}
|
||||
@@ -0,0 +1,180 @@
|
||||
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
|
||||
// See LICENSE.txt for license information.
|
||||
|
||||
package delete_empty_drafts_migration
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"testing"
|
||||
|
||||
"github.com/mattermost/mattermost/server/public/model"
|
||||
"github.com/mattermost/mattermost/server/v8/channels/store/storetest"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestJobMetadata(t *testing.T) {
|
||||
t.Run("parse nil data", func(t *testing.T) {
|
||||
var data model.StringMap
|
||||
createAt, userID, err := parseJobMetadata(data)
|
||||
require.NoError(t, err)
|
||||
assert.Empty(t, createAt)
|
||||
assert.Empty(t, userID)
|
||||
})
|
||||
|
||||
t.Run("parse invalid create_at", func(t *testing.T) {
|
||||
data := make(model.StringMap)
|
||||
data["user_id"] = "user_id"
|
||||
data["create_at"] = "invalid"
|
||||
_, _, err := parseJobMetadata(data)
|
||||
require.Error(t, err)
|
||||
})
|
||||
|
||||
t.Run("parse valid", func(t *testing.T) {
|
||||
data := make(model.StringMap)
|
||||
data["user_id"] = "user_id"
|
||||
data["create_at"] = "1695918431"
|
||||
|
||||
createAt, userID, err := parseJobMetadata(data)
|
||||
require.NoError(t, err)
|
||||
assert.EqualValues(t, 1695918431, createAt)
|
||||
assert.Equal(t, "user_id", userID)
|
||||
})
|
||||
|
||||
t.Run("parse/make", func(t *testing.T) {
|
||||
data := makeJobMetadata(1695918431, "user_id")
|
||||
assert.Equal(t, "1695918431", data["create_at"])
|
||||
assert.Equal(t, "user_id", data["user_id"])
|
||||
|
||||
createAt, userID, err := parseJobMetadata(data)
|
||||
require.NoError(t, err)
|
||||
assert.EqualValues(t, 1695918431, createAt)
|
||||
assert.Equal(t, "user_id", userID)
|
||||
})
|
||||
}
|
||||
|
||||
func TestDoDeleteEmptyDraftsMigrationBatch(t *testing.T) {
|
||||
t.Run("invalid job metadata", func(t *testing.T) {
|
||||
mockStore := &storetest.Store{}
|
||||
t.Cleanup(func() {
|
||||
mockStore.AssertExpectations(t)
|
||||
})
|
||||
|
||||
data := make(model.StringMap)
|
||||
data["user_id"] = "user_id"
|
||||
data["create_at"] = "invalid"
|
||||
data, done, err := doDeleteEmptyDraftsMigrationBatch(data, mockStore)
|
||||
require.Error(t, err)
|
||||
assert.False(t, done)
|
||||
assert.Nil(t, data)
|
||||
})
|
||||
|
||||
t.Run("failure getting next offset", func(t *testing.T) {
|
||||
mockStore := &storetest.Store{}
|
||||
t.Cleanup(func() {
|
||||
mockStore.AssertExpectations(t)
|
||||
})
|
||||
|
||||
createAt, userID := int64(1695920000), "user_id_1"
|
||||
nextCreateAt, nextUserID := int64(0), ""
|
||||
|
||||
mockStore.DraftStore.On("GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration", createAt, userID).Return(nextCreateAt, nextUserID, errors.New("failure"))
|
||||
|
||||
data, done, err := doDeleteEmptyDraftsMigrationBatch(makeJobMetadata(createAt, userID), mockStore)
|
||||
require.EqualError(t, err, "failed to get the next batch (create_at=1695920000, user_id=user_id_1): failure")
|
||||
assert.False(t, done)
|
||||
assert.Nil(t, data)
|
||||
})
|
||||
|
||||
t.Run("failure deleting batch", func(t *testing.T) {
|
||||
mockStore := &storetest.Store{}
|
||||
t.Cleanup(func() {
|
||||
mockStore.AssertExpectations(t)
|
||||
})
|
||||
|
||||
createAt, userID := int64(1695920000), "user_id_1"
|
||||
nextCreateAt, nextUserID := int64(1695922034), "user_id_2"
|
||||
|
||||
mockStore.DraftStore.On("GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration", createAt, userID).Return(nextCreateAt, nextUserID, nil)
|
||||
mockStore.DraftStore.On("DeleteEmptyDraftsByCreateAtAndUserId", createAt, userID).Return(errors.New("failure"))
|
||||
|
||||
data, done, err := doDeleteEmptyDraftsMigrationBatch(makeJobMetadata(createAt, userID), mockStore)
|
||||
require.EqualError(t, err, "failed to delete empty drafts (create_at=1695920000, user_id=user_id_1): failure")
|
||||
assert.False(t, done)
|
||||
assert.Nil(t, data)
|
||||
})
|
||||
|
||||
t.Run("do first batch (nil job metadata)", func(t *testing.T) {
|
||||
mockStore := &storetest.Store{}
|
||||
t.Cleanup(func() {
|
||||
mockStore.AssertExpectations(t)
|
||||
})
|
||||
|
||||
createAt, userID := int64(0), ""
|
||||
nextCreateAt, nextUserID := int64(1695922034), "user_id_2"
|
||||
|
||||
mockStore.DraftStore.On("GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration", createAt, userID).Return(nextCreateAt, nextUserID, nil)
|
||||
mockStore.DraftStore.On("DeleteEmptyDraftsByCreateAtAndUserId", createAt, userID).Return(nil)
|
||||
|
||||
data, done, err := doDeleteEmptyDraftsMigrationBatch(nil, mockStore)
|
||||
require.NoError(t, err)
|
||||
assert.False(t, done)
|
||||
assert.Equal(t, model.StringMap{
|
||||
"create_at": "1695922034",
|
||||
"user_id": "user_id_2",
|
||||
}, data)
|
||||
})
|
||||
|
||||
t.Run("do first batch (empty job metadata)", func(t *testing.T) {
|
||||
mockStore := &storetest.Store{}
|
||||
t.Cleanup(func() {
|
||||
mockStore.AssertExpectations(t)
|
||||
})
|
||||
|
||||
createAt, userID := int64(0), ""
|
||||
nextCreateAt, nextUserID := int64(1695922034), "user_id_2"
|
||||
|
||||
mockStore.DraftStore.On("GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration", createAt, userID).Return(nextCreateAt, nextUserID, nil)
|
||||
mockStore.DraftStore.On("DeleteEmptyDraftsByCreateAtAndUserId", createAt, userID).Return(nil)
|
||||
|
||||
data, done, err := doDeleteEmptyDraftsMigrationBatch(model.StringMap{}, mockStore)
|
||||
require.NoError(t, err)
|
||||
assert.False(t, done)
|
||||
assert.Equal(t, makeJobMetadata(nextCreateAt, nextUserID), data)
|
||||
})
|
||||
|
||||
t.Run("do batch", func(t *testing.T) {
|
||||
mockStore := &storetest.Store{}
|
||||
t.Cleanup(func() {
|
||||
mockStore.AssertExpectations(t)
|
||||
})
|
||||
|
||||
createAt, userID := int64(1695922000), "user_id_1"
|
||||
nextCreateAt, nextUserID := int64(1695922034), "user_id_2"
|
||||
|
||||
mockStore.DraftStore.On("GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration", createAt, userID).Return(nextCreateAt, nextUserID, nil)
|
||||
mockStore.DraftStore.On("DeleteEmptyDraftsByCreateAtAndUserId", createAt, userID).Return(nil)
|
||||
|
||||
data, done, err := doDeleteEmptyDraftsMigrationBatch(makeJobMetadata(createAt, userID), mockStore)
|
||||
require.NoError(t, err)
|
||||
assert.False(t, done)
|
||||
assert.Equal(t, makeJobMetadata(nextCreateAt, nextUserID), data)
|
||||
})
|
||||
|
||||
t.Run("done batches", func(t *testing.T) {
|
||||
mockStore := &storetest.Store{}
|
||||
t.Cleanup(func() {
|
||||
mockStore.AssertExpectations(t)
|
||||
})
|
||||
|
||||
createAt, userID := int64(1695922000), "user_id_1"
|
||||
nextCreateAt, nextUserID := int64(0), ""
|
||||
|
||||
mockStore.DraftStore.On("GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration", createAt, userID).Return(nextCreateAt, nextUserID, nil)
|
||||
|
||||
data, done, err := doDeleteEmptyDraftsMigrationBatch(makeJobMetadata(createAt, userID), mockStore)
|
||||
require.NoError(t, err)
|
||||
assert.True(t, done)
|
||||
assert.Nil(t, data)
|
||||
})
|
||||
}
|
||||
223
server/channels/jobs/helper_test.go
Normal file
223
server/channels/jobs/helper_test.go
Normal file
@@ -0,0 +1,223 @@
|
||||
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
|
||||
// See LICENSE.txt for license information.
|
||||
|
||||
package jobs_test
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/mattermost/mattermost/server/public/model"
|
||||
"github.com/mattermost/mattermost/server/public/shared/mlog"
|
||||
"github.com/mattermost/mattermost/server/public/shared/request"
|
||||
"github.com/mattermost/mattermost/server/v8/channels/app"
|
||||
"github.com/mattermost/mattermost/server/v8/channels/store"
|
||||
"github.com/mattermost/mattermost/server/v8/config"
|
||||
)
|
||||
|
||||
type TestHelper struct {
|
||||
App *app.App
|
||||
Context *request.Context
|
||||
Server *app.Server
|
||||
BasicTeam *model.Team
|
||||
BasicUser *model.User
|
||||
BasicUser2 *model.User
|
||||
|
||||
SystemAdminUser *model.User
|
||||
LogBuffer *mlog.Buffer
|
||||
TestLogger *mlog.Logger
|
||||
IncludeCacheLayer bool
|
||||
ConfigStore *config.Store
|
||||
|
||||
tempWorkspace string
|
||||
}
|
||||
|
||||
func setupTestHelper(dbStore store.Store, enterprise bool, includeCacheLayer bool, options []app.Option, tb testing.TB) *TestHelper {
|
||||
tempWorkspace, err := os.MkdirTemp("", "jobstest")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
configStore := config.NewTestMemoryStore()
|
||||
memoryConfig := configStore.Get()
|
||||
memoryConfig.SqlSettings = *mainHelper.GetSQLSettings()
|
||||
*memoryConfig.PluginSettings.Directory = filepath.Join(tempWorkspace, "plugins")
|
||||
*memoryConfig.PluginSettings.ClientDirectory = filepath.Join(tempWorkspace, "webapp")
|
||||
*memoryConfig.PluginSettings.AutomaticPrepackagedPlugins = false
|
||||
*memoryConfig.LogSettings.EnableSentry = false // disable error reporting during tests
|
||||
*memoryConfig.AnnouncementSettings.AdminNoticesEnabled = false
|
||||
*memoryConfig.AnnouncementSettings.UserNoticesEnabled = false
|
||||
configStore.Set(memoryConfig)
|
||||
|
||||
buffer := &mlog.Buffer{}
|
||||
|
||||
options = append(options, app.ConfigStore(configStore))
|
||||
if includeCacheLayer {
|
||||
// Adds the cache layer to the test store
|
||||
options = append(options, app.StoreOverrideWithCache(dbStore))
|
||||
} else {
|
||||
options = append(options, app.StoreOverride(dbStore))
|
||||
}
|
||||
|
||||
testLogger, _ := mlog.NewLogger()
|
||||
logCfg, _ := config.MloggerConfigFromLoggerConfig(&memoryConfig.LogSettings, nil, config.GetLogFileLocation)
|
||||
if errCfg := testLogger.ConfigureTargets(logCfg, nil); errCfg != nil {
|
||||
panic("failed to configure test logger: " + errCfg.Error())
|
||||
}
|
||||
if errW := mlog.AddWriterTarget(testLogger, buffer, true, mlog.StdAll...); errW != nil {
|
||||
panic("failed to add writer target to test logger: " + errW.Error())
|
||||
}
|
||||
// lock logger config so server init cannot override it during testing.
|
||||
testLogger.LockConfiguration()
|
||||
options = append(options, app.SetLogger(testLogger))
|
||||
|
||||
s, err := app.NewServer(options...)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
th := &TestHelper{
|
||||
App: app.New(app.ServerConnector(s.Channels())),
|
||||
Context: request.EmptyContext(testLogger),
|
||||
Server: s,
|
||||
LogBuffer: buffer,
|
||||
TestLogger: testLogger,
|
||||
IncludeCacheLayer: includeCacheLayer,
|
||||
ConfigStore: configStore,
|
||||
}
|
||||
th.Context.SetLogger(testLogger)
|
||||
|
||||
prevListenAddress := *th.App.Config().ServiceSettings.ListenAddress
|
||||
th.App.UpdateConfig(func(cfg *model.Config) { *cfg.ServiceSettings.ListenAddress = "localhost:0" })
|
||||
serverErr := th.Server.Start()
|
||||
if serverErr != nil {
|
||||
panic(serverErr)
|
||||
}
|
||||
|
||||
th.App.UpdateConfig(func(cfg *model.Config) { *cfg.ServiceSettings.ListenAddress = prevListenAddress })
|
||||
|
||||
th.App.Srv().Store().MarkSystemRanUnitTests()
|
||||
|
||||
return th
|
||||
}
|
||||
|
||||
func Setup(tb testing.TB, options ...app.Option) *TestHelper {
|
||||
if testing.Short() {
|
||||
tb.SkipNow()
|
||||
}
|
||||
dbStore := mainHelper.GetStore()
|
||||
dbStore.DropAllTables()
|
||||
dbStore.MarkSystemRanUnitTests()
|
||||
mainHelper.PreloadMigrations()
|
||||
|
||||
return setupTestHelper(dbStore, false, true, options, tb)
|
||||
}
|
||||
|
||||
var initBasicOnce sync.Once
|
||||
var userCache struct {
|
||||
SystemAdminUser *model.User
|
||||
BasicUser *model.User
|
||||
BasicUser2 *model.User
|
||||
}
|
||||
|
||||
func (th *TestHelper) InitBasic() *TestHelper {
|
||||
// create users once and cache them because password hashing is slow
|
||||
initBasicOnce.Do(func() {
|
||||
th.SystemAdminUser = th.CreateUser()
|
||||
th.App.UpdateUserRoles(th.Context, th.SystemAdminUser.Id, model.SystemUserRoleId+" "+model.SystemAdminRoleId, false)
|
||||
th.SystemAdminUser, _ = th.App.GetUser(th.SystemAdminUser.Id)
|
||||
userCache.SystemAdminUser = th.SystemAdminUser.DeepCopy()
|
||||
|
||||
th.BasicUser = th.CreateUser()
|
||||
th.BasicUser, _ = th.App.GetUser(th.BasicUser.Id)
|
||||
userCache.BasicUser = th.BasicUser.DeepCopy()
|
||||
|
||||
th.BasicUser2 = th.CreateUser()
|
||||
th.BasicUser2, _ = th.App.GetUser(th.BasicUser2.Id)
|
||||
userCache.BasicUser2 = th.BasicUser2.DeepCopy()
|
||||
})
|
||||
// restore cached users
|
||||
th.SystemAdminUser = userCache.SystemAdminUser.DeepCopy()
|
||||
th.BasicUser = userCache.BasicUser.DeepCopy()
|
||||
th.BasicUser2 = userCache.BasicUser2.DeepCopy()
|
||||
|
||||
users := []*model.User{th.SystemAdminUser, th.BasicUser, th.BasicUser2}
|
||||
mainHelper.GetSQLStore().User().InsertUsers(users)
|
||||
|
||||
th.BasicTeam = th.CreateTeam()
|
||||
return th
|
||||
}
|
||||
|
||||
func (th *TestHelper) CreateTeam() *model.Team {
|
||||
id := model.NewId()
|
||||
team := &model.Team{
|
||||
DisplayName: "dn_" + id,
|
||||
Name: "name" + id,
|
||||
Email: "success+" + id + "@simulator.amazonses.com",
|
||||
Type: model.TeamOpen,
|
||||
}
|
||||
|
||||
var err *model.AppError
|
||||
if team, err = th.App.CreateTeam(th.Context, team); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return team
|
||||
}
|
||||
|
||||
func (th *TestHelper) CreateUser() *model.User {
|
||||
return th.CreateUserOrGuest(false)
|
||||
}
|
||||
|
||||
func (th *TestHelper) CreateUserOrGuest(guest bool) *model.User {
|
||||
id := model.NewId()
|
||||
|
||||
user := &model.User{
|
||||
Email: "success+" + id + "@simulator.amazonses.com",
|
||||
Username: "un_" + id,
|
||||
Nickname: "nn_" + id,
|
||||
Password: "Password1",
|
||||
EmailVerified: true,
|
||||
}
|
||||
|
||||
var err *model.AppError
|
||||
if guest {
|
||||
if user, err = th.App.CreateGuest(th.Context, user); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
} else {
|
||||
if user, err = th.App.CreateUser(th.Context, user); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
return user
|
||||
}
|
||||
|
||||
func (th *TestHelper) ShutdownApp() {
|
||||
done := make(chan bool)
|
||||
go func() {
|
||||
th.Server.Shutdown()
|
||||
close(done)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(30 * time.Second):
|
||||
// panic instead of fatal to terminate all tests in this package, otherwise the
|
||||
// still running App could spuriously fail subsequent tests.
|
||||
panic("failed to shutdown App within 30 seconds")
|
||||
}
|
||||
}
|
||||
|
||||
func (th *TestHelper) TearDown() {
|
||||
if th.IncludeCacheLayer {
|
||||
// Clean all the caches
|
||||
th.App.Srv().InvalidateAllCaches()
|
||||
}
|
||||
th.ShutdownApp()
|
||||
if th.tempWorkspace != "" {
|
||||
os.RemoveAll(th.tempWorkspace)
|
||||
}
|
||||
}
|
||||
24
server/channels/jobs/main_test.go
Normal file
24
server/channels/jobs/main_test.go
Normal file
@@ -0,0 +1,24 @@
|
||||
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
|
||||
// See LICENSE.txt for license information.
|
||||
|
||||
package jobs_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/mattermost/mattermost/server/v8/channels/testlib"
|
||||
)
|
||||
|
||||
var mainHelper *testlib.MainHelper
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
var options = testlib.HelperOptions{
|
||||
EnableStore: true,
|
||||
EnableResources: true,
|
||||
}
|
||||
|
||||
mainHelper = testlib.NewMainHelperWithOptions(&options)
|
||||
defer mainHelper.Close()
|
||||
|
||||
mainHelper.Main(m)
|
||||
}
|
||||
@@ -3335,6 +3335,24 @@ func (s *OpenTracingLayerDraftStore) Delete(userID string, channelID string, roo
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *OpenTracingLayerDraftStore) DeleteEmptyDraftsByCreateAtAndUserId(createAt int64, userId string) error {
|
||||
origCtx := s.Root.Store.Context()
|
||||
span, newCtx := tracing.StartSpanWithParentByContext(s.Root.Store.Context(), "DraftStore.DeleteEmptyDraftsByCreateAtAndUserId")
|
||||
s.Root.Store.SetContext(newCtx)
|
||||
defer func() {
|
||||
s.Root.Store.SetContext(origCtx)
|
||||
}()
|
||||
|
||||
defer span.Finish()
|
||||
err := s.DraftStore.DeleteEmptyDraftsByCreateAtAndUserId(createAt, userId)
|
||||
if err != nil {
|
||||
span.LogFields(spanlog.Error(err))
|
||||
ext.Error.Set(span, true)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *OpenTracingLayerDraftStore) Get(userID string, channelID string, rootID string, includeDeleted bool) (*model.Draft, error) {
|
||||
origCtx := s.Root.Store.Context()
|
||||
span, newCtx := tracing.StartSpanWithParentByContext(s.Root.Store.Context(), "DraftStore.Get")
|
||||
@@ -3371,6 +3389,24 @@ func (s *OpenTracingLayerDraftStore) GetDraftsForUser(userID string, teamID stri
|
||||
return result, err
|
||||
}
|
||||
|
||||
func (s *OpenTracingLayerDraftStore) GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration(createAt int64, userId string) (int64, string, error) {
|
||||
origCtx := s.Root.Store.Context()
|
||||
span, newCtx := tracing.StartSpanWithParentByContext(s.Root.Store.Context(), "DraftStore.GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration")
|
||||
s.Root.Store.SetContext(newCtx)
|
||||
defer func() {
|
||||
s.Root.Store.SetContext(origCtx)
|
||||
}()
|
||||
|
||||
defer span.Finish()
|
||||
result, resultVar1, err := s.DraftStore.GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration(createAt, userId)
|
||||
if err != nil {
|
||||
span.LogFields(spanlog.Error(err))
|
||||
ext.Error.Set(span, true)
|
||||
}
|
||||
|
||||
return result, resultVar1, err
|
||||
}
|
||||
|
||||
func (s *OpenTracingLayerDraftStore) Upsert(d *model.Draft) (*model.Draft, error) {
|
||||
origCtx := s.Root.Store.Context()
|
||||
span, newCtx := tracing.StartSpanWithParentByContext(s.Root.Store.Context(), "DraftStore.Upsert")
|
||||
|
||||
@@ -3724,6 +3724,27 @@ func (s *RetryLayerDraftStore) Delete(userID string, channelID string, rootID st
|
||||
|
||||
}
|
||||
|
||||
func (s *RetryLayerDraftStore) DeleteEmptyDraftsByCreateAtAndUserId(createAt int64, userId string) error {
|
||||
|
||||
tries := 0
|
||||
for {
|
||||
err := s.DraftStore.DeleteEmptyDraftsByCreateAtAndUserId(createAt, userId)
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
if !isRepeatableError(err) {
|
||||
return err
|
||||
}
|
||||
tries++
|
||||
if tries >= 3 {
|
||||
err = errors.Wrap(err, "giving up after 3 consecutive repeatable transaction failures")
|
||||
return err
|
||||
}
|
||||
timepkg.Sleep(100 * timepkg.Millisecond)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (s *RetryLayerDraftStore) Get(userID string, channelID string, rootID string, includeDeleted bool) (*model.Draft, error) {
|
||||
|
||||
tries := 0
|
||||
@@ -3766,6 +3787,27 @@ func (s *RetryLayerDraftStore) GetDraftsForUser(userID string, teamID string) ([
|
||||
|
||||
}
|
||||
|
||||
func (s *RetryLayerDraftStore) GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration(createAt int64, userId string) (int64, string, error) {
|
||||
|
||||
tries := 0
|
||||
for {
|
||||
result, resultVar1, err := s.DraftStore.GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration(createAt, userId)
|
||||
if err == nil {
|
||||
return result, resultVar1, nil
|
||||
}
|
||||
if !isRepeatableError(err) {
|
||||
return result, resultVar1, err
|
||||
}
|
||||
tries++
|
||||
if tries >= 3 {
|
||||
err = errors.Wrap(err, "giving up after 3 consecutive repeatable transaction failures")
|
||||
return result, resultVar1, err
|
||||
}
|
||||
timepkg.Sleep(100 * timepkg.Millisecond)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (s *RetryLayerDraftStore) Upsert(d *model.Draft) (*model.Draft, error) {
|
||||
|
||||
tries := 0
|
||||
|
||||
@@ -159,11 +159,8 @@ func (s *SqlDraftStore) GetDraftsForUser(userID, teamID string) ([]*model.Draft,
|
||||
}
|
||||
|
||||
func (s *SqlDraftStore) Delete(userID, channelID, rootID string) error {
|
||||
time := model.GetMillis()
|
||||
query := s.getQueryBuilder().
|
||||
Update("Drafts").
|
||||
Set("UpdateAt", time).
|
||||
Set("DeleteAt", time).
|
||||
Delete("Drafts").
|
||||
Where(sq.Eq{
|
||||
"UserId": userID,
|
||||
"ChannelId": channelID,
|
||||
@@ -236,3 +233,90 @@ func (s *SqlDraftStore) determineMaxDraftSize() int {
|
||||
|
||||
return maxDraftSize
|
||||
}
|
||||
|
||||
func (s *SqlDraftStore) GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration(createAt int64, userId string) (int64, string, error) {
|
||||
var drafts []struct {
|
||||
CreateAt int64
|
||||
UserId string
|
||||
}
|
||||
|
||||
query := s.getQueryBuilder().
|
||||
Select("CreateAt", "UserId").
|
||||
From("Drafts").
|
||||
Where(sq.Or{
|
||||
sq.Gt{"CreateAt": createAt},
|
||||
sq.And{
|
||||
sq.Eq{"CreateAt": createAt},
|
||||
sq.Gt{"UserId": userId},
|
||||
},
|
||||
}).
|
||||
OrderBy("CreateAt", "UserId ASC").
|
||||
Limit(100)
|
||||
|
||||
err := s.GetReplicaX().SelectBuilder(&drafts, query)
|
||||
if err != nil {
|
||||
return 0, "", errors.Wrap(err, "failed to get the list of drafts")
|
||||
}
|
||||
|
||||
if len(drafts) == 0 {
|
||||
return 0, "", nil
|
||||
}
|
||||
|
||||
lastElement := drafts[len(drafts)-1]
|
||||
return lastElement.CreateAt, lastElement.UserId, nil
|
||||
}
|
||||
|
||||
func (s *SqlDraftStore) DeleteEmptyDraftsByCreateAtAndUserId(createAt int64, userId string) error {
|
||||
var builder Builder
|
||||
if s.DriverName() == model.DatabaseDriverPostgres {
|
||||
builder = s.getQueryBuilder().
|
||||
Delete("Drafts d").
|
||||
PrefixExpr(s.getQueryBuilder().Select().
|
||||
Prefix("WITH dd AS (").
|
||||
Columns("UserId", "ChannelId", "RootId").
|
||||
From("Drafts").
|
||||
Where(sq.Or{
|
||||
sq.Gt{"CreateAt": createAt},
|
||||
sq.And{
|
||||
sq.Eq{"CreateAt": createAt},
|
||||
sq.Gt{"UserId": userId},
|
||||
},
|
||||
}).
|
||||
OrderBy("CreateAt", "UserId").
|
||||
Limit(100).
|
||||
Suffix(")"),
|
||||
).
|
||||
Using("dd").
|
||||
Where("d.UserId = dd.UserId").
|
||||
Where("d.ChannelId = dd.ChannelId").
|
||||
Where("d.RootId = dd.RootId").
|
||||
Where("d.Message = ''")
|
||||
} else if s.DriverName() == model.DatabaseDriverMysql {
|
||||
builder = s.getQueryBuilder().
|
||||
Delete("Drafts d").
|
||||
What("d.*").
|
||||
JoinClause(s.getQueryBuilder().Select().
|
||||
Prefix("INNER JOIN (").
|
||||
Columns("UserId, ChannelId, RootId").
|
||||
From("Drafts").
|
||||
Where(sq.And{
|
||||
sq.Or{
|
||||
sq.Gt{"CreateAt": createAt},
|
||||
sq.And{
|
||||
sq.Eq{"CreateAt": createAt},
|
||||
sq.Gt{"UserId": userId},
|
||||
},
|
||||
},
|
||||
}).
|
||||
OrderBy("CreateAt", "UserId").
|
||||
Limit(100).
|
||||
Suffix(") dj ON (d.UserId = dj.UserId AND d.ChannelId = dj.ChannelId AND d.RootId = dj.RootId)"),
|
||||
).Where(sq.Eq{"Message": ""})
|
||||
}
|
||||
|
||||
if _, err := s.GetMasterX().ExecBuilder(builder); err != nil {
|
||||
return errors.Wrapf(err, "failed to delete empty drafts")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -986,6 +986,8 @@ type DraftStore interface {
|
||||
Get(userID, channelID, rootID string, includeDeleted bool) (*model.Draft, error)
|
||||
Delete(userID, channelID, rootID string) error
|
||||
GetDraftsForUser(userID, teamID string) ([]*model.Draft, error)
|
||||
GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration(createAt int64, userId string) (int64, string, error)
|
||||
DeleteEmptyDraftsByCreateAtAndUserId(createAt int64, userId string) error
|
||||
}
|
||||
|
||||
type PostAcknowledgementStore interface {
|
||||
|
||||
@@ -5,6 +5,7 @@ package storetest
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
@@ -19,6 +20,8 @@ func TestDraftStore(t *testing.T, ss store.Store, s SqlStore) {
|
||||
t.Run("DeleteDraft", func(t *testing.T) { testDeleteDraft(t, ss) })
|
||||
t.Run("GetDraft", func(t *testing.T) { testGetDraft(t, ss) })
|
||||
t.Run("GetDraftsForUser", func(t *testing.T) { testGetDraftsForUser(t, ss) })
|
||||
t.Run("GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration", func(t *testing.T) { testGetLastCreateAtAndUserIdValuesForEmptyDraftsMigration(t, ss) })
|
||||
t.Run("DeleteEmptyDraftsByCreateAtAndUserId", func(t *testing.T) { testDeleteEmptyDraftsByCreateAtAndUserId(t, ss) })
|
||||
}
|
||||
|
||||
func testSaveDraft(t *testing.T, ss store.Store) {
|
||||
@@ -275,24 +278,6 @@ func testGetDraft(t *testing.T, ss store.Store) {
|
||||
assert.Equal(t, draft2.Message, draftResp.Message)
|
||||
assert.Equal(t, draft2.ChannelId, draftResp.ChannelId)
|
||||
})
|
||||
|
||||
t.Run("get draft including deleted", func(t *testing.T) {
|
||||
draftResp, err := ss.Draft().Get(user.Id, channel.Id, "", false)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, draft1.Message, draftResp.Message)
|
||||
assert.Equal(t, draft1.ChannelId, draftResp.ChannelId)
|
||||
|
||||
err = ss.Draft().Delete(user.Id, channel.Id, "")
|
||||
assert.NoError(t, err)
|
||||
_, err = ss.Draft().Get(user.Id, channel.Id, "", false)
|
||||
assert.Error(t, err)
|
||||
assert.IsType(t, &store.ErrNotFound{}, err)
|
||||
|
||||
draftResp, err = ss.Draft().Get(user.Id, channel.Id, "", true)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, draft1.Message, draftResp.Message)
|
||||
assert.Equal(t, draft1.ChannelId, draftResp.ChannelId)
|
||||
})
|
||||
}
|
||||
|
||||
func testGetDraftsForUser(t *testing.T, ss store.Store) {
|
||||
@@ -351,3 +336,226 @@ func testGetDraftsForUser(t *testing.T, ss store.Store) {
|
||||
assert.ElementsMatch(t, []*model.Draft{draft1, draft2}, draftResp)
|
||||
})
|
||||
}
|
||||
|
||||
func clearDrafts(t *testing.T, ss store.Store) {
|
||||
t.Helper()
|
||||
|
||||
_, err := ss.GetInternalMasterDB().Exec("DELETE FROM Drafts")
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func makeDrafts(t *testing.T, ss store.Store, count int, message string) {
|
||||
t.Helper()
|
||||
|
||||
var delay time.Duration
|
||||
if count > 100 {
|
||||
// When creating more than one page of drafts, improve the odds we get
|
||||
// some results with different CreateAt timetsamps.
|
||||
delay = 5 * time.Millisecond
|
||||
}
|
||||
|
||||
for i := 1; i <= count; i++ {
|
||||
_, err := ss.Draft().Upsert(&model.Draft{
|
||||
CreateAt: model.GetMillis(),
|
||||
UpdateAt: model.GetMillis(),
|
||||
UserId: model.NewId(),
|
||||
ChannelId: model.NewId(),
|
||||
Message: message,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
if delay > 0 {
|
||||
time.Sleep(delay)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func countDraftPages(t *testing.T, ss store.Store) int {
|
||||
t.Helper()
|
||||
|
||||
pages := 0
|
||||
createAt := int64(0)
|
||||
userId := ""
|
||||
|
||||
for {
|
||||
nextCreateAt, nextUserId, err := ss.Draft().GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration(createAt, userId)
|
||||
require.NoError(t, err)
|
||||
|
||||
if nextCreateAt == 0 && nextUserId == "" {
|
||||
break
|
||||
}
|
||||
|
||||
// Ensure we're always making progress.
|
||||
if nextCreateAt == createAt {
|
||||
require.Greater(t, nextUserId, userId)
|
||||
} else {
|
||||
require.Greater(t, nextCreateAt, createAt)
|
||||
}
|
||||
|
||||
pages++
|
||||
createAt = nextCreateAt
|
||||
userId = nextUserId
|
||||
}
|
||||
|
||||
return pages
|
||||
}
|
||||
|
||||
func testGetLastCreateAtAndUserIdValuesForEmptyDraftsMigration(t *testing.T, ss store.Store) {
|
||||
t.Run("no drafts", func(t *testing.T) {
|
||||
clearDrafts(t, ss)
|
||||
|
||||
createAt, userId, err := ss.Draft().GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration(0, "")
|
||||
require.NoError(t, err)
|
||||
assert.EqualValues(t, 0, createAt)
|
||||
assert.Equal(t, "", userId)
|
||||
|
||||
assert.Equal(t, 0, countDraftPages(t, ss), "incorrect number of pages")
|
||||
})
|
||||
|
||||
t.Run("single page", func(t *testing.T) {
|
||||
clearDrafts(t, ss)
|
||||
|
||||
makeDrafts(t, ss, 100, model.NewRandomString(16))
|
||||
assert.Equal(t, 1, countDraftPages(t, ss), "incorrect number of pages")
|
||||
})
|
||||
|
||||
t.Run("multiple pages", func(t *testing.T) {
|
||||
clearDrafts(t, ss)
|
||||
|
||||
makeDrafts(t, ss, 300, model.NewRandomString(16))
|
||||
assert.Equal(t, 3, countDraftPages(t, ss), "incorrect number of pages")
|
||||
})
|
||||
}
|
||||
|
||||
func testDeleteEmptyDraftsByCreateAtAndUserId(t *testing.T, ss store.Store) {
|
||||
t.Run("nil parameters", func(t *testing.T) {
|
||||
clearDrafts(t, ss)
|
||||
|
||||
err := ss.Draft().DeleteEmptyDraftsByCreateAtAndUserId(0, "")
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("delete single page, all empty", func(t *testing.T) {
|
||||
clearDrafts(t, ss)
|
||||
makeDrafts(t, ss, 100, "")
|
||||
|
||||
createAt, userId := int64(0), ""
|
||||
nextCreateAt, nextUserId, err := ss.Draft().GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration(createAt, userId)
|
||||
require.NoError(t, err)
|
||||
err = ss.Draft().DeleteEmptyDraftsByCreateAtAndUserId(createAt, userId)
|
||||
require.NoError(t, err)
|
||||
createAt, userId = nextCreateAt, nextUserId
|
||||
|
||||
assert.Equal(t, 0, countDraftPages(t, ss), "incorrect number of pages")
|
||||
|
||||
nextCreateAt, nextUserId, err = ss.Draft().GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration(createAt, userId)
|
||||
require.NoError(t, err)
|
||||
assert.EqualValues(t, 0, nextCreateAt, "should have finished iterating through drafts")
|
||||
assert.Equal(t, "", nextUserId, "should have finished iterating through drafts")
|
||||
})
|
||||
|
||||
t.Run("delete multiple pages, all empty", func(t *testing.T) {
|
||||
clearDrafts(t, ss)
|
||||
makeDrafts(t, ss, 300, "")
|
||||
|
||||
createAt, userId := int64(0), ""
|
||||
nextCreateAt, nextUserId, err := ss.Draft().GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration(createAt, userId)
|
||||
require.NoError(t, err)
|
||||
err = ss.Draft().DeleteEmptyDraftsByCreateAtAndUserId(createAt, userId)
|
||||
require.NoError(t, err)
|
||||
createAt, userId = nextCreateAt, nextUserId
|
||||
|
||||
assert.Equal(t, 2, countDraftPages(t, ss), "incorrect number of pages")
|
||||
|
||||
nextCreateAt, nextUserId, err = ss.Draft().GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration(createAt, userId)
|
||||
require.NoError(t, err)
|
||||
err = ss.Draft().DeleteEmptyDraftsByCreateAtAndUserId(createAt, userId)
|
||||
require.NoError(t, err)
|
||||
createAt, userId = nextCreateAt, nextUserId
|
||||
|
||||
assert.Equal(t, 1, countDraftPages(t, ss), "incorrect number of pages")
|
||||
|
||||
nextCreateAt, nextUserId, err = ss.Draft().GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration(createAt, userId)
|
||||
require.NoError(t, err)
|
||||
err = ss.Draft().DeleteEmptyDraftsByCreateAtAndUserId(createAt, userId)
|
||||
require.NoError(t, err)
|
||||
createAt, userId = nextCreateAt, nextUserId
|
||||
|
||||
assert.Equal(t, 0, countDraftPages(t, ss), "incorrect number of pages")
|
||||
|
||||
nextCreateAt, nextUserId, err = ss.Draft().GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration(createAt, userId)
|
||||
require.NoError(t, err)
|
||||
assert.EqualValues(t, 0, nextCreateAt, "should have finished iterating through drafts")
|
||||
assert.Equal(t, "", nextUserId, "should have finished iterating through drafts")
|
||||
})
|
||||
|
||||
t.Run("delete multiple pages, some empty", func(t *testing.T) {
|
||||
clearDrafts(t, ss)
|
||||
makeDrafts(t, ss, 50, "")
|
||||
makeDrafts(t, ss, 50, "message")
|
||||
makeDrafts(t, ss, 50, "")
|
||||
makeDrafts(t, ss, 50, "message")
|
||||
makeDrafts(t, ss, 50, "")
|
||||
makeDrafts(t, ss, 50, "message")
|
||||
makeDrafts(t, ss, 50, "")
|
||||
makeDrafts(t, ss, 50, "message")
|
||||
makeDrafts(t, ss, 50, "message")
|
||||
makeDrafts(t, ss, 50, "message")
|
||||
|
||||
// Verify initially 5 pages
|
||||
assert.Equal(t, 5, countDraftPages(t, ss), "incorrect number of pages")
|
||||
|
||||
createAt, userId := int64(0), ""
|
||||
|
||||
nextCreateAt, nextUserId, err := ss.Draft().GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration(createAt, userId)
|
||||
require.NoError(t, err)
|
||||
err = ss.Draft().DeleteEmptyDraftsByCreateAtAndUserId(createAt, userId)
|
||||
require.NoError(t, err)
|
||||
createAt, userId = nextCreateAt, nextUserId
|
||||
|
||||
// Only deleted 50, so still 5 pages
|
||||
assert.Equal(t, 5, countDraftPages(t, ss), "incorrect number of pages")
|
||||
|
||||
nextCreateAt, nextUserId, err = ss.Draft().GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration(createAt, userId)
|
||||
require.NoError(t, err)
|
||||
err = ss.Draft().DeleteEmptyDraftsByCreateAtAndUserId(createAt, userId)
|
||||
require.NoError(t, err)
|
||||
createAt, userId = nextCreateAt, nextUserId
|
||||
|
||||
// Now deleted 100, so down to 4 pages
|
||||
assert.Equal(t, 4, countDraftPages(t, ss), "incorrect number of pages")
|
||||
|
||||
nextCreateAt, nextUserId, err = ss.Draft().GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration(createAt, userId)
|
||||
require.NoError(t, err)
|
||||
err = ss.Draft().DeleteEmptyDraftsByCreateAtAndUserId(createAt, userId)
|
||||
require.NoError(t, err)
|
||||
createAt, userId = nextCreateAt, nextUserId
|
||||
|
||||
// Only deleted 150 now, so still 4 pages
|
||||
assert.Equal(t, 4, countDraftPages(t, ss), "incorrect number of pages")
|
||||
|
||||
nextCreateAt, nextUserId, err = ss.Draft().GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration(createAt, userId)
|
||||
require.NoError(t, err)
|
||||
err = ss.Draft().DeleteEmptyDraftsByCreateAtAndUserId(createAt, userId)
|
||||
require.NoError(t, err)
|
||||
createAt, userId = nextCreateAt, nextUserId
|
||||
|
||||
// Now deleted all 200 empty messages, so down to 3 pages
|
||||
assert.Equal(t, 3, countDraftPages(t, ss), "incorrect number of pages")
|
||||
|
||||
// Keep going through all pages to verify nothing else gets deleted.
|
||||
|
||||
nextCreateAt, nextUserId, err = ss.Draft().GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration(createAt, userId)
|
||||
require.NoError(t, err)
|
||||
err = ss.Draft().DeleteEmptyDraftsByCreateAtAndUserId(createAt, userId)
|
||||
require.NoError(t, err)
|
||||
createAt, userId = nextCreateAt, nextUserId
|
||||
|
||||
// Verify we're done iterating
|
||||
|
||||
nextCreateAt, nextUserId, err = ss.Draft().GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration(createAt, userId)
|
||||
require.NoError(t, err)
|
||||
assert.EqualValues(t, 0, nextCreateAt, "should have finished iterating through drafts")
|
||||
assert.Equal(t, "", nextUserId, "should have finished iterating through drafts")
|
||||
})
|
||||
}
|
||||
|
||||
@@ -28,6 +28,20 @@ func (_m *DraftStore) Delete(userID string, channelID string, rootID string) err
|
||||
return r0
|
||||
}
|
||||
|
||||
// DeleteEmptyDraftsByCreateAtAndUserId provides a mock function with given fields: createAt, userId
|
||||
func (_m *DraftStore) DeleteEmptyDraftsByCreateAtAndUserId(createAt int64, userId string) error {
|
||||
ret := _m.Called(createAt, userId)
|
||||
|
||||
var r0 error
|
||||
if rf, ok := ret.Get(0).(func(int64, string) error); ok {
|
||||
r0 = rf(createAt, userId)
|
||||
} else {
|
||||
r0 = ret.Error(0)
|
||||
}
|
||||
|
||||
return r0
|
||||
}
|
||||
|
||||
// Get provides a mock function with given fields: userID, channelID, rootID, includeDeleted
|
||||
func (_m *DraftStore) Get(userID string, channelID string, rootID string, includeDeleted bool) (*model.Draft, error) {
|
||||
ret := _m.Called(userID, channelID, rootID, includeDeleted)
|
||||
@@ -80,6 +94,37 @@ func (_m *DraftStore) GetDraftsForUser(userID string, teamID string) ([]*model.D
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration provides a mock function with given fields: createAt, userId
|
||||
func (_m *DraftStore) GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration(createAt int64, userId string) (int64, string, error) {
|
||||
ret := _m.Called(createAt, userId)
|
||||
|
||||
var r0 int64
|
||||
var r1 string
|
||||
var r2 error
|
||||
if rf, ok := ret.Get(0).(func(int64, string) (int64, string, error)); ok {
|
||||
return rf(createAt, userId)
|
||||
}
|
||||
if rf, ok := ret.Get(0).(func(int64, string) int64); ok {
|
||||
r0 = rf(createAt, userId)
|
||||
} else {
|
||||
r0 = ret.Get(0).(int64)
|
||||
}
|
||||
|
||||
if rf, ok := ret.Get(1).(func(int64, string) string); ok {
|
||||
r1 = rf(createAt, userId)
|
||||
} else {
|
||||
r1 = ret.Get(1).(string)
|
||||
}
|
||||
|
||||
if rf, ok := ret.Get(2).(func(int64, string) error); ok {
|
||||
r2 = rf(createAt, userId)
|
||||
} else {
|
||||
r2 = ret.Error(2)
|
||||
}
|
||||
|
||||
return r0, r1, r2
|
||||
}
|
||||
|
||||
// Upsert provides a mock function with given fields: d
|
||||
func (_m *DraftStore) Upsert(d *model.Draft) (*model.Draft, error) {
|
||||
ret := _m.Called(d)
|
||||
|
||||
@@ -3059,6 +3059,22 @@ func (s *TimerLayerDraftStore) Delete(userID string, channelID string, rootID st
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *TimerLayerDraftStore) DeleteEmptyDraftsByCreateAtAndUserId(createAt int64, userId string) error {
|
||||
start := time.Now()
|
||||
|
||||
err := s.DraftStore.DeleteEmptyDraftsByCreateAtAndUserId(createAt, userId)
|
||||
|
||||
elapsed := float64(time.Since(start)) / float64(time.Second)
|
||||
if s.Root.Metrics != nil {
|
||||
success := "false"
|
||||
if err == nil {
|
||||
success = "true"
|
||||
}
|
||||
s.Root.Metrics.ObserveStoreMethodDuration("DraftStore.DeleteEmptyDraftsByCreateAtAndUserId", success, elapsed)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *TimerLayerDraftStore) Get(userID string, channelID string, rootID string, includeDeleted bool) (*model.Draft, error) {
|
||||
start := time.Now()
|
||||
|
||||
@@ -3091,6 +3107,22 @@ func (s *TimerLayerDraftStore) GetDraftsForUser(userID string, teamID string) ([
|
||||
return result, err
|
||||
}
|
||||
|
||||
func (s *TimerLayerDraftStore) GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration(createAt int64, userId string) (int64, string, error) {
|
||||
start := time.Now()
|
||||
|
||||
result, resultVar1, err := s.DraftStore.GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration(createAt, userId)
|
||||
|
||||
elapsed := float64(time.Since(start)) / float64(time.Second)
|
||||
if s.Root.Metrics != nil {
|
||||
success := "false"
|
||||
if err == nil {
|
||||
success = "true"
|
||||
}
|
||||
s.Root.Metrics.ObserveStoreMethodDuration("DraftStore.GetLastCreateAtAndUserIdValuesForEmptyDraftsMigration", success, elapsed)
|
||||
}
|
||||
return result, resultVar1, err
|
||||
}
|
||||
|
||||
func (s *TimerLayerDraftStore) Upsert(d *model.Draft) (*model.Draft, error) {
|
||||
start := time.Now()
|
||||
|
||||
|
||||
@@ -71,6 +71,7 @@ func GetMockStoreForSetupFunctions() *mocks.Store {
|
||||
systemStore.On("GetByName", model.MigrationKeyAddPlayboosksManageRolesPermissions).Return(&model.System{Name: model.MigrationKeyAddPlayboosksManageRolesPermissions, Value: "true"}, nil)
|
||||
systemStore.On("GetByName", model.MigrationKeyAddCustomUserGroupsPermissionRestore).Return(&model.System{Name: model.MigrationKeyAddCustomUserGroupsPermissionRestore, Value: "true"}, nil)
|
||||
systemStore.On("GetByName", model.MigrationKeyAddReadChannelContentPermissions).Return(&model.System{Name: model.MigrationKeyAddReadChannelContentPermissions, Value: "true"}, nil)
|
||||
systemStore.On("GetByName", model.MigrationKeyDeleteEmptyDrafts).Return(&model.System{Name: model.MigrationKeyDeleteEmptyDrafts, Value: "true"}, nil)
|
||||
systemStore.On("GetByName", "CustomGroupAdminRoleCreationMigrationComplete").Return(&model.System{Name: model.MigrationKeyAddPlayboosksManageRolesPermissions, Value: "true"}, nil)
|
||||
systemStore.On("GetByName", "products_boards").Return(&model.System{Name: "products_boards", Value: "true"}, nil)
|
||||
systemStore.On("GetByName", "elasticsearch_fix_channel_index_migration").Return(&model.System{Name: "elasticsearch_fix_channel_index_migration", Value: "true"}, nil)
|
||||
|
||||
@@ -39,7 +39,7 @@ require (
|
||||
github.com/mattermost/mattermost/server/public v0.0.9
|
||||
github.com/mattermost/morph v1.0.5-0.20230511171014-e76e25978d56
|
||||
github.com/mattermost/rsc v0.0.0-20160330161541-bbaefb05eaa0
|
||||
github.com/mattermost/squirrel v0.2.0
|
||||
github.com/mattermost/squirrel v0.4.0
|
||||
github.com/mholt/archiver/v3 v3.5.1
|
||||
github.com/microcosm-cc/bluemonday v1.0.24
|
||||
github.com/minio/minio-go/v7 v7.0.59
|
||||
|
||||
@@ -446,8 +446,8 @@ github.com/mattermost/morph v1.0.5-0.20230511171014-e76e25978d56 h1:SjFYbWvmuf73
|
||||
github.com/mattermost/morph v1.0.5-0.20230511171014-e76e25978d56/go.mod h1:gD+EaqX2UMyyuzmF4PFh4r33XneQ8Nzi+0E8nXjMa3A=
|
||||
github.com/mattermost/rsc v0.0.0-20160330161541-bbaefb05eaa0 h1:G9tL6JXRBMzjuD1kkBtcnd42kUiT6QDwxfFYu7adM6o=
|
||||
github.com/mattermost/rsc v0.0.0-20160330161541-bbaefb05eaa0/go.mod h1:nV5bfVpT//+B1RPD2JvRnxbkLmJEYXmRaaVl15fsXjs=
|
||||
github.com/mattermost/squirrel v0.2.0 h1:8ZWeyf+MWQ2cL7hu9REZgLtz2IJi51qqZEovI3T3TT8=
|
||||
github.com/mattermost/squirrel v0.2.0/go.mod h1:NPPtk+CdpWre4GxMGoOpzEVFVc0ZoEFyJBZGCtn9nSU=
|
||||
github.com/mattermost/squirrel v0.4.0 h1:azf9LZ+8JUTAvwt/njB1utkPqWQ6e7Rje2ya5N0P2i4=
|
||||
github.com/mattermost/squirrel v0.4.0/go.mod h1:NPPtk+CdpWre4GxMGoOpzEVFVc0ZoEFyJBZGCtn9nSU=
|
||||
github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
|
||||
github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
|
||||
github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4=
|
||||
|
||||
@@ -12,7 +12,7 @@ import (
|
||||
type Draft struct {
|
||||
CreateAt int64 `json:"create_at"`
|
||||
UpdateAt int64 `json:"update_at"`
|
||||
DeleteAt int64 `json:"delete_at"`
|
||||
DeleteAt int64 `json:"delete_at"` // Deprecated, we now just hard delete the rows
|
||||
UserId string `json:"user_id"`
|
||||
ChannelId string `json:"channel_id"`
|
||||
RootId string `json:"root_id"`
|
||||
|
||||
@@ -36,6 +36,7 @@ const (
|
||||
JobTypeHostedPurchaseScreening = "hosted_purchase_screening"
|
||||
JobTypeS3PathMigration = "s3_path_migration"
|
||||
JobTypeCleanupDesktopTokens = "cleanup_desktop_tokens"
|
||||
JobTypeDeleteEmptyDraftsMigration = "delete_empty_drafts_migration"
|
||||
|
||||
JobStatusPending = "pending"
|
||||
JobStatusInProgress = "in_progress"
|
||||
|
||||
@@ -43,4 +43,5 @@ const (
|
||||
MigrationKeyAddReadChannelContentPermissions = "read_channel_content_permissions"
|
||||
MigrationKeyElasticsearchFixChannelIndex = "elasticsearch_fix_channel_index_migration"
|
||||
MigrationKeyS3Path = "s3_path_migration"
|
||||
MigrationKeyDeleteEmptyDrafts = "delete_empty_drafts_migration"
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user