MM-53747: Do not start if job is in-progress as well. (#24115)

We missed this case last time. In an HA scenario, a second
pod can start later, while the job created by the first pod
is already in-progress. In that case, two jobs would be
scheduled.

https://mattermost.atlassian.net/browse/MM-53747

```release-note
NONE
```
This commit is contained in:
Agniva De Sarker 2023-07-26 20:32:50 +05:30 committed by GitHub
parent f10487c511
commit b47754e268
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 247 additions and 6 deletions

View File

@ -594,7 +594,7 @@ func (s *Server) doCloudS3PathMigrations() {
return return
} }
if _, appErr := s.Jobs.CreateJob(model.JobTypeS3PathMigration, nil); appErr != nil { if _, appErr := s.Jobs.CreateJobOnce(model.JobTypeS3PathMigration, nil); appErr != nil {
mlog.Fatal("failed to start job for migrating s3 file paths", mlog.Err(appErr)) mlog.Fatal("failed to start job for migrating s3 file paths", mlog.Err(appErr))
return return
} }

View File

@ -22,6 +22,32 @@ const (
) )
func (srv *JobServer) CreateJob(jobType string, jobData map[string]string) (*model.Job, *model.AppError) { func (srv *JobServer) CreateJob(jobType string, jobData map[string]string) (*model.Job, *model.AppError) {
job, appErr := srv._createJob(jobType, jobData)
if appErr != nil {
return nil, appErr
}
if _, err := srv.Store.Job().Save(job); err != nil {
return nil, model.NewAppError("CreateJob", "app.job.save.app_error", nil, "", http.StatusInternalServerError).Wrap(err)
}
return job, nil
}
// CreateJobOnce builds a new job of the given type via _createJob and stores
// it with JobStore.SaveOnce, which skips the insert when it decides the job
// must not be created (it then reports a nil job and a nil error).
//
// The returned job is whatever the store reports as saved: it is nil, with a
// nil error, when no job was inserted. Callers that need the job must
// therefore check it for nil before using it.
//
// It returns the AppError produced by _createJob when the job cannot be
// built, or an "app.job.save.app_error" AppError wrapping the store error when
// the save fails.
func (srv *JobServer) CreateJobOnce(jobType string, jobData map[string]string) (*model.Job, *model.AppError) {
	job, appErr := srv._createJob(jobType, jobData)
	if appErr != nil {
		return nil, appErr
	}

	saved, err := srv.Store.Job().SaveOnce(job)
	if err != nil {
		// Label the error with this method's own name so it can be told apart
		// from failures originating in CreateJob.
		return nil, model.NewAppError("CreateJobOnce", "app.job.save.app_error", nil, "", http.StatusInternalServerError).Wrap(err)
	}

	// Return the store's result rather than the locally built job: handing
	// back a job that was never persisted would give the caller an Id that
	// does not exist in the database.
	return saved, nil
}
func (srv *JobServer) _createJob(jobType string, jobData map[string]string) (*model.Job, *model.AppError) {
job := model.Job{ job := model.Job{
Id: model.NewId(), Id: model.NewId(),
Type: jobType, Type: jobType,
@ -38,10 +64,6 @@ func (srv *JobServer) CreateJob(jobType string, jobData map[string]string) (*mod
return nil, model.NewAppError("Job.IsValid", "model.job.is_valid.type.app_error", nil, "id="+job.Id, http.StatusBadRequest) return nil, model.NewAppError("Job.IsValid", "model.job.is_valid.type.app_error", nil, "id="+job.Id, http.StatusBadRequest)
} }
if _, err := srv.Store.Job().Save(&job); err != nil {
return nil, model.NewAppError("CreateJob", "app.job.save.app_error", nil, "", http.StatusInternalServerError).Wrap(err)
}
return &job, nil return &job, nil
} }

View File

@ -4988,6 +4988,24 @@ func (s *OpenTracingLayerJobStore) Save(job *model.Job) (*model.Job, error) {
return result, err return result, err
} }
// SaveOnce runs the wrapped JobStore.SaveOnce inside a tracing span named
// "JobStore.SaveOnce". The span's context is installed on the root store for
// the duration of the call and the previous context is put back afterwards.
// A non-nil error is logged on the span and the span is flagged as failed;
// the wrapped call's result and error are returned unchanged.
func (s *OpenTracingLayerJobStore) SaveOnce(job *model.Job) (*model.Job, error) {
	prevCtx := s.Root.Store.Context()
	span, spanCtx := tracing.StartSpanWithParentByContext(s.Root.Store.Context(), "JobStore.SaveOnce")
	s.Root.Store.SetContext(spanCtx)

	// Deferred calls run last-in-first-out: the span is finished first, then
	// the previous context is restored.
	defer func() {
		s.Root.Store.SetContext(prevCtx)
	}()
	defer span.Finish()

	saved, saveErr := s.JobStore.SaveOnce(job)
	if saveErr == nil {
		return saved, nil
	}

	span.LogFields(spanlog.Error(saveErr))
	ext.Error.Set(span, true)
	return saved, saveErr
}
func (s *OpenTracingLayerJobStore) UpdateOptimistically(job *model.Job, currentStatus string) (bool, error) { func (s *OpenTracingLayerJobStore) UpdateOptimistically(job *model.Job, currentStatus string) (bool, error) {
origCtx := s.Root.Store.Context() origCtx := s.Root.Store.Context()
span, newCtx := tracing.StartSpanWithParentByContext(s.Root.Store.Context(), "JobStore.UpdateOptimistically") span, newCtx := tracing.StartSpanWithParentByContext(s.Root.Store.Context(), "JobStore.UpdateOptimistically")

View File

@ -5635,6 +5635,27 @@ func (s *RetryLayerJobStore) Save(job *model.Job) (*model.Job, error) {
} }
// SaveOnce calls the wrapped JobStore.SaveOnce and retries it while it keeps
// failing with an error that isRepeatableError accepts. At most three
// attempts are made, with a 100ms pause between attempts. Any other error is
// returned immediately; after the third repeatable failure the error is
// wrapped with a "giving up" message.
func (s *RetryLayerJobStore) SaveOnce(job *model.Job) (*model.Job, error) {
	for attempt := 1; ; attempt++ {
		saved, saveErr := s.JobStore.SaveOnce(job)
		if saveErr == nil {
			return saved, nil
		}
		if !isRepeatableError(saveErr) {
			return saved, saveErr
		}
		if attempt >= 3 {
			return saved, errors.Wrap(saveErr, "giving up after 3 consecutive repeatable transaction failures")
		}
		timepkg.Sleep(100 * timepkg.Millisecond)
	}
}
func (s *RetryLayerJobStore) UpdateOptimistically(job *model.Job, currentStatus string) (bool, error) { func (s *RetryLayerJobStore) UpdateOptimistically(job *model.Job, currentStatus string) (bool, error) {
tries := 0 tries := 0

View File

@ -10,6 +10,8 @@ import (
"strings" "strings"
"time" "time"
"github.com/go-sql-driver/mysql"
"github.com/lib/pq"
sq "github.com/mattermost/squirrel" sq "github.com/mattermost/squirrel"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -48,7 +50,70 @@ func (jss SqlJobStore) Save(job *model.Job) (*model.Job, error) {
} }
if _, err = jss.GetMasterX().Exec(queryString, args...); err != nil { if _, err = jss.GetMasterX().Exec(queryString, args...); err != nil {
return nil, errors.Wrap(err, "failed to save Preference") return nil, errors.Wrap(err, "failed to save Job")
}
return job, nil
}
func (jss SqlJobStore) SaveOnce(job *model.Job) (*model.Job, error) {
jsonData, err := json.Marshal(job.Data)
if err != nil {
return nil, errors.Wrap(err, "failed marshalling job data")
}
if jss.IsBinaryParamEnabled() {
jsonData = AppendBinaryFlag(jsonData)
}
tx, err := jss.GetMasterX().BeginXWithIsolation(&sql.TxOptions{
Isolation: sql.LevelSerializable,
})
if err != nil {
return nil, errors.Wrap(err, "begin_transaction")
}
defer finalizeTransactionX(tx, &err)
query, args, err := jss.getQueryBuilder().
Select("COUNT(*)").
From("Jobs").
Where(sq.Eq{
"Status": []string{model.JobStatusPending, model.JobStatusInProgress},
"Type": job.Type,
}).ToSql()
if err != nil {
return nil, errors.Wrap(err, "job_tosql")
}
var count int64
err = tx.Get(&count, query, args...)
if err != nil {
return nil, errors.Wrapf(err, "failed to count pending and in-progress jobs with type=%s", job.Type)
}
if count > 0 {
return nil, nil
}
query, args, err = jss.getQueryBuilder().
Insert("Jobs").
Columns("Id", "Type", "Priority", "CreateAt", "StartAt", "LastActivityAt", "Status", "Progress", "Data").
Values(job.Id, job.Type, job.Priority, job.CreateAt, job.StartAt, job.LastActivityAt, job.Status, job.Progress, jsonData).ToSql()
if err != nil {
return nil, errors.Wrap(err, "failed to generate sqlquery")
}
if _, err = tx.Exec(query, args...); err != nil {
if isRepeatableError(err) {
return nil, nil
}
return nil, errors.Wrap(err, "failed to save Job")
}
if err = tx.Commit(); err != nil {
if isRepeatableError(err) {
return nil, nil
}
return nil, errors.Wrap(err, "commit_transaction")
} }
return job, nil return job, nil
@ -342,3 +407,24 @@ func (jss SqlJobStore) Cleanup(expiryTime int64, batchSize int) error {
return nil return nil
} }
const mySQLDeadlockCode = uint16(1213)
// isRepeatableError reports whether err is a database error that signals a
// transient transaction conflict:
//   - PostgreSQL (pq.Error) with SQLSTATE "40001" (serialization_failure) or
//     "40P01" (deadlock_detected);
//   - MySQL (mysql.MySQLError) with error number mySQLDeadlockCode.
//
// The error chain is inspected with errors.As, so wrapped errors are matched
// too. Once err is identified as a PostgreSQL error, the MySQL check is not
// consulted.
//
// This is a copy of the helper in retrylayer.go. The small duplication is
// deliberate: it avoids importing another package in the store layer.
func isRepeatableError(err error) bool {
	var pgErr *pq.Error
	if errors.As(err, &pgErr) {
		return pgErr.Code == "40001" || pgErr.Code == "40P01"
	}

	var myErr *mysql.MySQLError
	if errors.As(err, &myErr) {
		return myErr.Number == mySQLDeadlockCode
	}

	return false
}

View File

@ -713,6 +713,10 @@ type ReactionStore interface {
type JobStore interface { type JobStore interface {
Save(job *model.Job) (*model.Job, error) Save(job *model.Job) (*model.Job, error)
// SaveOnce inserts the job only if no other job of the same type is
// currently pending or in progress. If such a job already exists, or a
// concurrent call for the same type wins the race, then nil, nil is returned.
SaveOnce(job *model.Job) (*model.Job, error)
UpdateOptimistically(job *model.Job, currentStatus string) (bool, error) UpdateOptimistically(job *model.Job, currentStatus string) (bool, error)
UpdateStatus(id string, status string) (*model.Job, error) UpdateStatus(id string, status string) (*model.Job, error)
UpdateStatusOptimistically(id string, currentStatus string, newStatus string) (bool, error) UpdateStatusOptimistically(id string, currentStatus string, newStatus string) (bool, error)

View File

@ -5,10 +5,12 @@ package storetest
import ( import (
"errors" "errors"
"sync"
"testing" "testing"
"time" "time"
"github.com/lib/pq"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -18,6 +20,7 @@ import (
func TestJobStore(t *testing.T, ss store.Store) { func TestJobStore(t *testing.T, ss store.Store) {
t.Run("JobSaveGet", func(t *testing.T) { testJobSaveGet(t, ss) }) t.Run("JobSaveGet", func(t *testing.T) { testJobSaveGet(t, ss) })
t.Run("JobSaveOnce", func(t *testing.T) { testJobSaveOnce(t, ss) })
t.Run("JobGetAllByType", func(t *testing.T) { testJobGetAllByType(t, ss) }) t.Run("JobGetAllByType", func(t *testing.T) { testJobGetAllByType(t, ss) })
t.Run("JobGetAllByTypeAndStatus", func(t *testing.T) { testJobGetAllByTypeAndStatus(t, ss) }) t.Run("JobGetAllByTypeAndStatus", func(t *testing.T) { testJobGetAllByTypeAndStatus(t, ss) })
t.Run("JobGetAllByTypePage", func(t *testing.T) { testJobGetAllByTypePage(t, ss) }) t.Run("JobGetAllByTypePage", func(t *testing.T) { testJobGetAllByTypePage(t, ss) })
@ -56,6 +59,51 @@ func testJobSaveGet(t *testing.T, ss store.Store) {
require.Equal(t, "12345", received.Data["Total"]) require.Equal(t, "12345", received.Data["Total"])
} }
// testJobSaveOnce starts two goroutines that each call SaveOnce with a
// pending job of type JobTypeS3PathMigration and verifies that exactly one
// pending job of that type exists afterwards. Jobs that were actually
// inserted are deleted again when the test finishes.
func testJobSaveOnce(t *testing.T, ss store.Store) {
	var wg sync.WaitGroup

	// One slot per goroutine. Each goroutine writes only its own index, and
	// the slice is read only after wg.Wait, so no extra locking is needed.
	ids := make([]string, 2)

	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()

			job := &model.Job{
				Id:     model.NewId(),
				Type:   model.JobTypeS3PathMigration,
				Status: model.JobStatusPending,
				Data: map[string]string{
					"Processed":     "0",
					"Total":         "12345",
					"LastProcessed": "abcd",
				},
			}

			job, err := ss.Job().SaveOnce(job)
			if err != nil {
				// Dump the full driver error to make failures easier to diagnose.
				var pqErr *pq.Error
				if errors.As(err, &pqErr) {
					t.Logf("%#v\n", pqErr)
				}
			}
			// Use assert rather than require here: require stops the test via
			// t.FailNow, which must only be called from the goroutine running
			// the test function, not from goroutines it spawns.
			if !assert.NoError(t, err) {
				return
			}

			// A nil job means SaveOnce skipped the insert.
			if job != nil {
				ids[i] = job.Id
			}
		}(i)
	}

	wg.Wait()

	// Clean up in a deferred call so inserted jobs are removed even if one of
	// the require checks below aborts the test.
	defer func() {
		for _, id := range ids {
			// The goroutine that did not insert a job leaves its slot empty.
			if id == "" {
				continue
			}
			ss.Job().Delete(id)
		}
	}()

	cnt, err := ss.Job().GetCountByStatusAndType(model.JobStatusPending, model.JobTypeS3PathMigration)
	require.NoError(t, err)
	assert.Equal(t, 1, int(cnt))
}
func testJobGetAllByType(t *testing.T, ss store.Store) { func testJobGetAllByType(t *testing.T, ss store.Store) {
jobType := model.NewId() jobType := model.NewId()

View File

@ -336,6 +336,32 @@ func (_m *JobStore) Save(job *model.Job) (*model.Job, error) {
return r0, r1 return r0, r1
} }
// SaveOnce provides a mock function with given fields: job
//
// The first configured return value may be a function returning both results,
// a function returning only the job, a *model.Job, or nil. The second may be
// a function returning the error or an error value.
func (_m *JobStore) SaveOnce(job *model.Job) (*model.Job, error) {
	ret := _m.Called(job)

	var (
		saved   *model.Job
		saveErr error
	)

	switch first := ret.Get(0).(type) {
	case func(*model.Job) (*model.Job, error):
		// A single function supplies both return values.
		return first(job)
	case func(*model.Job) *model.Job:
		saved = first(job)
	case nil:
		// No job configured; leave saved as nil.
	default:
		saved = first.(*model.Job)
	}

	if errFn, ok := ret.Get(1).(func(*model.Job) error); ok {
		saveErr = errFn(job)
	} else {
		saveErr = ret.Error(1)
	}

	return saved, saveErr
}
// UpdateOptimistically provides a mock function with given fields: job, currentStatus // UpdateOptimistically provides a mock function with given fields: job, currentStatus
func (_m *JobStore) UpdateOptimistically(job *model.Job, currentStatus string) (bool, error) { func (_m *JobStore) UpdateOptimistically(job *model.Job, currentStatus string) (bool, error) {
ret := _m.Called(job, currentStatus) ret := _m.Called(job, currentStatus)

View File

@ -4534,6 +4534,22 @@ func (s *TimerLayerJobStore) Save(job *model.Job) (*model.Job, error) {
return result, err return result, err
} }
// SaveOnce calls the wrapped JobStore.SaveOnce and measures how long it took.
// When a metrics collector is configured on the root store, the duration in
// seconds is reported under "JobStore.SaveOnce" together with a "true" or
// "false" success label, depending on whether the call returned an error.
// The wrapped call's result and error are returned unchanged.
func (s *TimerLayerJobStore) SaveOnce(job *model.Job) (*model.Job, error) {
	begin := time.Now()

	saved, saveErr := s.JobStore.SaveOnce(job)

	seconds := float64(time.Since(begin)) / float64(time.Second)
	if s.Root.Metrics == nil {
		return saved, saveErr
	}

	outcome := "true"
	if saveErr != nil {
		outcome = "false"
	}
	s.Root.Metrics.ObserveStoreMethodDuration("JobStore.SaveOnce", outcome, seconds)

	return saved, saveErr
}
func (s *TimerLayerJobStore) UpdateOptimistically(job *model.Job, currentStatus string) (bool, error) { func (s *TimerLayerJobStore) UpdateOptimistically(job *model.Job, currentStatus string) (bool, error) {
start := time.Now() start := time.Now()