diff --git a/server/channels/api4/license_test.go b/server/channels/api4/license_test.go index ae1a42e149..df3ceb5493 100644 --- a/server/channels/api4/license_test.go +++ b/server/channels/api4/license_test.go @@ -53,7 +53,6 @@ func TestGetOldClientLicense(t *testing.T) { } func TestUploadLicenseFile(t *testing.T) { - t.Skip("MM-56359") th := Setup(t) defer th.TearDown() client := th.Client diff --git a/server/channels/jobs/batch_worker.go b/server/channels/jobs/batch_worker.go index 8113595168..0214eb1c55 100644 --- a/server/channels/jobs/batch_worker.go +++ b/server/channels/jobs/batch_worker.go @@ -4,7 +4,7 @@ package jobs import ( - "sync/atomic" + "sync" "time" "github.com/mattermost/mattermost/server/public/model" @@ -18,10 +18,13 @@ type BatchWorker struct { logger mlog.LoggerIFace store store.Store - stop chan struct{} - stopped chan bool - closed atomic.Bool - jobs chan model.Job + // stateMut protects stopCh and helps enforce + // ordering in case subsequent Run or Stop calls are made. + stateMut sync.Mutex + stopCh chan struct{} + stoppedCh chan bool + stopped bool + jobs chan model.Job timeBetweenBatches time.Duration doBatch func(rctx *request.Context, job *model.Job) bool @@ -38,31 +41,40 @@ func MakeBatchWorker( jobServer: jobServer, logger: jobServer.Logger(), store: store, - stop: make(chan struct{}), - stopped: make(chan bool, 1), + stoppedCh: make(chan bool, 1), jobs: make(chan model.Job), timeBetweenBatches: timeBetweenBatches, doBatch: doBatch, + stopped: true, } } // Run starts the worker dedicated to the unique migration batch job it will be given to process. func (worker *BatchWorker) Run() { - worker.logger.Debug("Worker started") + worker.stateMut.Lock() // We have to re-assign the stop channel again, because // it might happen that the job was restarted due to a config change. - if worker.closed.CompareAndSwap(true, false) { - worker.stop = make(chan struct{}) + if worker.stopped { + worker.stopped = false + worker.stopCh = make(chan struct{}) + } else { + worker.stateMut.Unlock() + return } + // Run is called from a separate goroutine and doesn't return. + // So we cannot Unlock in a defer clause. + worker.stateMut.Unlock() + + worker.logger.Debug("Worker started") defer func() { worker.logger.Debug("Worker finished") - worker.stopped <- true + worker.stoppedCh <- true }() for { select { - case <-worker.stop: + case <-worker.stopCh: worker.logger.Debug("Worker received stop signal") return case job := <-worker.jobs: @@ -73,14 +85,18 @@ func (worker *BatchWorker) Run() { // Stop interrupts the worker even if the migration has not yet completed. func (worker *BatchWorker) Stop() { + worker.stateMut.Lock() + defer worker.stateMut.Unlock() + // Set to close, and if already closed before, then return. - if !worker.closed.CompareAndSwap(false, true) { + if worker.stopped { return } + worker.stopped = true worker.logger.Debug("Worker stopping") - close(worker.stop) - <-worker.stopped + close(worker.stopCh) + <-worker.stoppedCh } // JobChannel is the means by which the jobs infrastructure provides the worker the job to execute. @@ -127,7 +143,7 @@ func (worker *BatchWorker) DoJob(job *model.Job) { for { select { - case <-worker.stop: + case <-worker.stopCh: logger.Info("Worker: Batch has been canceled via Worker Stop. Setting the job back to pending.") if err := worker.jobServer.SetJobPending(job); err != nil { worker.logger.Error("Worker: Failed to mark job as pending", mlog.Err(err)) diff --git a/server/channels/jobs/batch_worker_test.go b/server/channels/jobs/batch_worker_test.go index 7a1bb3c674..c022ea8db1 100644 --- a/server/channels/jobs/batch_worker_test.go +++ b/server/channels/jobs/batch_worker_test.go @@ -14,6 +14,20 @@ import ( "github.com/stretchr/testify/require" ) +// TestBatchWorkerRace tests race conditions during the start/stop +// cases of the batch worker. Use the -race flag while testing this. +func TestBatchWorkerRace(t *testing.T) { + th := Setup(t) + defer th.TearDown() + + worker := jobs.MakeBatchWorker(th.Server.Jobs, th.Server.Store(), 1*time.Second, func(rctx *request.Context, job *model.Job) bool { + return false + }) + + go worker.Run() + worker.Stop() +} + func TestBatchWorker(t *testing.T) { createBatchWorker := func(t *testing.T, th *TestHelper, doBatch func(rctx *request.Context, job *model.Job) bool) (*jobs.BatchWorker, *model.Job) { t.Helper() diff --git a/server/platform/services/searchengine/bleveengine/indexer/indexing_job.go b/server/platform/services/searchengine/bleveengine/indexer/indexing_job.go index 7f1091a7e9..ca70652ea2 100644 --- a/server/platform/services/searchengine/bleveengine/indexer/indexing_job.go +++ b/server/platform/services/searchengine/bleveengine/indexer/indexing_job.go @@ -7,7 +7,7 @@ import ( "context" "net/http" "strconv" - "sync/atomic" + "sync" "time" "github.com/mattermost/mattermost/server/public/model" @@ -27,14 +27,17 @@ const ( ) type BleveIndexerWorker struct { - name string - stop chan struct{} - stopped chan bool + name string + // stateMut protects stopCh and helps enforce + // ordering in case subsequent Run or Stop calls are made. + stateMut sync.Mutex + stopCh chan struct{} + stoppedCh chan bool jobs chan model.Job jobServer *jobs.JobServer logger mlog.LoggerIFace engine *bleveengine.BleveEngine - closed int32 + stopped bool } func MakeWorker(jobServer *jobs.JobServer, engine *bleveengine.BleveEngine) *BleveIndexerWorker { @@ -44,12 +47,12 @@ func MakeWorker(jobServer *jobs.JobServer, engine *bleveengine.BleveEngine) *Ble const workerName = "BleveIndexer" return &BleveIndexerWorker{ name: workerName, - stop: make(chan struct{}), - stopped: make(chan bool, 1), + stoppedCh: make(chan bool, 1), jobs: make(chan model.Job), jobServer: jobServer, logger: jobServer.Logger().With(mlog.String("worker_name", workerName)), engine: engine, + stopped: true, } } @@ -97,20 +100,30 @@ func (worker *BleveIndexerWorker) IsEnabled(cfg *model.Config) bool { } func (worker *BleveIndexerWorker) Run() { - // Set to open if closed before. We are not bothered about multiple opens. - if atomic.CompareAndSwapInt32(&worker.closed, 1, 0) { - worker.stop = make(chan struct{}) + worker.stateMut.Lock() + // We have to re-assign the stop channel again, because + // it might happen that the job was restarted due to a config change. + if worker.stopped { + worker.stopped = false + worker.stopCh = make(chan struct{}) + } else { + worker.stateMut.Unlock() + return } + // Run is called from a separate goroutine and doesn't return. + // So we cannot Unlock in a defer clause. + worker.stateMut.Unlock() + worker.logger.Debug("Worker Started") defer func() { worker.logger.Debug("Worker: Finished") - worker.stopped <- true + worker.stoppedCh <- true }() for { select { - case <-worker.stop: + case <-worker.stopCh: worker.logger.Debug("Worker: Received stop signal") return case job := <-worker.jobs: @@ -120,13 +133,17 @@ func (worker *BleveIndexerWorker) Run() { } func (worker *BleveIndexerWorker) Stop() { + worker.stateMut.Lock() + defer worker.stateMut.Unlock() + // Set to close, and if already closed before, then return. - if !atomic.CompareAndSwapInt32(&worker.closed, 0, 1) { + if worker.stopped { return } + worker.stopped = true worker.logger.Debug("Worker Stopping") - close(worker.stop) - <-worker.stopped + close(worker.stopCh) + <-worker.stoppedCh } func (worker *BleveIndexerWorker) DoJob(job *model.Job) { @@ -269,7 +286,7 @@ func (worker *BleveIndexerWorker) DoJob(job *model.Job) { } return - case <-worker.stop: + case <-worker.stopCh: logger.Info("Worker: Indexing has been canceled via Worker Stop") if err := worker.jobServer.SetJobCanceled(job); err != nil { logger.Error("Worker: Failed to mark job as canceled", mlog.Err(err))