MM-56358: Unskip racy test TestUploadLicenseFile (#26009)

The underlying race was already fixed in https://github.com/mattermost/mattermost/pull/24971; this change just re-enables the test.

https://mattermost.atlassian.net/browse/MM-56358

```release-note
NONE
```

Co-authored-by: Mattermost Build <build@mattermost.com>
Author: Agniva De Sarker, 2024-02-13 21:03:17 +05:30 (committed by GitHub)
Commit: bdce9f42d8 (parent 845f28cb76)
4 changed files with 79 additions and 33 deletions
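
At its core, the change replaces the workers' atomic `closed` flag with a mutex-guarded lifecycle (`stateMut`, `stopped`, `stopCh`, `stoppedCh`), so that overlapping Run and Stop calls no longer race on the stop channel. The following is a minimal, self-contained sketch of that pattern as applied in the diffs below; the `worker` type, `newWorker` constructor, and the timer-based job stand-in are illustrative only and are not part of the Mattermost code.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// worker is a stripped-down stand-in for BatchWorker/BleveIndexerWorker,
// showing only the start/stop lifecycle introduced by this commit.
type worker struct {
	// stateMut protects stopCh and stopped so that overlapping Run and
	// Stop calls observe a consistent view of the worker's lifecycle.
	stateMut  sync.Mutex
	stopCh    chan struct{}
	stoppedCh chan bool
	stopped   bool
}

func newWorker() *worker {
	return &worker{
		stoppedCh: make(chan bool, 1),
		stopped:   true, // starts in the stopped state; Run flips it
	}
}

func (w *worker) Run() {
	w.stateMut.Lock()
	// Re-create the stop channel on every (re)start; bail out if the
	// worker is already running so a second Run call is a no-op.
	if w.stopped {
		w.stopped = false
		w.stopCh = make(chan struct{})
	} else {
		w.stateMut.Unlock()
		return
	}
	// Run blocks until stopped, so the lock cannot be released in a defer.
	w.stateMut.Unlock()

	defer func() { w.stoppedCh <- true }()

	for {
		select {
		case <-w.stopCh:
			return
		case <-time.After(100 * time.Millisecond):
			// stand-in for handling a job
		}
	}
}

func (w *worker) Stop() {
	w.stateMut.Lock()
	defer w.stateMut.Unlock()

	// Stopping an already-stopped worker is a no-op.
	if w.stopped {
		return
	}
	w.stopped = true

	close(w.stopCh)
	<-w.stoppedCh
}

func main() {
	w := newWorker()
	go w.Run()
	w.Stop() // safe even if Run has not started looping yet
	fmt.Println("worker stopped cleanly")
}
```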

View File

@@ -53,7 +53,6 @@ func TestGetOldClientLicense(t *testing.T) {
 }

 func TestUploadLicenseFile(t *testing.T) {
-	t.Skip("MM-56359")
 	th := Setup(t)
 	defer th.TearDown()
 	client := th.Client

View File

@@ -4,7 +4,7 @@
 package jobs

 import (
-	"sync/atomic"
+	"sync"
 	"time"

 	"github.com/mattermost/mattermost/server/public/model"
@@ -18,10 +18,13 @@ type BatchWorker struct {
 	logger mlog.LoggerIFace
 	store store.Store

-	stop chan struct{}
-	stopped chan bool
-	closed atomic.Bool
-	jobs chan model.Job
+	// stateMut protects stopCh and helps enforce
+	// ordering in case subsequent Run or Stop calls are made.
+	stateMut sync.Mutex
+	stopCh chan struct{}
+	stoppedCh chan bool
+	stopped bool
+	jobs chan model.Job

 	timeBetweenBatches time.Duration
 	doBatch func(rctx *request.Context, job *model.Job) bool
@@ -38,31 +41,40 @@ func MakeBatchWorker(
 		jobServer: jobServer,
 		logger: jobServer.Logger(),
 		store: store,
-		stop: make(chan struct{}),
-		stopped: make(chan bool, 1),
+		stoppedCh: make(chan bool, 1),
 		jobs: make(chan model.Job),
 		timeBetweenBatches: timeBetweenBatches,
 		doBatch: doBatch,
+		stopped: true,
 	}
 }

 // Run starts the worker dedicated to the unique migration batch job it will be given to process.
 func (worker *BatchWorker) Run() {
-	worker.logger.Debug("Worker started")
+	worker.stateMut.Lock()
 	// We have to re-assign the stop channel again, because
 	// it might happen that the job was restarted due to a config change.
-	if worker.closed.CompareAndSwap(true, false) {
-		worker.stop = make(chan struct{})
+	if worker.stopped {
+		worker.stopped = false
+		worker.stopCh = make(chan struct{})
+	} else {
+		worker.stateMut.Unlock()
+		return
 	}
+	// Run is called from a separate goroutine and doesn't return.
+	// So we cannot Unlock in a defer clause.
+	worker.stateMut.Unlock()
+
+	worker.logger.Debug("Worker started")

 	defer func() {
 		worker.logger.Debug("Worker finished")
-		worker.stopped <- true
+		worker.stoppedCh <- true
 	}()

 	for {
 		select {
-		case <-worker.stop:
+		case <-worker.stopCh:
 			worker.logger.Debug("Worker received stop signal")
 			return
 		case job := <-worker.jobs:
@@ -73,14 +85,18 @@ func (worker *BatchWorker) Run() {

 // Stop interrupts the worker even if the migration has not yet completed.
 func (worker *BatchWorker) Stop() {
+	worker.stateMut.Lock()
+	defer worker.stateMut.Unlock()
+
 	// Set to close, and if already closed before, then return.
-	if !worker.closed.CompareAndSwap(false, true) {
+	if worker.stopped {
 		return
 	}
+	worker.stopped = true

 	worker.logger.Debug("Worker stopping")
-	close(worker.stop)
-	<-worker.stopped
+	close(worker.stopCh)
+	<-worker.stoppedCh
 }

 // JobChannel is the means by which the jobs infrastructure provides the worker the job to execute.
@@ -127,7 +143,7 @@ func (worker *BatchWorker) DoJob(job *model.Job) {

 	for {
 		select {
-		case <-worker.stop:
+		case <-worker.stopCh:
 			logger.Info("Worker: Batch has been canceled via Worker Stop. Setting the job back to pending.")
 			if err := worker.jobServer.SetJobPending(job); err != nil {
 				worker.logger.Error("Worker: Failed to mark job as pending", mlog.Err(err))
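
A few properties of the new lifecycle are worth noting: the constructor initializes `stopped` to true, so the first Run call is what creates `stopCh`; a Stop issued before Run has ever started is a harmless no-op; and a second Run while the worker is already running returns immediately instead of touching the channels.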

View File

@@ -14,6 +14,20 @@ import (
 	"github.com/stretchr/testify/require"
 )

+// TestBatchWorkerRace tests race conditions during the start/stop
+// cases of the batch worker. Use the -race flag while testing this.
+func TestBatchWorkerRace(t *testing.T) {
+	th := Setup(t)
+	defer th.TearDown()
+
+	worker := jobs.MakeBatchWorker(th.Server.Jobs, th.Server.Store(), 1*time.Second, func(rctx *request.Context, job *model.Job) bool {
+		return false
+	})
+
+	go worker.Run()
+	worker.Stop()
+}
+
 func TestBatchWorker(t *testing.T) {
 	createBatchWorker := func(t *testing.T, th *TestHelper, doBatch func(rctx *request.Context, job *model.Job) bool) (*jobs.BatchWorker, *model.Job) {
 		t.Helper()
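
As the test's own comment notes, TestBatchWorkerRace is mainly meaningful under the race detector, so it would typically be run with something like `go test -race -run TestBatchWorkerRace` from the package directory.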

View File

@@ -7,7 +7,7 @@ import (
 	"context"
 	"net/http"
 	"strconv"
-	"sync/atomic"
+	"sync"
 	"time"

 	"github.com/mattermost/mattermost/server/public/model"
@@ -27,14 +27,17 @@ const (
 )

 type BleveIndexerWorker struct {
 	name string
-	stop chan struct{}
-	stopped chan bool
+	// stateMut protects stopCh and helps enforce
+	// ordering in case subsequent Run or Stop calls are made.
+	stateMut sync.Mutex
+	stopCh chan struct{}
+	stoppedCh chan bool
 	jobs chan model.Job
 	jobServer *jobs.JobServer
 	logger mlog.LoggerIFace

 	engine *bleveengine.BleveEngine
-	closed int32
+	stopped bool
 }
func MakeWorker(jobServer *jobs.JobServer, engine *bleveengine.BleveEngine) *BleveIndexerWorker { func MakeWorker(jobServer *jobs.JobServer, engine *bleveengine.BleveEngine) *BleveIndexerWorker {
@@ -44,12 +47,12 @@ func MakeWorker(jobServer *jobs.JobServer, engine *bleveengine.BleveEngine) *BleveIndexerWorker {
 	const workerName = "BleveIndexer"
 	return &BleveIndexerWorker{
 		name: workerName,
-		stop: make(chan struct{}),
-		stopped: make(chan bool, 1),
+		stoppedCh: make(chan bool, 1),
 		jobs: make(chan model.Job),
 		jobServer: jobServer,
 		logger: jobServer.Logger().With(mlog.String("worker_name", workerName)),
 		engine: engine,
+		stopped: true,
 	}
 }
@@ -97,20 +100,30 @@ func (worker *BleveIndexerWorker) IsEnabled(cfg *model.Config) bool {
 }

 func (worker *BleveIndexerWorker) Run() {
-	// Set to open if closed before. We are not bothered about multiple opens.
-	if atomic.CompareAndSwapInt32(&worker.closed, 1, 0) {
-		worker.stop = make(chan struct{})
+	worker.stateMut.Lock()
+	// We have to re-assign the stop channel again, because
+	// it might happen that the job was restarted due to a config change.
+	if worker.stopped {
+		worker.stopped = false
+		worker.stopCh = make(chan struct{})
+	} else {
+		worker.stateMut.Unlock()
+		return
 	}
+	// Run is called from a separate goroutine and doesn't return.
+	// So we cannot Unlock in a defer clause.
+	worker.stateMut.Unlock()

 	worker.logger.Debug("Worker Started")

 	defer func() {
 		worker.logger.Debug("Worker: Finished")
-		worker.stopped <- true
+		worker.stoppedCh <- true
 	}()

 	for {
 		select {
-		case <-worker.stop:
+		case <-worker.stopCh:
 			worker.logger.Debug("Worker: Received stop signal")
 			return
 		case job := <-worker.jobs:
@@ -120,13 +133,17 @@ func (worker *BleveIndexerWorker) Run() {
 }

 func (worker *BleveIndexerWorker) Stop() {
+	worker.stateMut.Lock()
+	defer worker.stateMut.Unlock()
+
 	// Set to close, and if already closed before, then return.
-	if !atomic.CompareAndSwapInt32(&worker.closed, 0, 1) {
+	if worker.stopped {
 		return
 	}
+	worker.stopped = true

 	worker.logger.Debug("Worker Stopping")
-	close(worker.stop)
-	<-worker.stopped
+	close(worker.stopCh)
+	<-worker.stoppedCh
 }
func (worker *BleveIndexerWorker) DoJob(job *model.Job) { func (worker *BleveIndexerWorker) DoJob(job *model.Job) {
@@ -269,7 +286,7 @@ func (worker *BleveIndexerWorker) DoJob(job *model.Job) {
 			}
 			return
-		case <-worker.stop:
+		case <-worker.stopCh:
 			logger.Info("Worker: Indexing has been canceled via Worker Stop")
 			if err := worker.jobServer.SetJobCanceled(job); err != nil {
 				logger.Error("Worker: Failed to mark job as canceled", mlog.Err(err))