From f934502a562a763d817b10fe9d6d1f1ad9430bb6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jes=C3=BAs=20Espino?= Date: Sat, 15 Jun 2019 17:55:06 +0200 Subject: [PATCH] Migrate Jobs store to sync by default (#11183) * Migrate Jobs store to sync by default * Fixing compilation * Fixing compilation * Fixing govet --- api4/job_test.go | 16 +- app/job.go | 18 +- app/job_test.go | 10 +- jobs/jobs.go | 103 +++++---- jobs/jobs_watcher.go | 94 ++++---- migrations/helper_test.go | 17 +- migrations/migrations.go | 29 +-- migrations/migrations_test.go | 10 +- store/sqlstore/job_store.go | 354 +++++++++++------------------- store/store.go | 24 +- store/storetest/job_store.go | 285 +++++++++++------------- store/storetest/mocks/JobStore.go | 229 +++++++++++++------ 12 files changed, 570 insertions(+), 619 deletions(-) diff --git a/api4/job_test.go b/api4/job_test.go index 2ba69c37b0..61172897de 100644 --- a/api4/job_test.go +++ b/api4/job_test.go @@ -8,7 +8,7 @@ import ( "testing" "github.com/mattermost/mattermost-server/model" - "github.com/mattermost/mattermost-server/store" + "github.com/stretchr/testify/require" ) func TestCreateJob(t *testing.T) { @@ -46,9 +46,8 @@ func TestGetJob(t *testing.T) { Id: model.NewId(), Status: model.JOB_STATUS_PENDING, } - if result := <-th.App.Srv.Store.Job().Save(job); result.Err != nil { - t.Fatal(result.Err) - } + _, err := th.App.Srv.Store.Job().Save(job) + require.Nil(t, err) defer th.App.Srv.Store.Job().Delete(job.Id) @@ -95,7 +94,8 @@ func TestGetJobs(t *testing.T) { } for _, job := range jobs { - store.Must(th.App.Srv.Store.Job().Save(job)) + _, err := th.App.Srv.Store.Job().Save(job) + require.Nil(t, err) defer th.App.Srv.Store.Job().Delete(job.Id) } @@ -151,7 +151,8 @@ func TestGetJobsByType(t *testing.T) { } for _, job := range jobs { - store.Must(th.App.Srv.Store.Job().Save(job)) + _, err := th.App.Srv.Store.Job().Save(job) + require.Nil(t, err) defer th.App.Srv.Store.Job().Delete(job.Id) } @@ -208,7 +209,8 @@ func TestCancelJob(t *testing.T) { } for _, job := range jobs { - store.Must(th.App.Srv.Store.Job().Save(job)) + _, err := th.App.Srv.Store.Job().Save(job) + require.Nil(t, err) defer th.App.Srv.Store.Job().Delete(job.Id) } diff --git a/app/job.go b/app/job.go index 50ce346f09..da61119e9e 100644 --- a/app/job.go +++ b/app/job.go @@ -8,11 +8,7 @@ import ( ) func (a *App) GetJob(id string) (*model.Job, *model.AppError) { - result := <-a.Srv.Store.Job().Get(id) - if result.Err != nil { - return nil, result.Err - } - return result.Data.(*model.Job), nil + return a.Srv.Store.Job().Get(id) } func (a *App) GetJobsPage(page int, perPage int) ([]*model.Job, *model.AppError) { @@ -20,11 +16,7 @@ func (a *App) GetJobsPage(page int, perPage int) ([]*model.Job, *model.AppError) } func (a *App) GetJobs(offset int, limit int) ([]*model.Job, *model.AppError) { - result := <-a.Srv.Store.Job().GetAllPage(offset, limit) - if result.Err != nil { - return nil, result.Err - } - return result.Data.([]*model.Job), nil + return a.Srv.Store.Job().GetAllPage(offset, limit) } func (a *App) GetJobsByTypePage(jobType string, page int, perPage int) ([]*model.Job, *model.AppError) { @@ -32,11 +24,7 @@ func (a *App) GetJobsByTypePage(jobType string, page int, perPage int) ([]*model } func (a *App) GetJobsByType(jobType string, offset int, limit int) ([]*model.Job, *model.AppError) { - result := <-a.Srv.Store.Job().GetAllByTypePage(jobType, offset, limit) - if result.Err != nil { - return nil, result.Err - } - return result.Data.([]*model.Job), nil + return a.Srv.Store.Job().GetAllByTypePage(jobType, offset, limit) } func (a *App) CreateJob(job *model.Job) (*model.Job, *model.AppError) { diff --git a/app/job_test.go b/app/job_test.go index ca3d1c7f61..6a3b0f6357 100644 --- a/app/job_test.go +++ b/app/job_test.go @@ -7,7 +7,7 @@ import ( "testing" "github.com/mattermost/mattermost-server/model" - "github.com/mattermost/mattermost-server/store" + "github.com/stretchr/testify/require" ) func TestGetJob(t *testing.T) { @@ -18,9 +18,8 @@ func TestGetJob(t *testing.T) { Id: model.NewId(), Status: model.NewId(), } - if result := <-th.App.Srv.Store.Job().Save(status); result.Err != nil { - t.Fatal(result.Err) - } + _, err := th.App.Srv.Store.Job().Save(status) + require.Nil(t, err) defer th.App.Srv.Store.Job().Delete(status.Id) @@ -56,7 +55,8 @@ func TestGetJobByType(t *testing.T) { } for _, status := range statuses { - store.Must(th.App.Srv.Store.Job().Save(status)) + _, err := th.App.Srv.Store.Job().Save(status) + require.Nil(t, err) defer th.App.Srv.Store.Job().Delete(status.Id) } diff --git a/jobs/jobs.go b/jobs/jobs.go index ddbc4489b1..8c97a450fc 100644 --- a/jobs/jobs.go +++ b/jobs/jobs.go @@ -31,50 +31,42 @@ func (srv *JobServer) CreateJob(jobType string, jobData map[string]string) (*mod return nil, err } - if result := <-srv.Store.Job().Save(&job); result.Err != nil { - return nil, result.Err + if _, err := srv.Store.Job().Save(&job); err != nil { + return nil, err } return &job, nil } func (srv *JobServer) GetJob(id string) (*model.Job, *model.AppError) { - if result := <-srv.Store.Job().Get(id); result.Err != nil { - return nil, result.Err - } else { - return result.Data.(*model.Job), nil - } + return srv.Store.Job().Get(id) } func (srv *JobServer) ClaimJob(job *model.Job) (bool, *model.AppError) { - if result := <-srv.Store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_PENDING, model.JOB_STATUS_IN_PROGRESS); result.Err != nil { - return false, result.Err - } else { - success := result.Data.(bool) - return success, nil - } + return srv.Store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_PENDING, model.JOB_STATUS_IN_PROGRESS) } func (srv *JobServer) SetJobProgress(job *model.Job, progress int64) *model.AppError { job.Status = model.JOB_STATUS_IN_PROGRESS job.Progress = progress - if result := <-srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_IN_PROGRESS); result.Err != nil { - return result.Err - } else { - return nil + if _, err := srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_IN_PROGRESS); err != nil { + return err } + return nil } func (srv *JobServer) SetJobSuccess(job *model.Job) *model.AppError { - result := <-srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_SUCCESS) - return result.Err + if _, err := srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_SUCCESS); err != nil { + return err + } + return nil } func (srv *JobServer) SetJobError(job *model.Job, jobError *model.AppError) *model.AppError { if jobError == nil { - result := <-srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_ERROR) - return result.Err + _, err := srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_ERROR) + return err } job.Status = model.JOB_STATUS_ERROR @@ -84,17 +76,18 @@ func (srv *JobServer) SetJobError(job *model.Job, jobError *model.AppError) *mod } job.Data["error"] = jobError.Message + " — " + jobError.DetailedError - if result := <-srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_IN_PROGRESS); result.Err != nil { - return result.Err - } else { - if !result.Data.(bool) { - if result := <-srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_CANCEL_REQUESTED); result.Err != nil { - return result.Err - } else { - if !result.Data.(bool) { - return model.NewAppError("Jobs.SetJobError", "jobs.set_job_error.update.error", nil, "id="+job.Id, http.StatusInternalServerError) - } - } + updated, err := srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_IN_PROGRESS) + if err != nil { + return err + } + + if !updated { + updated, err = srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_CANCEL_REQUESTED) + if err != nil { + return err + } + if !updated { + return model.NewAppError("Jobs.SetJobError", "jobs.set_job_error.update.error", nil, "id="+job.Id, http.StatusInternalServerError) } } @@ -102,27 +95,36 @@ func (srv *JobServer) SetJobError(job *model.Job, jobError *model.AppError) *mod } func (srv *JobServer) SetJobCanceled(job *model.Job) *model.AppError { - result := <-srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_CANCELED) - return result.Err + if _, err := srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_CANCELED); err != nil { + return err + } + return nil } func (srv *JobServer) UpdateInProgressJobData(job *model.Job) *model.AppError { job.Status = model.JOB_STATUS_IN_PROGRESS job.LastActivityAt = model.GetMillis() - result := <-srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_IN_PROGRESS) - return result.Err + if _, err := srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_IN_PROGRESS); err != nil { + return err + } + return nil } func (srv *JobServer) RequestCancellation(jobId string) *model.AppError { - if result := <-srv.Store.Job().UpdateStatusOptimistically(jobId, model.JOB_STATUS_PENDING, model.JOB_STATUS_CANCELED); result.Err != nil { - return result.Err - } else if result.Data.(bool) { + updated, err := srv.Store.Job().UpdateStatusOptimistically(jobId, model.JOB_STATUS_PENDING, model.JOB_STATUS_CANCELED) + if err != nil { + return err + } + if updated { return nil } - if result := <-srv.Store.Job().UpdateStatusOptimistically(jobId, model.JOB_STATUS_IN_PROGRESS, model.JOB_STATUS_CANCEL_REQUESTED); result.Err != nil { - return result.Err - } else if result.Data.(bool) { + updated, err = srv.Store.Job().UpdateStatusOptimistically(jobId, model.JOB_STATUS_IN_PROGRESS, model.JOB_STATUS_CANCEL_REQUESTED) + if err != nil { + return err + } + + if updated { return nil } @@ -137,8 +139,7 @@ func (srv *JobServer) CancellationWatcher(ctx context.Context, jobId string, can return case <-time.After(CANCEL_WATCHER_POLLING_INTERVAL * time.Millisecond): mlog.Debug(fmt.Sprintf("CancellationWatcher for Job: %v polling.", jobId)) - if result := <-srv.Store.Job().Get(jobId); result.Err == nil { - jobStatus := result.Data.(*model.Job) + if jobStatus, err := srv.Store.Job().Get(jobId); err == nil { if jobStatus.Status == model.JOB_STATUS_CANCEL_REQUESTED { close(cancelChan) return @@ -159,17 +160,13 @@ func GenerateNextStartDateTime(now time.Time, nextStartTime time.Time) *time.Tim } func (srv *JobServer) CheckForPendingJobsByType(jobType string) (bool, *model.AppError) { - if result := <-srv.Store.Job().GetCountByStatusAndType(model.JOB_STATUS_PENDING, jobType); result.Err != nil { - return false, result.Err - } else { - return result.Data.(int64) > 0, nil + count, err := srv.Store.Job().GetCountByStatusAndType(model.JOB_STATUS_PENDING, jobType) + if err != nil { + return false, err } + return count > 0, nil } func (srv *JobServer) GetLastSuccessfulJobByType(jobType string) (*model.Job, *model.AppError) { - if result := <-srv.Store.Job().GetNewestJobByStatusAndType(model.JOB_STATUS_SUCCESS, jobType); result.Err != nil { - return nil, result.Err - } else { - return result.Data.(*model.Job), nil - } + return srv.Store.Job().GetNewestJobByStatusAndType(model.JOB_STATUS_SUCCESS, jobType) } diff --git a/jobs/jobs_watcher.go b/jobs/jobs_watcher.go index 523d02b990..632c4e3bbd 100644 --- a/jobs/jobs_watcher.go +++ b/jobs/jobs_watcher.go @@ -66,60 +66,60 @@ func (watcher *Watcher) Stop() { } func (watcher *Watcher) PollAndNotify() { - if result := <-watcher.srv.Store.Job().GetAllByStatus(model.JOB_STATUS_PENDING); result.Err != nil { - mlog.Error(fmt.Sprintf("Error occurred getting all pending statuses: %v", result.Err.Error())) - } else { - jobs := result.Data.([]*model.Job) + jobs, err := watcher.srv.Store.Job().GetAllByStatus(model.JOB_STATUS_PENDING) + if err != nil { + mlog.Error(fmt.Sprintf("Error occurred getting all pending statuses: %v", err.Error())) + return + } - for _, job := range jobs { - if job.Type == model.JOB_TYPE_DATA_RETENTION { - if watcher.workers.DataRetention != nil { - select { - case watcher.workers.DataRetention.JobChannel() <- *job: - default: - } + for _, job := range jobs { + if job.Type == model.JOB_TYPE_DATA_RETENTION { + if watcher.workers.DataRetention != nil { + select { + case watcher.workers.DataRetention.JobChannel() <- *job: + default: } - } else if job.Type == model.JOB_TYPE_MESSAGE_EXPORT { - if watcher.workers.MessageExport != nil { - select { - case watcher.workers.MessageExport.JobChannel() <- *job: - default: - } + } + } else if job.Type == model.JOB_TYPE_MESSAGE_EXPORT { + if watcher.workers.MessageExport != nil { + select { + case watcher.workers.MessageExport.JobChannel() <- *job: + default: } - } else if job.Type == model.JOB_TYPE_ELASTICSEARCH_POST_INDEXING { - if watcher.workers.ElasticsearchIndexing != nil { - select { - case watcher.workers.ElasticsearchIndexing.JobChannel() <- *job: - default: - } + } + } else if job.Type == model.JOB_TYPE_ELASTICSEARCH_POST_INDEXING { + if watcher.workers.ElasticsearchIndexing != nil { + select { + case watcher.workers.ElasticsearchIndexing.JobChannel() <- *job: + default: } - } else if job.Type == model.JOB_TYPE_ELASTICSEARCH_POST_AGGREGATION { - if watcher.workers.ElasticsearchAggregation != nil { - select { - case watcher.workers.ElasticsearchAggregation.JobChannel() <- *job: - default: - } + } + } else if job.Type == model.JOB_TYPE_ELASTICSEARCH_POST_AGGREGATION { + if watcher.workers.ElasticsearchAggregation != nil { + select { + case watcher.workers.ElasticsearchAggregation.JobChannel() <- *job: + default: } - } else if job.Type == model.JOB_TYPE_LDAP_SYNC { - if watcher.workers.LdapSync != nil { - select { - case watcher.workers.LdapSync.JobChannel() <- *job: - default: - } + } + } else if job.Type == model.JOB_TYPE_LDAP_SYNC { + if watcher.workers.LdapSync != nil { + select { + case watcher.workers.LdapSync.JobChannel() <- *job: + default: } - } else if job.Type == model.JOB_TYPE_MIGRATIONS { - if watcher.workers.Migrations != nil { - select { - case watcher.workers.Migrations.JobChannel() <- *job: - default: - } + } + } else if job.Type == model.JOB_TYPE_MIGRATIONS { + if watcher.workers.Migrations != nil { + select { + case watcher.workers.Migrations.JobChannel() <- *job: + default: } - } else if job.Type == model.JOB_TYPE_PLUGINS { - if watcher.workers.Plugins != nil { - select { - case watcher.workers.Plugins.JobChannel() <- *job: - default: - } + } + } else if job.Type == model.JOB_TYPE_PLUGINS { + if watcher.workers.Plugins != nil { + select { + case watcher.workers.Plugins.JobChannel() <- *job: + default: } } } diff --git a/migrations/helper_test.go b/migrations/helper_test.go index 9bda859720..6f895b4f18 100644 --- a/migrations/helper_test.go +++ b/migrations/helper_test.go @@ -267,16 +267,15 @@ func (me *TestHelper) ResetRoleMigration() { } func (me *TestHelper) DeleteAllJobsByTypeAndMigrationKey(jobType string, migrationKey string) { - if res := <-me.App.Srv.Store.Job().GetAllByType(model.JOB_TYPE_MIGRATIONS); res.Err != nil { - panic(res.Err) - } else { - jobs := res.Data.([]*model.Job) + jobs, err := me.App.Srv.Store.Job().GetAllByType(model.JOB_TYPE_MIGRATIONS) + if err != nil { + panic(err) + } - for _, job := range jobs { - if key, ok := job.Data[JOB_DATA_KEY_MIGRATION]; ok && key == migrationKey { - if res := <-me.App.Srv.Store.Job().Delete(job.Id); res.Err != nil { - panic(res.Err) - } + for _, job := range jobs { + if key, ok := job.Data[JOB_DATA_KEY_MIGRATION]; ok && key == migrationKey { + if _, err = me.App.Srv.Store.Job().Delete(job.Id); err != nil { + panic(err) } } } diff --git a/migrations/migrations.go b/migrations/migrations.go index 59d71f25d2..39bcd8c591 100644 --- a/migrations/migrations.go +++ b/migrations/migrations.go @@ -40,21 +40,22 @@ func GetMigrationState(migration string, store store.Store) (string, *model.Job, return MIGRATION_STATE_COMPLETED, nil, nil } - if result := <-store.Job().GetAllByType(model.JOB_TYPE_MIGRATIONS); result.Err != nil { - return "", nil, result.Err - } else { - for _, job := range result.Data.([]*model.Job) { - if key, ok := job.Data[JOB_DATA_KEY_MIGRATION]; ok { - if key != migration { - continue - } + jobs, err := store.Job().GetAllByType(model.JOB_TYPE_MIGRATIONS) + if err != nil { + return "", nil, err + } - switch job.Status { - case model.JOB_STATUS_IN_PROGRESS, model.JOB_STATUS_PENDING: - return MIGRATION_STATE_IN_PROGRESS, job, nil - default: - return MIGRATION_STATE_UNSCHEDULED, job, nil - } + for _, job := range jobs { + if key, ok := job.Data[JOB_DATA_KEY_MIGRATION]; ok { + if key != migration { + continue + } + + switch job.Status { + case model.JOB_STATUS_IN_PROGRESS, model.JOB_STATUS_PENDING: + return MIGRATION_STATE_IN_PROGRESS, job, nil + default: + return MIGRATION_STATE_UNSCHEDULED, job, nil } } } diff --git a/migrations/migrations_test.go b/migrations/migrations_test.go index db07073c50..a21c222d49 100644 --- a/migrations/migrations_test.go +++ b/migrations/migrations_test.go @@ -7,6 +7,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/mattermost/mattermost-server/model" ) @@ -52,7 +53,8 @@ func TestGetMigrationState(t *testing.T) { Type: model.JOB_TYPE_MIGRATIONS, } - j1 = (<-th.App.Srv.Store.Job().Save(j1)).Data.(*model.Job) + j1, err = th.App.Srv.Store.Job().Save(j1) + require.Nil(t, err) state, job, err = GetMigrationState(migrationKey, th.App.Srv.Store) assert.Nil(t, err) @@ -70,7 +72,8 @@ func TestGetMigrationState(t *testing.T) { Type: model.JOB_TYPE_MIGRATIONS, } - j2 = (<-th.App.Srv.Store.Job().Save(j2)).Data.(*model.Job) + j2, err = th.App.Srv.Store.Job().Save(j2) + require.Nil(t, err) state, job, err = GetMigrationState(migrationKey, th.App.Srv.Store) assert.Nil(t, err) @@ -88,7 +91,8 @@ func TestGetMigrationState(t *testing.T) { Type: model.JOB_TYPE_MIGRATIONS, } - j3 = (<-th.App.Srv.Store.Job().Save(j3)).Data.(*model.Job) + j3, err = th.App.Srv.Store.Job().Save(j3) + require.Nil(t, err) state, job, err = GetMigrationState(migrationKey, th.App.Srv.Store) assert.Nil(t, err) diff --git a/store/sqlstore/job_store.go b/store/sqlstore/job_store.go index 691b096385..d157d38934 100644 --- a/store/sqlstore/job_store.go +++ b/store/sqlstore/job_store.go @@ -34,267 +34,161 @@ func (jss SqlJobStore) CreateIndexesIfNotExists() { jss.CreateIndexIfNotExists("idx_jobs_type", "Jobs", "Type") } -func (jss SqlJobStore) Save(job *model.Job) store.StoreChannel { - return store.Do(func(result *store.StoreResult) { - if err := jss.GetMaster().Insert(job); err != nil { - result.Err = model.NewAppError("SqlJobStore.Save", "store.sql_job.save.app_error", nil, "id="+job.Id+", "+err.Error(), http.StatusInternalServerError) - } else { - result.Data = job - } - }) +func (jss SqlJobStore) Save(job *model.Job) (*model.Job, *model.AppError) { + if err := jss.GetMaster().Insert(job); err != nil { + return nil, model.NewAppError("SqlJobStore.Save", "store.sql_job.save.app_error", nil, "id="+job.Id+", "+err.Error(), http.StatusInternalServerError) + } + return job, nil } -func (jss SqlJobStore) UpdateOptimistically(job *model.Job, currentStatus string) store.StoreChannel { - return store.Do(func(result *store.StoreResult) { - if sqlResult, err := jss.GetMaster().Exec( - `UPDATE - Jobs - SET - LastActivityAt = :LastActivityAt, - Status = :Status, - Progress = :Progress, - Data = :Data - WHERE - Id = :Id - AND - Status = :OldStatus`, - map[string]interface{}{ - "Id": job.Id, - "OldStatus": currentStatus, - "LastActivityAt": model.GetMillis(), - "Status": job.Status, - "Data": job.DataToJson(), - "Progress": job.Progress, - }); err != nil { - result.Err = model.NewAppError("SqlJobStore.UpdateOptimistically", "store.sql_job.update.app_error", nil, "id="+job.Id+", "+err.Error(), http.StatusInternalServerError) - } else { - rows, err := sqlResult.RowsAffected() +func (jss SqlJobStore) UpdateOptimistically(job *model.Job, currentStatus string) (bool, *model.AppError) { + query := "UPDATE Jobs SET LastActivityAt = :LastActivityAt, Status = :Status, Progress = :Progress, Data = :Data WHERE Id = :Id AND Status = :OldStatus" + params := map[string]interface{}{ + "Id": job.Id, + "OldStatus": currentStatus, + "LastActivityAt": model.GetMillis(), + "Status": job.Status, + "Data": job.DataToJson(), + "Progress": job.Progress, + } + sqlResult, err := jss.GetMaster().Exec(query, params) + if err != nil { + return false, model.NewAppError("SqlJobStore.UpdateOptimistically", "store.sql_job.update.app_error", nil, "id="+job.Id+", "+err.Error(), http.StatusInternalServerError) + } - if err != nil { - result.Err = model.NewAppError("SqlJobStore.UpdateStatus", "store.sql_job.update.app_error", nil, "id="+job.Id+", "+err.Error(), http.StatusInternalServerError) - } else { - if rows == 1 { - result.Data = true - } else { - result.Data = false - } - } - } - }) + rows, err := sqlResult.RowsAffected() + + if err != nil { + return false, model.NewAppError("SqlJobStore.UpdateStatus", "store.sql_job.update.app_error", nil, "id="+job.Id+", "+err.Error(), http.StatusInternalServerError) + } + + if rows != 1 { + return false, nil + } + + return true, nil } -func (jss SqlJobStore) UpdateStatus(id string, status string) store.StoreChannel { - return store.Do(func(result *store.StoreResult) { - job := &model.Job{ - Id: id, - Status: status, - LastActivityAt: model.GetMillis(), - } +func (jss SqlJobStore) UpdateStatus(id string, status string) (*model.Job, *model.AppError) { + job := &model.Job{ + Id: id, + Status: status, + LastActivityAt: model.GetMillis(), + } - if _, err := jss.GetMaster().UpdateColumns(func(col *gorp.ColumnMap) bool { - return col.ColumnName == "Status" || col.ColumnName == "LastActivityAt" - }, job); err != nil { - result.Err = model.NewAppError("SqlJobStore.UpdateStatus", "store.sql_job.update.app_error", nil, "id="+id+", "+err.Error(), http.StatusInternalServerError) - } + if _, err := jss.GetMaster().UpdateColumns(func(col *gorp.ColumnMap) bool { + return col.ColumnName == "Status" || col.ColumnName == "LastActivityAt" + }, job); err != nil { + return nil, model.NewAppError("SqlJobStore.UpdateStatus", "store.sql_job.update.app_error", nil, "id="+id+", "+err.Error(), http.StatusInternalServerError) + } - if result.Err == nil { - result.Data = job - } - }) + return job, nil } -func (jss SqlJobStore) UpdateStatusOptimistically(id string, currentStatus string, newStatus string) store.StoreChannel { - return store.Do(func(result *store.StoreResult) { - var startAtClause string - if newStatus == model.JOB_STATUS_IN_PROGRESS { - startAtClause = `StartAt = :StartAt,` - } +func (jss SqlJobStore) UpdateStatusOptimistically(id string, currentStatus string, newStatus string) (bool, *model.AppError) { + var startAtClause string + if newStatus == model.JOB_STATUS_IN_PROGRESS { + startAtClause = "StartAt = :StartAt," + } + query := "UPDATE Jobs SET " + startAtClause + " Status = :NewStatus, LastActivityAt = :LastActivityAt WHERE Id = :Id AND Status = :OldStatus" + params := map[string]interface{}{ + "Id": id, + "OldStatus": currentStatus, + "NewStatus": newStatus, + "StartAt": model.GetMillis(), + "LastActivityAt": model.GetMillis(), + } - if sqlResult, err := jss.GetMaster().Exec( - `UPDATE - Jobs - SET `+startAtClause+` - Status = :NewStatus, - LastActivityAt = :LastActivityAt - WHERE - Id = :Id - AND - Status = :OldStatus`, map[string]interface{}{"Id": id, "OldStatus": currentStatus, "NewStatus": newStatus, "StartAt": model.GetMillis(), "LastActivityAt": model.GetMillis()}); err != nil { - result.Err = model.NewAppError("SqlJobStore.UpdateStatus", "store.sql_job.update.app_error", nil, "id="+id+", "+err.Error(), http.StatusInternalServerError) - } else { - rows, err := sqlResult.RowsAffected() + sqlResult, err := jss.GetMaster().Exec(query, params) + if err != nil { + return false, model.NewAppError("SqlJobStore.UpdateStatus", "store.sql_job.update.app_error", nil, "id="+id+", "+err.Error(), http.StatusInternalServerError) + } + rows, err := sqlResult.RowsAffected() + if err != nil { + return false, model.NewAppError("SqlJobStore.UpdateStatus", "store.sql_job.update.app_error", nil, "id="+id+", "+err.Error(), http.StatusInternalServerError) + } + if rows != 1 { + return false, nil + } - if err != nil { - result.Err = model.NewAppError("SqlJobStore.UpdateStatus", "store.sql_job.update.app_error", nil, "id="+id+", "+err.Error(), http.StatusInternalServerError) - } else { - if rows == 1 { - result.Data = true - } else { - result.Data = false - } - } - } - }) + return true, nil } -func (jss SqlJobStore) Get(id string) store.StoreChannel { - return store.Do(func(result *store.StoreResult) { - var status *model.Job +func (jss SqlJobStore) Get(id string) (*model.Job, *model.AppError) { + query := "SELECT * FROM Jobs WHERE Id = :Id" - if err := jss.GetReplica().SelectOne(&status, - `SELECT - * - FROM - Jobs - WHERE - Id = :Id`, map[string]interface{}{"Id": id}); err != nil { - if err == sql.ErrNoRows { - result.Err = model.NewAppError("SqlJobStore.Get", "store.sql_job.get.app_error", nil, "Id="+id+", "+err.Error(), http.StatusNotFound) - } else { - result.Err = model.NewAppError("SqlJobStore.Get", "store.sql_job.get.app_error", nil, "Id="+id+", "+err.Error(), http.StatusInternalServerError) - } - } else { - result.Data = status + var status *model.Job + if err := jss.GetReplica().SelectOne(&status, query, map[string]interface{}{"Id": id}); err != nil { + if err == sql.ErrNoRows { + return nil, model.NewAppError("SqlJobStore.Get", "store.sql_job.get.app_error", nil, "Id="+id+", "+err.Error(), http.StatusNotFound) } - }) + return nil, model.NewAppError("SqlJobStore.Get", "store.sql_job.get.app_error", nil, "Id="+id+", "+err.Error(), http.StatusInternalServerError) + } + return status, nil } -func (jss SqlJobStore) GetAllPage(offset int, limit int) store.StoreChannel { - return store.Do(func(result *store.StoreResult) { - var statuses []*model.Job +func (jss SqlJobStore) GetAllPage(offset int, limit int) ([]*model.Job, *model.AppError) { + query := "SELECT * FROM Jobs ORDER BY CreateAt DESC LIMIT :Limit OFFSET :Offset" - if _, err := jss.GetReplica().Select(&statuses, - `SELECT - * - FROM - Jobs - ORDER BY - CreateAt DESC - LIMIT - :Limit - OFFSET - :Offset`, map[string]interface{}{"Limit": limit, "Offset": offset}); err != nil { - result.Err = model.NewAppError("SqlJobStore.GetAllPage", "store.sql_job.get_all.app_error", nil, err.Error(), http.StatusInternalServerError) - } else { - result.Data = statuses - } - }) + var statuses []*model.Job + if _, err := jss.GetReplica().Select(&statuses, query, map[string]interface{}{"Limit": limit, "Offset": offset}); err != nil { + return nil, model.NewAppError("SqlJobStore.GetAllPage", "store.sql_job.get_all.app_error", nil, err.Error(), http.StatusInternalServerError) + } + return statuses, nil } -func (jss SqlJobStore) GetAllByType(jobType string) store.StoreChannel { - return store.Do(func(result *store.StoreResult) { - var statuses []*model.Job - - if _, err := jss.GetReplica().Select(&statuses, - `SELECT - * - FROM - Jobs - WHERE - Type = :Type - ORDER BY - CreateAt DESC`, map[string]interface{}{"Type": jobType}); err != nil { - result.Err = model.NewAppError("SqlJobStore.GetAllByType", "store.sql_job.get_all.app_error", nil, "Type="+jobType+", "+err.Error(), http.StatusInternalServerError) - } else { - result.Data = statuses - } - }) +func (jss SqlJobStore) GetAllByType(jobType string) ([]*model.Job, *model.AppError) { + query := "SELECT * FROM Jobs WHERE Type = :Type ORDER BY CreateAt DESC" + var statuses []*model.Job + if _, err := jss.GetReplica().Select(&statuses, query, map[string]interface{}{"Type": jobType}); err != nil { + return nil, model.NewAppError("SqlJobStore.GetAllByType", "store.sql_job.get_all.app_error", nil, "Type="+jobType+", "+err.Error(), http.StatusInternalServerError) + } + return statuses, nil } -func (jss SqlJobStore) GetAllByTypePage(jobType string, offset int, limit int) store.StoreChannel { - return store.Do(func(result *store.StoreResult) { - var statuses []*model.Job +func (jss SqlJobStore) GetAllByTypePage(jobType string, offset int, limit int) ([]*model.Job, *model.AppError) { + query := "SELECT * FROM Jobs WHERE Type = :Type ORDER BY CreateAt DESC LIMIT :Limit OFFSET :Offset" - if _, err := jss.GetReplica().Select(&statuses, - `SELECT - * - FROM - Jobs - WHERE - Type = :Type - ORDER BY - CreateAt DESC - LIMIT - :Limit - OFFSET - :Offset`, map[string]interface{}{"Type": jobType, "Limit": limit, "Offset": offset}); err != nil { - result.Err = model.NewAppError("SqlJobStore.GetAllByTypePage", "store.sql_job.get_all.app_error", nil, "Type="+jobType+", "+err.Error(), http.StatusInternalServerError) - } else { - result.Data = statuses - } - }) + var statuses []*model.Job + if _, err := jss.GetReplica().Select(&statuses, query, map[string]interface{}{"Type": jobType, "Limit": limit, "Offset": offset}); err != nil { + return nil, model.NewAppError("SqlJobStore.GetAllByTypePage", "store.sql_job.get_all.app_error", nil, "Type="+jobType+", "+err.Error(), http.StatusInternalServerError) + } + return statuses, nil } -func (jss SqlJobStore) GetAllByStatus(status string) store.StoreChannel { - return store.Do(func(result *store.StoreResult) { - var statuses []*model.Job +func (jss SqlJobStore) GetAllByStatus(status string) ([]*model.Job, *model.AppError) { + var statuses []*model.Job + query := "SELECT * FROM Jobs WHERE Status = :Status ORDER BY CreateAt ASC" - if _, err := jss.GetReplica().Select(&statuses, - `SELECT - * - FROM - Jobs - WHERE - Status = :Status - ORDER BY - CreateAt ASC`, map[string]interface{}{"Status": status}); err != nil { - result.Err = model.NewAppError("SqlJobStore.GetAllByStatus", "store.sql_job.get_all.app_error", nil, "Status="+status+", "+err.Error(), http.StatusInternalServerError) - } else { - result.Data = statuses - } - }) + if _, err := jss.GetReplica().Select(&statuses, query, map[string]interface{}{"Status": status}); err != nil { + return nil, model.NewAppError("SqlJobStore.GetAllByStatus", "store.sql_job.get_all.app_error", nil, "Status="+status+", "+err.Error(), http.StatusInternalServerError) + } + return statuses, nil } -func (jss SqlJobStore) GetNewestJobByStatusAndType(status string, jobType string) store.StoreChannel { - return store.Do(func(result *store.StoreResult) { - var job *model.Job +func (jss SqlJobStore) GetNewestJobByStatusAndType(status string, jobType string) (*model.Job, *model.AppError) { + query := "SELECT * FROM Jobs WHERE Status = :Status AND Type = :Type ORDER BY CreateAt DESC LIMIT 1" - if err := jss.GetReplica().SelectOne(&job, - `SELECT - * - FROM - Jobs - WHERE - Status = :Status - AND - Type = :Type - ORDER BY - CreateAt DESC - LIMIT 1`, map[string]interface{}{"Status": status, "Type": jobType}); err != nil && err != sql.ErrNoRows { - result.Err = model.NewAppError("SqlJobStore.GetNewestJobByStatusAndType", "store.sql_job.get_newest_job_by_status_and_type.app_error", nil, "Status="+status+", "+err.Error(), http.StatusInternalServerError) - } else { - result.Data = job - } - }) + var job *model.Job + if err := jss.GetReplica().SelectOne(&job, query, map[string]interface{}{"Status": status, "Type": jobType}); err != nil && err != sql.ErrNoRows { + return nil, model.NewAppError("SqlJobStore.GetNewestJobByStatusAndType", "store.sql_job.get_newest_job_by_status_and_type.app_error", nil, "Status="+status+", "+err.Error(), http.StatusInternalServerError) + } + return job, nil } -func (jss SqlJobStore) GetCountByStatusAndType(status string, jobType string) store.StoreChannel { - return store.Do(func(result *store.StoreResult) { - if count, err := jss.GetReplica().SelectInt(`SELECT - COUNT(*) - FROM - Jobs - WHERE - Status = :Status - AND - Type = :Type`, map[string]interface{}{"Status": status, "Type": jobType}); err != nil { - result.Err = model.NewAppError("SqlJobStore.GetCountByStatusAndType", "store.sql_job.get_count_by_status_and_type.app_error", nil, "Status="+status+", "+err.Error(), http.StatusInternalServerError) - } else { - result.Data = count - } - }) +func (jss SqlJobStore) GetCountByStatusAndType(status string, jobType string) (int64, *model.AppError) { + query := "SELECT COUNT(*) FROM Jobs WHERE Status = :Status AND Type = :Type" + count, err := jss.GetReplica().SelectInt(query, map[string]interface{}{"Status": status, "Type": jobType}) + if err != nil { + return int64(0), model.NewAppError("SqlJobStore.GetCountByStatusAndType", "store.sql_job.get_count_by_status_and_type.app_error", nil, "Status="+status+", "+err.Error(), http.StatusInternalServerError) + } + return count, nil } -func (jss SqlJobStore) Delete(id string) store.StoreChannel { - return store.Do(func(result *store.StoreResult) { - if _, err := jss.GetMaster().Exec( - `DELETE FROM - Jobs - WHERE - Id = :Id`, map[string]interface{}{"Id": id}); err != nil { - result.Err = model.NewAppError("SqlJobStore.DeleteByType", "store.sql_job.delete.app_error", nil, "id="+id+", "+err.Error(), http.StatusInternalServerError) - } else { - result.Data = id - } - }) +func (jss SqlJobStore) Delete(id string) (string, *model.AppError) { + query := "DELETE FROM Jobs WHERE Id = :Id" + if _, err := jss.GetMaster().Exec(query, map[string]interface{}{"Id": id}); err != nil { + return "", model.NewAppError("SqlJobStore.DeleteByType", "store.sql_job.delete.app_error", nil, "id="+id+", "+err.Error(), http.StatusInternalServerError) + } + return id, nil } diff --git a/store/store.go b/store/store.go index b19ad3278a..1eb7e5b7a6 100644 --- a/store/store.go +++ b/store/store.go @@ -500,18 +500,18 @@ type ReactionStore interface { } type JobStore interface { - Save(job *model.Job) StoreChannel - UpdateOptimistically(job *model.Job, currentStatus string) StoreChannel - UpdateStatus(id string, status string) StoreChannel - UpdateStatusOptimistically(id string, currentStatus string, newStatus string) StoreChannel - Get(id string) StoreChannel - GetAllPage(offset int, limit int) StoreChannel - GetAllByType(jobType string) StoreChannel - GetAllByTypePage(jobType string, offset int, limit int) StoreChannel - GetAllByStatus(status string) StoreChannel - GetNewestJobByStatusAndType(status string, jobType string) StoreChannel - GetCountByStatusAndType(status string, jobType string) StoreChannel - Delete(id string) StoreChannel + Save(job *model.Job) (*model.Job, *model.AppError) + UpdateOptimistically(job *model.Job, currentStatus string) (bool, *model.AppError) + UpdateStatus(id string, status string) (*model.Job, *model.AppError) + UpdateStatusOptimistically(id string, currentStatus string, newStatus string) (bool, *model.AppError) + Get(id string) (*model.Job, *model.AppError) + GetAllPage(offset int, limit int) ([]*model.Job, *model.AppError) + GetAllByType(jobType string) ([]*model.Job, *model.AppError) + GetAllByTypePage(jobType string, offset int, limit int) ([]*model.Job, *model.AppError) + GetAllByStatus(status string) ([]*model.Job, *model.AppError) + GetNewestJobByStatusAndType(status string, jobType string) (*model.Job, *model.AppError) + GetCountByStatusAndType(status string, jobType string) (int64, *model.AppError) + Delete(id string) (string, *model.AppError) } type UserAccessTokenStore interface { diff --git a/store/storetest/job_store.go b/store/storetest/job_store.go index 936999f52d..16cdd6174d 100644 --- a/store/storetest/job_store.go +++ b/store/storetest/job_store.go @@ -11,6 +11,7 @@ import ( "github.com/mattermost/mattermost-server/model" "github.com/mattermost/mattermost-server/store" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestJobStore(t *testing.T, ss store.Store) { @@ -38,17 +39,14 @@ func testJobSaveGet(t *testing.T, ss store.Store) { }, } - if result := <-ss.Job().Save(job); result.Err != nil { - t.Fatal(result.Err) - } + _, err := ss.Job().Save(job) + require.Nil(t, err) - defer func() { - <-ss.Job().Delete(job.Id) - }() + defer ss.Job().Delete(job.Id) - if result := <-ss.Job().Get(job.Id); result.Err != nil { - t.Fatal(result.Err) - } else if received := result.Data.(*model.Job); received.Id != job.Id { + if received, err := ss.Job().Get(job.Id); err != nil { + t.Fatal(err) + } else if received.Id != job.Id { t.Fatal("received incorrect job after save") } else if received.Data["Total"] != "12345" { t.Fatal("data field was not retrieved successfully:", received.Data) @@ -74,13 +72,14 @@ func testJobGetAllByType(t *testing.T, ss store.Store) { } for _, job := range jobs { - store.Must(ss.Job().Save(job)) + _, err := ss.Job().Save(job) + require.Nil(t, err) defer ss.Job().Delete(job.Id) } - if result := <-ss.Job().GetAllByType(jobType); result.Err != nil { - t.Fatal(result.Err) - } else if received := result.Data.([]*model.Job); len(received) != 2 { + if received, err := ss.Job().GetAllByType(jobType); err != nil { + t.Fatal(err) + } else if len(received) != 2 { t.Fatal("received wrong number of jobs") } else if received[0].Id != jobs[0].Id && received[1].Id != jobs[0].Id { t.Fatal("should've received first jobs") @@ -116,13 +115,14 @@ func testJobGetAllByTypePage(t *testing.T, ss store.Store) { } for _, job := range jobs { - store.Must(ss.Job().Save(job)) + _, err := ss.Job().Save(job) + require.Nil(t, err) defer ss.Job().Delete(job.Id) } - if result := <-ss.Job().GetAllByTypePage(jobType, 0, 2); result.Err != nil { - t.Fatal(result.Err) - } else if received := result.Data.([]*model.Job); len(received) != 2 { + if received, err := ss.Job().GetAllByTypePage(jobType, 0, 2); err != nil { + t.Fatal(err) + } else if len(received) != 2 { t.Fatal("received wrong number of jobs") } else if received[0].Id != jobs[2].Id { t.Fatal("should've received newest job first") @@ -130,9 +130,9 @@ func testJobGetAllByTypePage(t *testing.T, ss store.Store) { t.Fatal("should've received second newest job second") } - if result := <-ss.Job().GetAllByTypePage(jobType, 2, 2); result.Err != nil { - t.Fatal(result.Err) - } else if received := result.Data.([]*model.Job); len(received) != 1 { + if received, err := ss.Job().GetAllByTypePage(jobType, 2, 2); err != nil { + t.Fatal(err) + } else if len(received) != 1 { t.Fatal("received wrong number of jobs") } else if received[0].Id != jobs[1].Id { t.Fatal("should've received oldest job last") @@ -162,13 +162,14 @@ func testJobGetAllPage(t *testing.T, ss store.Store) { } for _, job := range jobs { - store.Must(ss.Job().Save(job)) + _, err := ss.Job().Save(job) + require.Nil(t, err) defer ss.Job().Delete(job.Id) } - if result := <-ss.Job().GetAllPage(0, 2); result.Err != nil { - t.Fatal(result.Err) - } else if received := result.Data.([]*model.Job); len(received) != 2 { + if received, err := ss.Job().GetAllPage(0, 2); err != nil { + t.Fatal(err) + } else if len(received) != 2 { t.Fatal("received wrong number of jobs") } else if received[0].Id != jobs[2].Id { t.Fatal("should've received newest job first") @@ -176,9 +177,9 @@ func testJobGetAllPage(t *testing.T, ss store.Store) { t.Fatal("should've received second newest job second") } - if result := <-ss.Job().GetAllPage(2, 2); result.Err != nil { - t.Fatal(result.Err) - } else if received := result.Data.([]*model.Job); len(received) < 1 { + if received, err := ss.Job().GetAllPage(2, 2); err != nil { + t.Fatal(err) + } else if len(received) < 1 { t.Fatal("received wrong number of jobs") } else if received[0].Id != jobs[1].Id { t.Fatal("should've received oldest job last") @@ -220,13 +221,14 @@ func testJobGetAllByStatus(t *testing.T, ss store.Store) { } for _, job := range jobs { - store.Must(ss.Job().Save(job)) + _, err := ss.Job().Save(job) + require.Nil(t, err) defer ss.Job().Delete(job.Id) } - if result := <-ss.Job().GetAllByStatus(status); result.Err != nil { - t.Fatal(result.Err) - } else if received := result.Data.([]*model.Job); len(received) != 3 { + if received, err := ss.Job().GetAllByStatus(status); err != nil { + t.Fatal(err) + } else if len(received) != 3 { t.Fatal("received wrong number of jobs") } else if received[0].Id != jobs[1].Id || received[1].Id != jobs[0].Id || received[2].Id != jobs[2].Id { t.Fatal("should've received jobs ordered by CreateAt time") @@ -269,17 +271,18 @@ func testJobStoreGetNewestJobByStatusAndType(t *testing.T, ss store.Store) { } for _, job := range jobs { - store.Must(ss.Job().Save(job)) + _, err := ss.Job().Save(job) + require.Nil(t, err) defer ss.Job().Delete(job.Id) } - result := <-ss.Job().GetNewestJobByStatusAndType(status1, jobType1) - assert.Nil(t, result.Err) - assert.EqualValues(t, jobs[0].Id, result.Data.(*model.Job).Id) + received, err := ss.Job().GetNewestJobByStatusAndType(status1, jobType1) + assert.Nil(t, err) + assert.EqualValues(t, jobs[0].Id, received.Id) - result = <-ss.Job().GetNewestJobByStatusAndType(model.NewId(), model.NewId()) - assert.Nil(t, result.Err) - assert.Nil(t, result.Data.(*model.Job)) + received, err = ss.Job().GetNewestJobByStatusAndType(model.NewId(), model.NewId()) + assert.Nil(t, err) + assert.Nil(t, received) } func testJobStoreGetCountByStatusAndType(t *testing.T, ss store.Store) { @@ -316,25 +319,26 @@ func testJobStoreGetCountByStatusAndType(t *testing.T, ss store.Store) { } for _, job := range jobs { - store.Must(ss.Job().Save(job)) + _, err := ss.Job().Save(job) + require.Nil(t, err) defer ss.Job().Delete(job.Id) } - result := <-ss.Job().GetCountByStatusAndType(status1, jobType1) - assert.Nil(t, result.Err) - assert.EqualValues(t, 2, result.Data.(int64)) + count, err := ss.Job().GetCountByStatusAndType(status1, jobType1) + assert.Nil(t, err) + assert.EqualValues(t, 2, count) - result = <-ss.Job().GetCountByStatusAndType(status2, jobType2) - assert.Nil(t, result.Err) - assert.EqualValues(t, 0, result.Data.(int64)) + count, err = ss.Job().GetCountByStatusAndType(status2, jobType2) + assert.Nil(t, err) + assert.EqualValues(t, 0, count) - result = <-ss.Job().GetCountByStatusAndType(status1, jobType2) - assert.Nil(t, result.Err) - assert.EqualValues(t, 1, result.Data.(int64)) + count, err = ss.Job().GetCountByStatusAndType(status1, jobType2) + assert.Nil(t, err) + assert.EqualValues(t, 1, count) - result = <-ss.Job().GetCountByStatusAndType(status2, jobType1) - assert.Nil(t, result.Err) - assert.EqualValues(t, 1, result.Data.(int64)) + count, err = ss.Job().GetCountByStatusAndType(status2, jobType1) + assert.Nil(t, err) + assert.EqualValues(t, 1, count) } func testJobUpdateOptimistically(t *testing.T, ss store.Store) { @@ -345,9 +349,8 @@ func testJobUpdateOptimistically(t *testing.T, ss store.Store) { Status: model.JOB_STATUS_PENDING, } - if result := <-ss.Job().Save(job); result.Err != nil { - t.Fatal(result.Err) - } + _, err := ss.Job().Save(job) + require.Nil(t, err) defer ss.Job().Delete(job.Id) job.LastActivityAt = model.GetMillis() @@ -357,34 +360,24 @@ func testJobUpdateOptimistically(t *testing.T, ss store.Store) { "Foo": "Bar", } - if result := <-ss.Job().UpdateOptimistically(job, model.JOB_STATUS_SUCCESS); result.Err != nil { - if result.Data.(bool) { + if updated, err2 := ss.Job().UpdateOptimistically(job, model.JOB_STATUS_SUCCESS); err2 != nil { + if updated { t.Fatal("should have failed due to incorrect old status") } } time.Sleep(2 * time.Millisecond) - if result := <-ss.Job().UpdateOptimistically(job, model.JOB_STATUS_PENDING); result.Err != nil { - t.Fatal(result.Err) - } else { - if !result.Data.(bool) { - t.Fatal("Should have successfully updated") - } + updated, err := ss.Job().UpdateOptimistically(job, model.JOB_STATUS_PENDING) + require.Nil(t, err) + require.True(t, updated) - var updatedJob *model.Job + updatedJob, err := ss.Job().Get(job.Id) + require.Nil(t, err) - if result := <-ss.Job().Get(job.Id); result.Err != nil { - t.Fatal(result.Err) - } else { - updatedJob = result.Data.(*model.Job) - } - - if updatedJob.Type != job.Type || updatedJob.CreateAt != job.CreateAt || updatedJob.Status != job.Status || updatedJob.LastActivityAt <= job.LastActivityAt || updatedJob.Progress != job.Progress || updatedJob.Data["Foo"] != job.Data["Foo"] { - t.Fatal("Some update property was not as expected") - } + if updatedJob.Type != job.Type || updatedJob.CreateAt != job.CreateAt || updatedJob.Status != job.Status || updatedJob.LastActivityAt <= job.LastActivityAt || updatedJob.Progress != job.Progress || updatedJob.Data["Foo"] != job.Data["Foo"] { + t.Fatal("Some update property was not as expected") } - } func testJobUpdateStatusUpdateStatusOptimistically(t *testing.T, ss store.Store) { @@ -396,111 +389,85 @@ func testJobUpdateStatusUpdateStatusOptimistically(t *testing.T, ss store.Store) } var lastUpdateAt int64 - if result := <-ss.Job().Save(job); result.Err != nil { - t.Fatal(result.Err) - } else { - lastUpdateAt = result.Data.(*model.Job).LastActivityAt - } + received, err := ss.Job().Save(job) + require.Nil(t, err) + lastUpdateAt = received.LastActivityAt defer ss.Job().Delete(job.Id) time.Sleep(2 * time.Millisecond) - if result := <-ss.Job().UpdateStatus(job.Id, model.JOB_STATUS_PENDING); result.Err != nil { - t.Fatal(result.Err) - } else { - received := result.Data.(*model.Job) - if received.Status != model.JOB_STATUS_PENDING { - t.Fatal("status wasn't updated") - } - if received.LastActivityAt <= lastUpdateAt { - t.Fatal("lastActivityAt wasn't updated") - } - lastUpdateAt = received.LastActivityAt + received, err = ss.Job().UpdateStatus(job.Id, model.JOB_STATUS_PENDING) + require.Nil(t, err) + + if received.Status != model.JOB_STATUS_PENDING { + t.Fatal("status wasn't updated") + } + if received.LastActivityAt <= lastUpdateAt { + t.Fatal("lastActivityAt wasn't updated") + } + lastUpdateAt = received.LastActivityAt + + time.Sleep(2 * time.Millisecond) + + updated, err := ss.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_IN_PROGRESS, model.JOB_STATUS_SUCCESS) + require.Nil(t, err) + require.False(t, updated) + + received, err = ss.Job().Get(job.Id) + require.Nil(t, err) + + if received.Status != model.JOB_STATUS_PENDING { + t.Fatal("should still be pending") + } + if received.LastActivityAt != lastUpdateAt { + t.Fatal("last activity at shouldn't have changed") } time.Sleep(2 * time.Millisecond) - if result := <-ss.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_IN_PROGRESS, model.JOB_STATUS_SUCCESS); result.Err != nil { - t.Fatal(result.Err) - } else { - if result.Data.(bool) { - t.Fatal("should be false due to incorrect original status") - } - } - - if result := <-ss.Job().Get(job.Id); result.Err != nil { - t.Fatal(result.Err) - } else { - received := result.Data.(*model.Job) - if received.Status != model.JOB_STATUS_PENDING { - t.Fatal("should still be pending") - } - if received.LastActivityAt != lastUpdateAt { - t.Fatal("last activity at shouldn't have changed") - } - } - - time.Sleep(2 * time.Millisecond) - - if result := <-ss.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_PENDING, model.JOB_STATUS_IN_PROGRESS); result.Err != nil { - t.Fatal(result.Err) - } else { - if !result.Data.(bool) { - t.Fatal("should have succeeded") - } - } + updated, err = ss.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_PENDING, model.JOB_STATUS_IN_PROGRESS) + require.Nil(t, err) + require.True(t, updated, "should have succeeded") var startAtSet int64 - if result := <-ss.Job().Get(job.Id); result.Err != nil { - t.Fatal(result.Err) - } else { - received := result.Data.(*model.Job) - if received.Status != model.JOB_STATUS_IN_PROGRESS { - t.Fatal("should be in progress") - } - if received.StartAt == 0 { - t.Fatal("received should have start at set") - } - if received.LastActivityAt <= lastUpdateAt { - t.Fatal("lastActivityAt wasn't updated") - } - lastUpdateAt = received.LastActivityAt - startAtSet = received.StartAt + received, err = ss.Job().Get(job.Id) + require.Nil(t, err) + if received.Status != model.JOB_STATUS_IN_PROGRESS { + t.Fatal("should be in progress") } + if received.StartAt == 0 { + t.Fatal("received should have start at set") + } + if received.LastActivityAt <= lastUpdateAt { + t.Fatal("lastActivityAt wasn't updated") + } + lastUpdateAt = received.LastActivityAt + startAtSet = received.StartAt time.Sleep(2 * time.Millisecond) - if result := <-ss.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_IN_PROGRESS, model.JOB_STATUS_SUCCESS); result.Err != nil { - t.Fatal(result.Err) - } else { - if !result.Data.(bool) { - t.Fatal("should have succeeded") - } - } + updated, err = ss.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_IN_PROGRESS, model.JOB_STATUS_SUCCESS) + require.Nil(t, err) + require.True(t, updated, "should have succeeded") - if result := <-ss.Job().Get(job.Id); result.Err != nil { - t.Fatal(result.Err) - } else { - received := result.Data.(*model.Job) - if received.Status != model.JOB_STATUS_SUCCESS { - t.Fatal("should be success status") - } - if received.StartAt != startAtSet { - t.Fatal("startAt should not have changed") - } - if received.LastActivityAt <= lastUpdateAt { - t.Fatal("lastActivityAt wasn't updated") - } + received, err = ss.Job().Get(job.Id) + require.Nil(t, err) + if received.Status != model.JOB_STATUS_SUCCESS { + t.Fatal("should be success status") + } + if received.StartAt != startAtSet { + t.Fatal("startAt should not have changed") + } + if received.LastActivityAt <= lastUpdateAt { + t.Fatal("lastActivityAt wasn't updated") } } func testJobDelete(t *testing.T, ss store.Store) { - job := store.Must(ss.Job().Save(&model.Job{ - Id: model.NewId(), - })).(*model.Job) + job, err := ss.Job().Save(&model.Job{Id: model.NewId()}) + require.Nil(t, err) - if result := <-ss.Job().Delete(job.Id); result.Err != nil { - t.Fatal(result.Err) - } + _, err = ss.Job().Delete(job.Id) + assert.Nil(t, err) } diff --git a/store/storetest/mocks/JobStore.go b/store/storetest/mocks/JobStore.go index a78a3f94e2..2c9908dece 100644 --- a/store/storetest/mocks/JobStore.go +++ b/store/storetest/mocks/JobStore.go @@ -6,7 +6,6 @@ package mocks import mock "github.com/stretchr/testify/mock" import model "github.com/mattermost/mattermost-server/model" -import store "github.com/mattermost/mattermost-server/store" // JobStore is an autogenerated mock type for the JobStore type type JobStore struct { @@ -14,193 +13,293 @@ type JobStore struct { } // Delete provides a mock function with given fields: id -func (_m *JobStore) Delete(id string) store.StoreChannel { +func (_m *JobStore) Delete(id string) (string, *model.AppError) { ret := _m.Called(id) - var r0 store.StoreChannel - if rf, ok := ret.Get(0).(func(string) store.StoreChannel); ok { + var r0 string + if rf, ok := ret.Get(0).(func(string) string); ok { r0 = rf(id) } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(store.StoreChannel) + r0 = ret.Get(0).(string) + } + + var r1 *model.AppError + if rf, ok := ret.Get(1).(func(string) *model.AppError); ok { + r1 = rf(id) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).(*model.AppError) } } - return r0 + return r0, r1 } // Get provides a mock function with given fields: id -func (_m *JobStore) Get(id string) store.StoreChannel { +func (_m *JobStore) Get(id string) (*model.Job, *model.AppError) { ret := _m.Called(id) - var r0 store.StoreChannel - if rf, ok := ret.Get(0).(func(string) store.StoreChannel); ok { + var r0 *model.Job + if rf, ok := ret.Get(0).(func(string) *model.Job); ok { r0 = rf(id) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(store.StoreChannel) + r0 = ret.Get(0).(*model.Job) } } - return r0 + var r1 *model.AppError + if rf, ok := ret.Get(1).(func(string) *model.AppError); ok { + r1 = rf(id) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).(*model.AppError) + } + } + + return r0, r1 } // GetAllByStatus provides a mock function with given fields: status -func (_m *JobStore) GetAllByStatus(status string) store.StoreChannel { +func (_m *JobStore) GetAllByStatus(status string) ([]*model.Job, *model.AppError) { ret := _m.Called(status) - var r0 store.StoreChannel - if rf, ok := ret.Get(0).(func(string) store.StoreChannel); ok { + var r0 []*model.Job + if rf, ok := ret.Get(0).(func(string) []*model.Job); ok { r0 = rf(status) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(store.StoreChannel) + r0 = ret.Get(0).([]*model.Job) } } - return r0 + var r1 *model.AppError + if rf, ok := ret.Get(1).(func(string) *model.AppError); ok { + r1 = rf(status) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).(*model.AppError) + } + } + + return r0, r1 } // GetAllByType provides a mock function with given fields: jobType -func (_m *JobStore) GetAllByType(jobType string) store.StoreChannel { +func (_m *JobStore) GetAllByType(jobType string) ([]*model.Job, *model.AppError) { ret := _m.Called(jobType) - var r0 store.StoreChannel - if rf, ok := ret.Get(0).(func(string) store.StoreChannel); ok { + var r0 []*model.Job + if rf, ok := ret.Get(0).(func(string) []*model.Job); ok { r0 = rf(jobType) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(store.StoreChannel) + r0 = ret.Get(0).([]*model.Job) } } - return r0 + var r1 *model.AppError + if rf, ok := ret.Get(1).(func(string) *model.AppError); ok { + r1 = rf(jobType) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).(*model.AppError) + } + } + + return r0, r1 } // GetAllByTypePage provides a mock function with given fields: jobType, offset, limit -func (_m *JobStore) GetAllByTypePage(jobType string, offset int, limit int) store.StoreChannel { +func (_m *JobStore) GetAllByTypePage(jobType string, offset int, limit int) ([]*model.Job, *model.AppError) { ret := _m.Called(jobType, offset, limit) - var r0 store.StoreChannel - if rf, ok := ret.Get(0).(func(string, int, int) store.StoreChannel); ok { + var r0 []*model.Job + if rf, ok := ret.Get(0).(func(string, int, int) []*model.Job); ok { r0 = rf(jobType, offset, limit) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(store.StoreChannel) + r0 = ret.Get(0).([]*model.Job) } } - return r0 + var r1 *model.AppError + if rf, ok := ret.Get(1).(func(string, int, int) *model.AppError); ok { + r1 = rf(jobType, offset, limit) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).(*model.AppError) + } + } + + return r0, r1 } // GetAllPage provides a mock function with given fields: offset, limit -func (_m *JobStore) GetAllPage(offset int, limit int) store.StoreChannel { +func (_m *JobStore) GetAllPage(offset int, limit int) ([]*model.Job, *model.AppError) { ret := _m.Called(offset, limit) - var r0 store.StoreChannel - if rf, ok := ret.Get(0).(func(int, int) store.StoreChannel); ok { + var r0 []*model.Job + if rf, ok := ret.Get(0).(func(int, int) []*model.Job); ok { r0 = rf(offset, limit) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(store.StoreChannel) + r0 = ret.Get(0).([]*model.Job) } } - return r0 + var r1 *model.AppError + if rf, ok := ret.Get(1).(func(int, int) *model.AppError); ok { + r1 = rf(offset, limit) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).(*model.AppError) + } + } + + return r0, r1 } // GetCountByStatusAndType provides a mock function with given fields: status, jobType -func (_m *JobStore) GetCountByStatusAndType(status string, jobType string) store.StoreChannel { +func (_m *JobStore) GetCountByStatusAndType(status string, jobType string) (int64, *model.AppError) { ret := _m.Called(status, jobType) - var r0 store.StoreChannel - if rf, ok := ret.Get(0).(func(string, string) store.StoreChannel); ok { + var r0 int64 + if rf, ok := ret.Get(0).(func(string, string) int64); ok { r0 = rf(status, jobType) } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(store.StoreChannel) + r0 = ret.Get(0).(int64) + } + + var r1 *model.AppError + if rf, ok := ret.Get(1).(func(string, string) *model.AppError); ok { + r1 = rf(status, jobType) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).(*model.AppError) } } - return r0 + return r0, r1 } // GetNewestJobByStatusAndType provides a mock function with given fields: status, jobType -func (_m *JobStore) GetNewestJobByStatusAndType(status string, jobType string) store.StoreChannel { +func (_m *JobStore) GetNewestJobByStatusAndType(status string, jobType string) (*model.Job, *model.AppError) { ret := _m.Called(status, jobType) - var r0 store.StoreChannel - if rf, ok := ret.Get(0).(func(string, string) store.StoreChannel); ok { + var r0 *model.Job + if rf, ok := ret.Get(0).(func(string, string) *model.Job); ok { r0 = rf(status, jobType) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(store.StoreChannel) + r0 = ret.Get(0).(*model.Job) } } - return r0 + var r1 *model.AppError + if rf, ok := ret.Get(1).(func(string, string) *model.AppError); ok { + r1 = rf(status, jobType) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).(*model.AppError) + } + } + + return r0, r1 } // Save provides a mock function with given fields: job -func (_m *JobStore) Save(job *model.Job) store.StoreChannel { +func (_m *JobStore) Save(job *model.Job) (*model.Job, *model.AppError) { ret := _m.Called(job) - var r0 store.StoreChannel - if rf, ok := ret.Get(0).(func(*model.Job) store.StoreChannel); ok { + var r0 *model.Job + if rf, ok := ret.Get(0).(func(*model.Job) *model.Job); ok { r0 = rf(job) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(store.StoreChannel) + r0 = ret.Get(0).(*model.Job) } } - return r0 + var r1 *model.AppError + if rf, ok := ret.Get(1).(func(*model.Job) *model.AppError); ok { + r1 = rf(job) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).(*model.AppError) + } + } + + return r0, r1 } // UpdateOptimistically provides a mock function with given fields: job, currentStatus -func (_m *JobStore) UpdateOptimistically(job *model.Job, currentStatus string) store.StoreChannel { +func (_m *JobStore) UpdateOptimistically(job *model.Job, currentStatus string) (bool, *model.AppError) { ret := _m.Called(job, currentStatus) - var r0 store.StoreChannel - if rf, ok := ret.Get(0).(func(*model.Job, string) store.StoreChannel); ok { + var r0 bool + if rf, ok := ret.Get(0).(func(*model.Job, string) bool); ok { r0 = rf(job, currentStatus) } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(store.StoreChannel) + r0 = ret.Get(0).(bool) + } + + var r1 *model.AppError + if rf, ok := ret.Get(1).(func(*model.Job, string) *model.AppError); ok { + r1 = rf(job, currentStatus) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).(*model.AppError) } } - return r0 + return r0, r1 } // UpdateStatus provides a mock function with given fields: id, status -func (_m *JobStore) UpdateStatus(id string, status string) store.StoreChannel { +func (_m *JobStore) UpdateStatus(id string, status string) (*model.Job, *model.AppError) { ret := _m.Called(id, status) - var r0 store.StoreChannel - if rf, ok := ret.Get(0).(func(string, string) store.StoreChannel); ok { + var r0 *model.Job + if rf, ok := ret.Get(0).(func(string, string) *model.Job); ok { r0 = rf(id, status) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(store.StoreChannel) + r0 = ret.Get(0).(*model.Job) } } - return r0 + var r1 *model.AppError + if rf, ok := ret.Get(1).(func(string, string) *model.AppError); ok { + r1 = rf(id, status) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).(*model.AppError) + } + } + + return r0, r1 } // UpdateStatusOptimistically provides a mock function with given fields: id, currentStatus, newStatus -func (_m *JobStore) UpdateStatusOptimistically(id string, currentStatus string, newStatus string) store.StoreChannel { +func (_m *JobStore) UpdateStatusOptimistically(id string, currentStatus string, newStatus string) (bool, *model.AppError) { ret := _m.Called(id, currentStatus, newStatus) - var r0 store.StoreChannel - if rf, ok := ret.Get(0).(func(string, string, string) store.StoreChannel); ok { + var r0 bool + if rf, ok := ret.Get(0).(func(string, string, string) bool); ok { r0 = rf(id, currentStatus, newStatus) } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(store.StoreChannel) + r0 = ret.Get(0).(bool) + } + + var r1 *model.AppError + if rf, ok := ret.Get(1).(func(string, string, string) *model.AppError); ok { + r1 = rf(id, currentStatus, newStatus) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).(*model.AppError) } } - return r0 + return r0, r1 }