diff --git a/server/channels/app/migrations.go b/server/channels/app/migrations.go index 8281533963..0ddf32f0b7 100644 --- a/server/channels/app/migrations.go +++ b/server/channels/app/migrations.go @@ -594,7 +594,7 @@ func (s *Server) doCloudS3PathMigrations() { return } - if _, appErr := s.Jobs.CreateJob(model.JobTypeS3PathMigration, nil); appErr != nil { + if _, appErr := s.Jobs.CreateJobOnce(model.JobTypeS3PathMigration, nil); appErr != nil { mlog.Fatal("failed to start job for migrating s3 file paths", mlog.Err(appErr)) return } diff --git a/server/channels/jobs/jobs.go b/server/channels/jobs/jobs.go index 3715eb1713..f9f6c89600 100644 --- a/server/channels/jobs/jobs.go +++ b/server/channels/jobs/jobs.go @@ -22,6 +22,32 @@ const ( ) func (srv *JobServer) CreateJob(jobType string, jobData map[string]string) (*model.Job, *model.AppError) { + job, appErr := srv._createJob(jobType, jobData) + if appErr != nil { + return nil, appErr + } + + if _, err := srv.Store.Job().Save(job); err != nil { + return nil, model.NewAppError("CreateJob", "app.job.save.app_error", nil, "", http.StatusInternalServerError).Wrap(err) + } + + return job, nil +} + +func (srv *JobServer) CreateJobOnce(jobType string, jobData map[string]string) (*model.Job, *model.AppError) { + job, appErr := srv._createJob(jobType, jobData) + if appErr != nil { + return nil, appErr + } + + if _, err := srv.Store.Job().SaveOnce(job); err != nil { + return nil, model.NewAppError("CreateJob", "app.job.save.app_error", nil, "", http.StatusInternalServerError).Wrap(err) + } + + return job, nil +} + +func (srv *JobServer) _createJob(jobType string, jobData map[string]string) (*model.Job, *model.AppError) { job := model.Job{ Id: model.NewId(), Type: jobType, @@ -38,10 +64,6 @@ func (srv *JobServer) CreateJob(jobType string, jobData map[string]string) (*mod return nil, model.NewAppError("Job.IsValid", "model.job.is_valid.type.app_error", nil, "id="+job.Id, http.StatusBadRequest) } - if _, err := 
srv.Store.Job().Save(&job); err != nil { - return nil, model.NewAppError("CreateJob", "app.job.save.app_error", nil, "", http.StatusInternalServerError).Wrap(err) - } - return &job, nil } diff --git a/server/channels/store/opentracinglayer/opentracinglayer.go b/server/channels/store/opentracinglayer/opentracinglayer.go index 98deb2f11a..0ab2b248fd 100644 --- a/server/channels/store/opentracinglayer/opentracinglayer.go +++ b/server/channels/store/opentracinglayer/opentracinglayer.go @@ -4988,6 +4988,24 @@ func (s *OpenTracingLayerJobStore) Save(job *model.Job) (*model.Job, error) { return result, err } +func (s *OpenTracingLayerJobStore) SaveOnce(job *model.Job) (*model.Job, error) { + origCtx := s.Root.Store.Context() + span, newCtx := tracing.StartSpanWithParentByContext(s.Root.Store.Context(), "JobStore.SaveOnce") + s.Root.Store.SetContext(newCtx) + defer func() { + s.Root.Store.SetContext(origCtx) + }() + + defer span.Finish() + result, err := s.JobStore.SaveOnce(job) + if err != nil { + span.LogFields(spanlog.Error(err)) + ext.Error.Set(span, true) + } + + return result, err +} + func (s *OpenTracingLayerJobStore) UpdateOptimistically(job *model.Job, currentStatus string) (bool, error) { origCtx := s.Root.Store.Context() span, newCtx := tracing.StartSpanWithParentByContext(s.Root.Store.Context(), "JobStore.UpdateOptimistically") diff --git a/server/channels/store/retrylayer/retrylayer.go b/server/channels/store/retrylayer/retrylayer.go index f84a28457a..3f965768e9 100644 --- a/server/channels/store/retrylayer/retrylayer.go +++ b/server/channels/store/retrylayer/retrylayer.go @@ -5635,6 +5635,27 @@ func (s *RetryLayerJobStore) Save(job *model.Job) (*model.Job, error) { } +func (s *RetryLayerJobStore) SaveOnce(job *model.Job) (*model.Job, error) { + + tries := 0 + for { + result, err := s.JobStore.SaveOnce(job) + if err == nil { + return result, nil + } + if !isRepeatableError(err) { + return result, err + } + tries++ + if tries >= 3 { + err = errors.Wrap(err, 
"giving up after 3 consecutive repeatable transaction failures") + return result, err + } + timepkg.Sleep(100 * timepkg.Millisecond) + } + +} + func (s *RetryLayerJobStore) UpdateOptimistically(job *model.Job, currentStatus string) (bool, error) { tries := 0 diff --git a/server/channels/store/sqlstore/job_store.go b/server/channels/store/sqlstore/job_store.go index 1afb340bfb..9b56598682 100644 --- a/server/channels/store/sqlstore/job_store.go +++ b/server/channels/store/sqlstore/job_store.go @@ -10,6 +10,8 @@ import ( "strings" "time" + "github.com/go-sql-driver/mysql" + "github.com/lib/pq" sq "github.com/mattermost/squirrel" "github.com/pkg/errors" @@ -48,7 +50,70 @@ func (jss SqlJobStore) Save(job *model.Job) (*model.Job, error) { } if _, err = jss.GetMasterX().Exec(queryString, args...); err != nil { - return nil, errors.Wrap(err, "failed to save Preference") + return nil, errors.Wrap(err, "failed to save Job") + } + + return job, nil +} + +func (jss SqlJobStore) SaveOnce(job *model.Job) (*model.Job, error) { + jsonData, err := json.Marshal(job.Data) + if err != nil { + return nil, errors.Wrap(err, "failed marshalling job data") + } + if jss.IsBinaryParamEnabled() { + jsonData = AppendBinaryFlag(jsonData) + } + + tx, err := jss.GetMasterX().BeginXWithIsolation(&sql.TxOptions{ + Isolation: sql.LevelSerializable, + }) + if err != nil { + return nil, errors.Wrap(err, "begin_transaction") + } + defer finalizeTransactionX(tx, &err) + + query, args, err := jss.getQueryBuilder(). + Select("COUNT(*)"). + From("Jobs"). + Where(sq.Eq{ + "Status": []string{model.JobStatusPending, model.JobStatusInProgress}, + "Type": job.Type, + }).ToSql() + if err != nil { + return nil, errors.Wrap(err, "job_tosql") + } + + var count int64 + err = tx.Get(&count, query, args...) + if err != nil { + return nil, errors.Wrapf(err, "failed to count pending and in-progress jobs with type=%s", job.Type) + } + + if count > 0 { + return nil, nil + } + + query, args, err = jss.getQueryBuilder(). 
+ Insert("Jobs"). + Columns("Id", "Type", "Priority", "CreateAt", "StartAt", "LastActivityAt", "Status", "Progress", "Data"). + Values(job.Id, job.Type, job.Priority, job.CreateAt, job.StartAt, job.LastActivityAt, job.Status, job.Progress, jsonData).ToSql() + if err != nil { + return nil, errors.Wrap(err, "failed to generate sqlquery") + } + + if _, err = tx.Exec(query, args...); err != nil { + if isRepeatableError(err) { + return nil, nil + } + return nil, errors.Wrap(err, "failed to save Job") + } + + if err = tx.Commit(); err != nil { + if isRepeatableError(err) { + return nil, nil + } + return nil, errors.Wrap(err, "commit_transaction") } return job, nil @@ -342,3 +407,24 @@ func (jss SqlJobStore) Cleanup(expiryTime int64, batchSize int) error { return nil } + +const mySQLDeadlockCode = uint16(1213) + +// isRepeatableError is a bit of copied code from retrylayer.go. +// A little copying is fine because we don't want to import another package +// in the store layer +func isRepeatableError(err error) bool { + var pqErr *pq.Error + var mysqlErr *mysql.MySQLError + switch { + case errors.As(err, &pqErr): + if pqErr.Code == "40001" || pqErr.Code == "40P01" { + return true + } + case errors.As(err, &mysqlErr): + if mysqlErr.Number == mySQLDeadlockCode { + return true + } + } + return false +} diff --git a/server/channels/store/store.go b/server/channels/store/store.go index 2828c91527..a03469b867 100644 --- a/server/channels/store/store.go +++ b/server/channels/store/store.go @@ -713,6 +713,10 @@ type ReactionStore interface { type JobStore interface { Save(job *model.Job) (*model.Job, error) + // SaveOnce inserts the job only if no other job of the same type is pending or in progress. + // If such a job already exists, or a concurrent SaveOnce call for the same type wins the race, + // then nil, nil is returned. 
+ SaveOnce(job *model.Job) (*model.Job, error) UpdateOptimistically(job *model.Job, currentStatus string) (bool, error) UpdateStatus(id string, status string) (*model.Job, error) UpdateStatusOptimistically(id string, currentStatus string, newStatus string) (bool, error) diff --git a/server/channels/store/storetest/job_store.go b/server/channels/store/storetest/job_store.go index 4ef27ee059..2f7557aca2 100644 --- a/server/channels/store/storetest/job_store.go +++ b/server/channels/store/storetest/job_store.go @@ -5,10 +5,12 @@ package storetest import ( "errors" + "sync" "testing" "time" + "github.com/lib/pq" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -18,6 +20,7 @@ import ( func TestJobStore(t *testing.T, ss store.Store) { t.Run("JobSaveGet", func(t *testing.T) { testJobSaveGet(t, ss) }) + t.Run("JobSaveOnce", func(t *testing.T) { testJobSaveOnce(t, ss) }) t.Run("JobGetAllByType", func(t *testing.T) { testJobGetAllByType(t, ss) }) t.Run("JobGetAllByTypeAndStatus", func(t *testing.T) { testJobGetAllByTypeAndStatus(t, ss) }) t.Run("JobGetAllByTypePage", func(t *testing.T) { testJobGetAllByTypePage(t, ss) }) @@ -56,6 +59,51 @@ func testJobSaveGet(t *testing.T, ss store.Store) { require.Equal(t, "12345", received.Data["Total"]) } +func testJobSaveOnce(t *testing.T, ss store.Store) { + var wg sync.WaitGroup + + ids := make([]string, 2) + for i := 0; i < 2; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + job := &model.Job{ + Id: model.NewId(), + Type: model.JobTypeS3PathMigration, + Status: model.JobStatusPending, + Data: map[string]string{ + "Processed": "0", + "Total": "12345", + "LastProcessed": "abcd", + }, + } + + job, err := ss.Job().SaveOnce(job) + if err != nil { + var pqErr *pq.Error + if errors.As(err, &pqErr) { + t.Logf("%#v\n", pqErr) + } + } + require.NoError(t, err) + + if job != nil { + ids[i] = job.Id + } + }(i) + } + + wg.Wait() + + cnt, err := ss.Job().GetCountByStatusAndType(model.JobStatusPending, 
model.JobTypeS3PathMigration) + require.NoError(t, err) + assert.Equal(t, 1, int(cnt)) + + for _, id := range ids { + ss.Job().Delete(id) + } +} + func testJobGetAllByType(t *testing.T, ss store.Store) { jobType := model.NewId() diff --git a/server/channels/store/storetest/mocks/JobStore.go b/server/channels/store/storetest/mocks/JobStore.go index e2b2f90ad9..c008f34339 100644 --- a/server/channels/store/storetest/mocks/JobStore.go +++ b/server/channels/store/storetest/mocks/JobStore.go @@ -336,6 +336,32 @@ func (_m *JobStore) Save(job *model.Job) (*model.Job, error) { return r0, r1 } +// SaveOnce provides a mock function with given fields: job +func (_m *JobStore) SaveOnce(job *model.Job) (*model.Job, error) { + ret := _m.Called(job) + + var r0 *model.Job + var r1 error + if rf, ok := ret.Get(0).(func(*model.Job) (*model.Job, error)); ok { + return rf(job) + } + if rf, ok := ret.Get(0).(func(*model.Job) *model.Job); ok { + r0 = rf(job) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*model.Job) + } + } + + if rf, ok := ret.Get(1).(func(*model.Job) error); ok { + r1 = rf(job) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // UpdateOptimistically provides a mock function with given fields: job, currentStatus func (_m *JobStore) UpdateOptimistically(job *model.Job, currentStatus string) (bool, error) { ret := _m.Called(job, currentStatus) diff --git a/server/channels/store/timerlayer/timerlayer.go b/server/channels/store/timerlayer/timerlayer.go index 6b08162fe8..e12927a33c 100644 --- a/server/channels/store/timerlayer/timerlayer.go +++ b/server/channels/store/timerlayer/timerlayer.go @@ -4534,6 +4534,22 @@ func (s *TimerLayerJobStore) Save(job *model.Job) (*model.Job, error) { return result, err } +func (s *TimerLayerJobStore) SaveOnce(job *model.Job) (*model.Job, error) { + start := time.Now() + + result, err := s.JobStore.SaveOnce(job) + + elapsed := float64(time.Since(start)) / float64(time.Second) + if s.Root.Metrics != nil { + success := 
"false" + if err == nil { + success = "true" + } + s.Root.Metrics.ObserveStoreMethodDuration("JobStore.SaveOnce", success, elapsed) + } + return result, err +} + func (s *TimerLayerJobStore) UpdateOptimistically(job *model.Job, currentStatus string) (bool, error) { start := time.Now()