Migrate Jobs store to sync by default (#11183)

* Migrate Jobs store to sync by default

* Fixing compilation

* Fixing compilation

* Fixing govet
This commit is contained in:
Jesús Espino
2019-06-15 17:55:06 +02:00
committed by GitHub
parent bbdd6927de
commit f934502a56
12 changed files with 570 additions and 619 deletions

View File

@@ -8,7 +8,7 @@ import (
"testing"
"github.com/mattermost/mattermost-server/model"
"github.com/mattermost/mattermost-server/store"
"github.com/stretchr/testify/require"
)
func TestCreateJob(t *testing.T) {
@@ -46,9 +46,8 @@ func TestGetJob(t *testing.T) {
Id: model.NewId(),
Status: model.JOB_STATUS_PENDING,
}
if result := <-th.App.Srv.Store.Job().Save(job); result.Err != nil {
t.Fatal(result.Err)
}
_, err := th.App.Srv.Store.Job().Save(job)
require.Nil(t, err)
defer th.App.Srv.Store.Job().Delete(job.Id)
@@ -95,7 +94,8 @@ func TestGetJobs(t *testing.T) {
}
for _, job := range jobs {
store.Must(th.App.Srv.Store.Job().Save(job))
_, err := th.App.Srv.Store.Job().Save(job)
require.Nil(t, err)
defer th.App.Srv.Store.Job().Delete(job.Id)
}
@@ -151,7 +151,8 @@ func TestGetJobsByType(t *testing.T) {
}
for _, job := range jobs {
store.Must(th.App.Srv.Store.Job().Save(job))
_, err := th.App.Srv.Store.Job().Save(job)
require.Nil(t, err)
defer th.App.Srv.Store.Job().Delete(job.Id)
}
@@ -208,7 +209,8 @@ func TestCancelJob(t *testing.T) {
}
for _, job := range jobs {
store.Must(th.App.Srv.Store.Job().Save(job))
_, err := th.App.Srv.Store.Job().Save(job)
require.Nil(t, err)
defer th.App.Srv.Store.Job().Delete(job.Id)
}

View File

@@ -8,11 +8,7 @@ import (
)
func (a *App) GetJob(id string) (*model.Job, *model.AppError) {
result := <-a.Srv.Store.Job().Get(id)
if result.Err != nil {
return nil, result.Err
}
return result.Data.(*model.Job), nil
return a.Srv.Store.Job().Get(id)
}
func (a *App) GetJobsPage(page int, perPage int) ([]*model.Job, *model.AppError) {
@@ -20,11 +16,7 @@ func (a *App) GetJobsPage(page int, perPage int) ([]*model.Job, *model.AppError)
}
func (a *App) GetJobs(offset int, limit int) ([]*model.Job, *model.AppError) {
result := <-a.Srv.Store.Job().GetAllPage(offset, limit)
if result.Err != nil {
return nil, result.Err
}
return result.Data.([]*model.Job), nil
return a.Srv.Store.Job().GetAllPage(offset, limit)
}
func (a *App) GetJobsByTypePage(jobType string, page int, perPage int) ([]*model.Job, *model.AppError) {
@@ -32,11 +24,7 @@ func (a *App) GetJobsByTypePage(jobType string, page int, perPage int) ([]*model
}
func (a *App) GetJobsByType(jobType string, offset int, limit int) ([]*model.Job, *model.AppError) {
result := <-a.Srv.Store.Job().GetAllByTypePage(jobType, offset, limit)
if result.Err != nil {
return nil, result.Err
}
return result.Data.([]*model.Job), nil
return a.Srv.Store.Job().GetAllByTypePage(jobType, offset, limit)
}
func (a *App) CreateJob(job *model.Job) (*model.Job, *model.AppError) {

View File

@@ -7,7 +7,7 @@ import (
"testing"
"github.com/mattermost/mattermost-server/model"
"github.com/mattermost/mattermost-server/store"
"github.com/stretchr/testify/require"
)
func TestGetJob(t *testing.T) {
@@ -18,9 +18,8 @@ func TestGetJob(t *testing.T) {
Id: model.NewId(),
Status: model.NewId(),
}
if result := <-th.App.Srv.Store.Job().Save(status); result.Err != nil {
t.Fatal(result.Err)
}
_, err := th.App.Srv.Store.Job().Save(status)
require.Nil(t, err)
defer th.App.Srv.Store.Job().Delete(status.Id)
@@ -56,7 +55,8 @@ func TestGetJobByType(t *testing.T) {
}
for _, status := range statuses {
store.Must(th.App.Srv.Store.Job().Save(status))
_, err := th.App.Srv.Store.Job().Save(status)
require.Nil(t, err)
defer th.App.Srv.Store.Job().Delete(status.Id)
}

View File

@@ -31,50 +31,42 @@ func (srv *JobServer) CreateJob(jobType string, jobData map[string]string) (*mod
return nil, err
}
if result := <-srv.Store.Job().Save(&job); result.Err != nil {
return nil, result.Err
if _, err := srv.Store.Job().Save(&job); err != nil {
return nil, err
}
return &job, nil
}
func (srv *JobServer) GetJob(id string) (*model.Job, *model.AppError) {
if result := <-srv.Store.Job().Get(id); result.Err != nil {
return nil, result.Err
} else {
return result.Data.(*model.Job), nil
}
return srv.Store.Job().Get(id)
}
func (srv *JobServer) ClaimJob(job *model.Job) (bool, *model.AppError) {
if result := <-srv.Store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_PENDING, model.JOB_STATUS_IN_PROGRESS); result.Err != nil {
return false, result.Err
} else {
success := result.Data.(bool)
return success, nil
}
return srv.Store.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_PENDING, model.JOB_STATUS_IN_PROGRESS)
}
func (srv *JobServer) SetJobProgress(job *model.Job, progress int64) *model.AppError {
job.Status = model.JOB_STATUS_IN_PROGRESS
job.Progress = progress
if result := <-srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_IN_PROGRESS); result.Err != nil {
return result.Err
} else {
return nil
if _, err := srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_IN_PROGRESS); err != nil {
return err
}
return nil
}
func (srv *JobServer) SetJobSuccess(job *model.Job) *model.AppError {
result := <-srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_SUCCESS)
return result.Err
if _, err := srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_SUCCESS); err != nil {
return err
}
return nil
}
func (srv *JobServer) SetJobError(job *model.Job, jobError *model.AppError) *model.AppError {
if jobError == nil {
result := <-srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_ERROR)
return result.Err
_, err := srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_ERROR)
return err
}
job.Status = model.JOB_STATUS_ERROR
@@ -84,17 +76,18 @@ func (srv *JobServer) SetJobError(job *model.Job, jobError *model.AppError) *mod
}
job.Data["error"] = jobError.Message + " — " + jobError.DetailedError
if result := <-srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_IN_PROGRESS); result.Err != nil {
return result.Err
} else {
if !result.Data.(bool) {
if result := <-srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_CANCEL_REQUESTED); result.Err != nil {
return result.Err
} else {
if !result.Data.(bool) {
return model.NewAppError("Jobs.SetJobError", "jobs.set_job_error.update.error", nil, "id="+job.Id, http.StatusInternalServerError)
}
}
updated, err := srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_IN_PROGRESS)
if err != nil {
return err
}
if !updated {
updated, err = srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_CANCEL_REQUESTED)
if err != nil {
return err
}
if !updated {
return model.NewAppError("Jobs.SetJobError", "jobs.set_job_error.update.error", nil, "id="+job.Id, http.StatusInternalServerError)
}
}
@@ -102,27 +95,36 @@ func (srv *JobServer) SetJobError(job *model.Job, jobError *model.AppError) *mod
}
func (srv *JobServer) SetJobCanceled(job *model.Job) *model.AppError {
result := <-srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_CANCELED)
return result.Err
if _, err := srv.Store.Job().UpdateStatus(job.Id, model.JOB_STATUS_CANCELED); err != nil {
return err
}
return nil
}
func (srv *JobServer) UpdateInProgressJobData(job *model.Job) *model.AppError {
job.Status = model.JOB_STATUS_IN_PROGRESS
job.LastActivityAt = model.GetMillis()
result := <-srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_IN_PROGRESS)
return result.Err
if _, err := srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_IN_PROGRESS); err != nil {
return err
}
return nil
}
func (srv *JobServer) RequestCancellation(jobId string) *model.AppError {
if result := <-srv.Store.Job().UpdateStatusOptimistically(jobId, model.JOB_STATUS_PENDING, model.JOB_STATUS_CANCELED); result.Err != nil {
return result.Err
} else if result.Data.(bool) {
updated, err := srv.Store.Job().UpdateStatusOptimistically(jobId, model.JOB_STATUS_PENDING, model.JOB_STATUS_CANCELED)
if err != nil {
return err
}
if updated {
return nil
}
if result := <-srv.Store.Job().UpdateStatusOptimistically(jobId, model.JOB_STATUS_IN_PROGRESS, model.JOB_STATUS_CANCEL_REQUESTED); result.Err != nil {
return result.Err
} else if result.Data.(bool) {
updated, err = srv.Store.Job().UpdateStatusOptimistically(jobId, model.JOB_STATUS_IN_PROGRESS, model.JOB_STATUS_CANCEL_REQUESTED)
if err != nil {
return err
}
if updated {
return nil
}
@@ -137,8 +139,7 @@ func (srv *JobServer) CancellationWatcher(ctx context.Context, jobId string, can
return
case <-time.After(CANCEL_WATCHER_POLLING_INTERVAL * time.Millisecond):
mlog.Debug(fmt.Sprintf("CancellationWatcher for Job: %v polling.", jobId))
if result := <-srv.Store.Job().Get(jobId); result.Err == nil {
jobStatus := result.Data.(*model.Job)
if jobStatus, err := srv.Store.Job().Get(jobId); err == nil {
if jobStatus.Status == model.JOB_STATUS_CANCEL_REQUESTED {
close(cancelChan)
return
@@ -159,17 +160,13 @@ func GenerateNextStartDateTime(now time.Time, nextStartTime time.Time) *time.Tim
}
func (srv *JobServer) CheckForPendingJobsByType(jobType string) (bool, *model.AppError) {
if result := <-srv.Store.Job().GetCountByStatusAndType(model.JOB_STATUS_PENDING, jobType); result.Err != nil {
return false, result.Err
} else {
return result.Data.(int64) > 0, nil
count, err := srv.Store.Job().GetCountByStatusAndType(model.JOB_STATUS_PENDING, jobType)
if err != nil {
return false, err
}
return count > 0, nil
}
func (srv *JobServer) GetLastSuccessfulJobByType(jobType string) (*model.Job, *model.AppError) {
if result := <-srv.Store.Job().GetNewestJobByStatusAndType(model.JOB_STATUS_SUCCESS, jobType); result.Err != nil {
return nil, result.Err
} else {
return result.Data.(*model.Job), nil
}
return srv.Store.Job().GetNewestJobByStatusAndType(model.JOB_STATUS_SUCCESS, jobType)
}

View File

@@ -66,60 +66,60 @@ func (watcher *Watcher) Stop() {
}
func (watcher *Watcher) PollAndNotify() {
if result := <-watcher.srv.Store.Job().GetAllByStatus(model.JOB_STATUS_PENDING); result.Err != nil {
mlog.Error(fmt.Sprintf("Error occurred getting all pending statuses: %v", result.Err.Error()))
} else {
jobs := result.Data.([]*model.Job)
jobs, err := watcher.srv.Store.Job().GetAllByStatus(model.JOB_STATUS_PENDING)
if err != nil {
mlog.Error(fmt.Sprintf("Error occurred getting all pending statuses: %v", err.Error()))
return
}
for _, job := range jobs {
if job.Type == model.JOB_TYPE_DATA_RETENTION {
if watcher.workers.DataRetention != nil {
select {
case watcher.workers.DataRetention.JobChannel() <- *job:
default:
}
for _, job := range jobs {
if job.Type == model.JOB_TYPE_DATA_RETENTION {
if watcher.workers.DataRetention != nil {
select {
case watcher.workers.DataRetention.JobChannel() <- *job:
default:
}
} else if job.Type == model.JOB_TYPE_MESSAGE_EXPORT {
if watcher.workers.MessageExport != nil {
select {
case watcher.workers.MessageExport.JobChannel() <- *job:
default:
}
}
} else if job.Type == model.JOB_TYPE_MESSAGE_EXPORT {
if watcher.workers.MessageExport != nil {
select {
case watcher.workers.MessageExport.JobChannel() <- *job:
default:
}
} else if job.Type == model.JOB_TYPE_ELASTICSEARCH_POST_INDEXING {
if watcher.workers.ElasticsearchIndexing != nil {
select {
case watcher.workers.ElasticsearchIndexing.JobChannel() <- *job:
default:
}
}
} else if job.Type == model.JOB_TYPE_ELASTICSEARCH_POST_INDEXING {
if watcher.workers.ElasticsearchIndexing != nil {
select {
case watcher.workers.ElasticsearchIndexing.JobChannel() <- *job:
default:
}
} else if job.Type == model.JOB_TYPE_ELASTICSEARCH_POST_AGGREGATION {
if watcher.workers.ElasticsearchAggregation != nil {
select {
case watcher.workers.ElasticsearchAggregation.JobChannel() <- *job:
default:
}
}
} else if job.Type == model.JOB_TYPE_ELASTICSEARCH_POST_AGGREGATION {
if watcher.workers.ElasticsearchAggregation != nil {
select {
case watcher.workers.ElasticsearchAggregation.JobChannel() <- *job:
default:
}
} else if job.Type == model.JOB_TYPE_LDAP_SYNC {
if watcher.workers.LdapSync != nil {
select {
case watcher.workers.LdapSync.JobChannel() <- *job:
default:
}
}
} else if job.Type == model.JOB_TYPE_LDAP_SYNC {
if watcher.workers.LdapSync != nil {
select {
case watcher.workers.LdapSync.JobChannel() <- *job:
default:
}
} else if job.Type == model.JOB_TYPE_MIGRATIONS {
if watcher.workers.Migrations != nil {
select {
case watcher.workers.Migrations.JobChannel() <- *job:
default:
}
}
} else if job.Type == model.JOB_TYPE_MIGRATIONS {
if watcher.workers.Migrations != nil {
select {
case watcher.workers.Migrations.JobChannel() <- *job:
default:
}
} else if job.Type == model.JOB_TYPE_PLUGINS {
if watcher.workers.Plugins != nil {
select {
case watcher.workers.Plugins.JobChannel() <- *job:
default:
}
}
} else if job.Type == model.JOB_TYPE_PLUGINS {
if watcher.workers.Plugins != nil {
select {
case watcher.workers.Plugins.JobChannel() <- *job:
default:
}
}
}

View File

@@ -267,16 +267,15 @@ func (me *TestHelper) ResetRoleMigration() {
}
func (me *TestHelper) DeleteAllJobsByTypeAndMigrationKey(jobType string, migrationKey string) {
if res := <-me.App.Srv.Store.Job().GetAllByType(model.JOB_TYPE_MIGRATIONS); res.Err != nil {
panic(res.Err)
} else {
jobs := res.Data.([]*model.Job)
jobs, err := me.App.Srv.Store.Job().GetAllByType(model.JOB_TYPE_MIGRATIONS)
if err != nil {
panic(err)
}
for _, job := range jobs {
if key, ok := job.Data[JOB_DATA_KEY_MIGRATION]; ok && key == migrationKey {
if res := <-me.App.Srv.Store.Job().Delete(job.Id); res.Err != nil {
panic(res.Err)
}
for _, job := range jobs {
if key, ok := job.Data[JOB_DATA_KEY_MIGRATION]; ok && key == migrationKey {
if _, err = me.App.Srv.Store.Job().Delete(job.Id); err != nil {
panic(err)
}
}
}

View File

@@ -40,21 +40,22 @@ func GetMigrationState(migration string, store store.Store) (string, *model.Job,
return MIGRATION_STATE_COMPLETED, nil, nil
}
if result := <-store.Job().GetAllByType(model.JOB_TYPE_MIGRATIONS); result.Err != nil {
return "", nil, result.Err
} else {
for _, job := range result.Data.([]*model.Job) {
if key, ok := job.Data[JOB_DATA_KEY_MIGRATION]; ok {
if key != migration {
continue
}
jobs, err := store.Job().GetAllByType(model.JOB_TYPE_MIGRATIONS)
if err != nil {
return "", nil, err
}
switch job.Status {
case model.JOB_STATUS_IN_PROGRESS, model.JOB_STATUS_PENDING:
return MIGRATION_STATE_IN_PROGRESS, job, nil
default:
return MIGRATION_STATE_UNSCHEDULED, job, nil
}
for _, job := range jobs {
if key, ok := job.Data[JOB_DATA_KEY_MIGRATION]; ok {
if key != migration {
continue
}
switch job.Status {
case model.JOB_STATUS_IN_PROGRESS, model.JOB_STATUS_PENDING:
return MIGRATION_STATE_IN_PROGRESS, job, nil
default:
return MIGRATION_STATE_UNSCHEDULED, job, nil
}
}
}

View File

@@ -7,6 +7,7 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/mattermost/mattermost-server/model"
)
@@ -52,7 +53,8 @@ func TestGetMigrationState(t *testing.T) {
Type: model.JOB_TYPE_MIGRATIONS,
}
j1 = (<-th.App.Srv.Store.Job().Save(j1)).Data.(*model.Job)
j1, err = th.App.Srv.Store.Job().Save(j1)
require.Nil(t, err)
state, job, err = GetMigrationState(migrationKey, th.App.Srv.Store)
assert.Nil(t, err)
@@ -70,7 +72,8 @@ func TestGetMigrationState(t *testing.T) {
Type: model.JOB_TYPE_MIGRATIONS,
}
j2 = (<-th.App.Srv.Store.Job().Save(j2)).Data.(*model.Job)
j2, err = th.App.Srv.Store.Job().Save(j2)
require.Nil(t, err)
state, job, err = GetMigrationState(migrationKey, th.App.Srv.Store)
assert.Nil(t, err)
@@ -88,7 +91,8 @@ func TestGetMigrationState(t *testing.T) {
Type: model.JOB_TYPE_MIGRATIONS,
}
j3 = (<-th.App.Srv.Store.Job().Save(j3)).Data.(*model.Job)
j3, err = th.App.Srv.Store.Job().Save(j3)
require.Nil(t, err)
state, job, err = GetMigrationState(migrationKey, th.App.Srv.Store)
assert.Nil(t, err)

View File

@@ -34,267 +34,161 @@ func (jss SqlJobStore) CreateIndexesIfNotExists() {
jss.CreateIndexIfNotExists("idx_jobs_type", "Jobs", "Type")
}
func (jss SqlJobStore) Save(job *model.Job) store.StoreChannel {
return store.Do(func(result *store.StoreResult) {
if err := jss.GetMaster().Insert(job); err != nil {
result.Err = model.NewAppError("SqlJobStore.Save", "store.sql_job.save.app_error", nil, "id="+job.Id+", "+err.Error(), http.StatusInternalServerError)
} else {
result.Data = job
}
})
func (jss SqlJobStore) Save(job *model.Job) (*model.Job, *model.AppError) {
if err := jss.GetMaster().Insert(job); err != nil {
return nil, model.NewAppError("SqlJobStore.Save", "store.sql_job.save.app_error", nil, "id="+job.Id+", "+err.Error(), http.StatusInternalServerError)
}
return job, nil
}
func (jss SqlJobStore) UpdateOptimistically(job *model.Job, currentStatus string) store.StoreChannel {
return store.Do(func(result *store.StoreResult) {
if sqlResult, err := jss.GetMaster().Exec(
`UPDATE
Jobs
SET
LastActivityAt = :LastActivityAt,
Status = :Status,
Progress = :Progress,
Data = :Data
WHERE
Id = :Id
AND
Status = :OldStatus`,
map[string]interface{}{
"Id": job.Id,
"OldStatus": currentStatus,
"LastActivityAt": model.GetMillis(),
"Status": job.Status,
"Data": job.DataToJson(),
"Progress": job.Progress,
}); err != nil {
result.Err = model.NewAppError("SqlJobStore.UpdateOptimistically", "store.sql_job.update.app_error", nil, "id="+job.Id+", "+err.Error(), http.StatusInternalServerError)
} else {
rows, err := sqlResult.RowsAffected()
func (jss SqlJobStore) UpdateOptimistically(job *model.Job, currentStatus string) (bool, *model.AppError) {
query := "UPDATE Jobs SET LastActivityAt = :LastActivityAt, Status = :Status, Progress = :Progress, Data = :Data WHERE Id = :Id AND Status = :OldStatus"
params := map[string]interface{}{
"Id": job.Id,
"OldStatus": currentStatus,
"LastActivityAt": model.GetMillis(),
"Status": job.Status,
"Data": job.DataToJson(),
"Progress": job.Progress,
}
sqlResult, err := jss.GetMaster().Exec(query, params)
if err != nil {
return false, model.NewAppError("SqlJobStore.UpdateOptimistically", "store.sql_job.update.app_error", nil, "id="+job.Id+", "+err.Error(), http.StatusInternalServerError)
}
if err != nil {
result.Err = model.NewAppError("SqlJobStore.UpdateStatus", "store.sql_job.update.app_error", nil, "id="+job.Id+", "+err.Error(), http.StatusInternalServerError)
} else {
if rows == 1 {
result.Data = true
} else {
result.Data = false
}
}
}
})
rows, err := sqlResult.RowsAffected()
if err != nil {
return false, model.NewAppError("SqlJobStore.UpdateStatus", "store.sql_job.update.app_error", nil, "id="+job.Id+", "+err.Error(), http.StatusInternalServerError)
}
if rows != 1 {
return false, nil
}
return true, nil
}
func (jss SqlJobStore) UpdateStatus(id string, status string) store.StoreChannel {
return store.Do(func(result *store.StoreResult) {
job := &model.Job{
Id: id,
Status: status,
LastActivityAt: model.GetMillis(),
}
func (jss SqlJobStore) UpdateStatus(id string, status string) (*model.Job, *model.AppError) {
job := &model.Job{
Id: id,
Status: status,
LastActivityAt: model.GetMillis(),
}
if _, err := jss.GetMaster().UpdateColumns(func(col *gorp.ColumnMap) bool {
return col.ColumnName == "Status" || col.ColumnName == "LastActivityAt"
}, job); err != nil {
result.Err = model.NewAppError("SqlJobStore.UpdateStatus", "store.sql_job.update.app_error", nil, "id="+id+", "+err.Error(), http.StatusInternalServerError)
}
if _, err := jss.GetMaster().UpdateColumns(func(col *gorp.ColumnMap) bool {
return col.ColumnName == "Status" || col.ColumnName == "LastActivityAt"
}, job); err != nil {
return nil, model.NewAppError("SqlJobStore.UpdateStatus", "store.sql_job.update.app_error", nil, "id="+id+", "+err.Error(), http.StatusInternalServerError)
}
if result.Err == nil {
result.Data = job
}
})
return job, nil
}
func (jss SqlJobStore) UpdateStatusOptimistically(id string, currentStatus string, newStatus string) store.StoreChannel {
return store.Do(func(result *store.StoreResult) {
var startAtClause string
if newStatus == model.JOB_STATUS_IN_PROGRESS {
startAtClause = `StartAt = :StartAt,`
}
func (jss SqlJobStore) UpdateStatusOptimistically(id string, currentStatus string, newStatus string) (bool, *model.AppError) {
var startAtClause string
if newStatus == model.JOB_STATUS_IN_PROGRESS {
startAtClause = "StartAt = :StartAt,"
}
query := "UPDATE Jobs SET " + startAtClause + " Status = :NewStatus, LastActivityAt = :LastActivityAt WHERE Id = :Id AND Status = :OldStatus"
params := map[string]interface{}{
"Id": id,
"OldStatus": currentStatus,
"NewStatus": newStatus,
"StartAt": model.GetMillis(),
"LastActivityAt": model.GetMillis(),
}
if sqlResult, err := jss.GetMaster().Exec(
`UPDATE
Jobs
SET `+startAtClause+`
Status = :NewStatus,
LastActivityAt = :LastActivityAt
WHERE
Id = :Id
AND
Status = :OldStatus`, map[string]interface{}{"Id": id, "OldStatus": currentStatus, "NewStatus": newStatus, "StartAt": model.GetMillis(), "LastActivityAt": model.GetMillis()}); err != nil {
result.Err = model.NewAppError("SqlJobStore.UpdateStatus", "store.sql_job.update.app_error", nil, "id="+id+", "+err.Error(), http.StatusInternalServerError)
} else {
rows, err := sqlResult.RowsAffected()
sqlResult, err := jss.GetMaster().Exec(query, params)
if err != nil {
return false, model.NewAppError("SqlJobStore.UpdateStatus", "store.sql_job.update.app_error", nil, "id="+id+", "+err.Error(), http.StatusInternalServerError)
}
rows, err := sqlResult.RowsAffected()
if err != nil {
return false, model.NewAppError("SqlJobStore.UpdateStatus", "store.sql_job.update.app_error", nil, "id="+id+", "+err.Error(), http.StatusInternalServerError)
}
if rows != 1 {
return false, nil
}
if err != nil {
result.Err = model.NewAppError("SqlJobStore.UpdateStatus", "store.sql_job.update.app_error", nil, "id="+id+", "+err.Error(), http.StatusInternalServerError)
} else {
if rows == 1 {
result.Data = true
} else {
result.Data = false
}
}
}
})
return true, nil
}
func (jss SqlJobStore) Get(id string) store.StoreChannel {
return store.Do(func(result *store.StoreResult) {
var status *model.Job
func (jss SqlJobStore) Get(id string) (*model.Job, *model.AppError) {
query := "SELECT * FROM Jobs WHERE Id = :Id"
if err := jss.GetReplica().SelectOne(&status,
`SELECT
*
FROM
Jobs
WHERE
Id = :Id`, map[string]interface{}{"Id": id}); err != nil {
if err == sql.ErrNoRows {
result.Err = model.NewAppError("SqlJobStore.Get", "store.sql_job.get.app_error", nil, "Id="+id+", "+err.Error(), http.StatusNotFound)
} else {
result.Err = model.NewAppError("SqlJobStore.Get", "store.sql_job.get.app_error", nil, "Id="+id+", "+err.Error(), http.StatusInternalServerError)
}
} else {
result.Data = status
var status *model.Job
if err := jss.GetReplica().SelectOne(&status, query, map[string]interface{}{"Id": id}); err != nil {
if err == sql.ErrNoRows {
return nil, model.NewAppError("SqlJobStore.Get", "store.sql_job.get.app_error", nil, "Id="+id+", "+err.Error(), http.StatusNotFound)
}
})
return nil, model.NewAppError("SqlJobStore.Get", "store.sql_job.get.app_error", nil, "Id="+id+", "+err.Error(), http.StatusInternalServerError)
}
return status, nil
}
func (jss SqlJobStore) GetAllPage(offset int, limit int) store.StoreChannel {
return store.Do(func(result *store.StoreResult) {
var statuses []*model.Job
func (jss SqlJobStore) GetAllPage(offset int, limit int) ([]*model.Job, *model.AppError) {
query := "SELECT * FROM Jobs ORDER BY CreateAt DESC LIMIT :Limit OFFSET :Offset"
if _, err := jss.GetReplica().Select(&statuses,
`SELECT
*
FROM
Jobs
ORDER BY
CreateAt DESC
LIMIT
:Limit
OFFSET
:Offset`, map[string]interface{}{"Limit": limit, "Offset": offset}); err != nil {
result.Err = model.NewAppError("SqlJobStore.GetAllPage", "store.sql_job.get_all.app_error", nil, err.Error(), http.StatusInternalServerError)
} else {
result.Data = statuses
}
})
var statuses []*model.Job
if _, err := jss.GetReplica().Select(&statuses, query, map[string]interface{}{"Limit": limit, "Offset": offset}); err != nil {
return nil, model.NewAppError("SqlJobStore.GetAllPage", "store.sql_job.get_all.app_error", nil, err.Error(), http.StatusInternalServerError)
}
return statuses, nil
}
func (jss SqlJobStore) GetAllByType(jobType string) store.StoreChannel {
return store.Do(func(result *store.StoreResult) {
var statuses []*model.Job
if _, err := jss.GetReplica().Select(&statuses,
`SELECT
*
FROM
Jobs
WHERE
Type = :Type
ORDER BY
CreateAt DESC`, map[string]interface{}{"Type": jobType}); err != nil {
result.Err = model.NewAppError("SqlJobStore.GetAllByType", "store.sql_job.get_all.app_error", nil, "Type="+jobType+", "+err.Error(), http.StatusInternalServerError)
} else {
result.Data = statuses
}
})
func (jss SqlJobStore) GetAllByType(jobType string) ([]*model.Job, *model.AppError) {
query := "SELECT * FROM Jobs WHERE Type = :Type ORDER BY CreateAt DESC"
var statuses []*model.Job
if _, err := jss.GetReplica().Select(&statuses, query, map[string]interface{}{"Type": jobType}); err != nil {
return nil, model.NewAppError("SqlJobStore.GetAllByType", "store.sql_job.get_all.app_error", nil, "Type="+jobType+", "+err.Error(), http.StatusInternalServerError)
}
return statuses, nil
}
func (jss SqlJobStore) GetAllByTypePage(jobType string, offset int, limit int) store.StoreChannel {
return store.Do(func(result *store.StoreResult) {
var statuses []*model.Job
func (jss SqlJobStore) GetAllByTypePage(jobType string, offset int, limit int) ([]*model.Job, *model.AppError) {
query := "SELECT * FROM Jobs WHERE Type = :Type ORDER BY CreateAt DESC LIMIT :Limit OFFSET :Offset"
if _, err := jss.GetReplica().Select(&statuses,
`SELECT
*
FROM
Jobs
WHERE
Type = :Type
ORDER BY
CreateAt DESC
LIMIT
:Limit
OFFSET
:Offset`, map[string]interface{}{"Type": jobType, "Limit": limit, "Offset": offset}); err != nil {
result.Err = model.NewAppError("SqlJobStore.GetAllByTypePage", "store.sql_job.get_all.app_error", nil, "Type="+jobType+", "+err.Error(), http.StatusInternalServerError)
} else {
result.Data = statuses
}
})
var statuses []*model.Job
if _, err := jss.GetReplica().Select(&statuses, query, map[string]interface{}{"Type": jobType, "Limit": limit, "Offset": offset}); err != nil {
return nil, model.NewAppError("SqlJobStore.GetAllByTypePage", "store.sql_job.get_all.app_error", nil, "Type="+jobType+", "+err.Error(), http.StatusInternalServerError)
}
return statuses, nil
}
func (jss SqlJobStore) GetAllByStatus(status string) store.StoreChannel {
return store.Do(func(result *store.StoreResult) {
var statuses []*model.Job
func (jss SqlJobStore) GetAllByStatus(status string) ([]*model.Job, *model.AppError) {
var statuses []*model.Job
query := "SELECT * FROM Jobs WHERE Status = :Status ORDER BY CreateAt ASC"
if _, err := jss.GetReplica().Select(&statuses,
`SELECT
*
FROM
Jobs
WHERE
Status = :Status
ORDER BY
CreateAt ASC`, map[string]interface{}{"Status": status}); err != nil {
result.Err = model.NewAppError("SqlJobStore.GetAllByStatus", "store.sql_job.get_all.app_error", nil, "Status="+status+", "+err.Error(), http.StatusInternalServerError)
} else {
result.Data = statuses
}
})
if _, err := jss.GetReplica().Select(&statuses, query, map[string]interface{}{"Status": status}); err != nil {
return nil, model.NewAppError("SqlJobStore.GetAllByStatus", "store.sql_job.get_all.app_error", nil, "Status="+status+", "+err.Error(), http.StatusInternalServerError)
}
return statuses, nil
}
func (jss SqlJobStore) GetNewestJobByStatusAndType(status string, jobType string) store.StoreChannel {
return store.Do(func(result *store.StoreResult) {
var job *model.Job
func (jss SqlJobStore) GetNewestJobByStatusAndType(status string, jobType string) (*model.Job, *model.AppError) {
query := "SELECT * FROM Jobs WHERE Status = :Status AND Type = :Type ORDER BY CreateAt DESC LIMIT 1"
if err := jss.GetReplica().SelectOne(&job,
`SELECT
*
FROM
Jobs
WHERE
Status = :Status
AND
Type = :Type
ORDER BY
CreateAt DESC
LIMIT 1`, map[string]interface{}{"Status": status, "Type": jobType}); err != nil && err != sql.ErrNoRows {
result.Err = model.NewAppError("SqlJobStore.GetNewestJobByStatusAndType", "store.sql_job.get_newest_job_by_status_and_type.app_error", nil, "Status="+status+", "+err.Error(), http.StatusInternalServerError)
} else {
result.Data = job
}
})
var job *model.Job
if err := jss.GetReplica().SelectOne(&job, query, map[string]interface{}{"Status": status, "Type": jobType}); err != nil && err != sql.ErrNoRows {
return nil, model.NewAppError("SqlJobStore.GetNewestJobByStatusAndType", "store.sql_job.get_newest_job_by_status_and_type.app_error", nil, "Status="+status+", "+err.Error(), http.StatusInternalServerError)
}
return job, nil
}
func (jss SqlJobStore) GetCountByStatusAndType(status string, jobType string) store.StoreChannel {
return store.Do(func(result *store.StoreResult) {
if count, err := jss.GetReplica().SelectInt(`SELECT
COUNT(*)
FROM
Jobs
WHERE
Status = :Status
AND
Type = :Type`, map[string]interface{}{"Status": status, "Type": jobType}); err != nil {
result.Err = model.NewAppError("SqlJobStore.GetCountByStatusAndType", "store.sql_job.get_count_by_status_and_type.app_error", nil, "Status="+status+", "+err.Error(), http.StatusInternalServerError)
} else {
result.Data = count
}
})
func (jss SqlJobStore) GetCountByStatusAndType(status string, jobType string) (int64, *model.AppError) {
query := "SELECT COUNT(*) FROM Jobs WHERE Status = :Status AND Type = :Type"
count, err := jss.GetReplica().SelectInt(query, map[string]interface{}{"Status": status, "Type": jobType})
if err != nil {
return int64(0), model.NewAppError("SqlJobStore.GetCountByStatusAndType", "store.sql_job.get_count_by_status_and_type.app_error", nil, "Status="+status+", "+err.Error(), http.StatusInternalServerError)
}
return count, nil
}
func (jss SqlJobStore) Delete(id string) store.StoreChannel {
return store.Do(func(result *store.StoreResult) {
if _, err := jss.GetMaster().Exec(
`DELETE FROM
Jobs
WHERE
Id = :Id`, map[string]interface{}{"Id": id}); err != nil {
result.Err = model.NewAppError("SqlJobStore.DeleteByType", "store.sql_job.delete.app_error", nil, "id="+id+", "+err.Error(), http.StatusInternalServerError)
} else {
result.Data = id
}
})
func (jss SqlJobStore) Delete(id string) (string, *model.AppError) {
query := "DELETE FROM Jobs WHERE Id = :Id"
if _, err := jss.GetMaster().Exec(query, map[string]interface{}{"Id": id}); err != nil {
return "", model.NewAppError("SqlJobStore.DeleteByType", "store.sql_job.delete.app_error", nil, "id="+id+", "+err.Error(), http.StatusInternalServerError)
}
return id, nil
}

View File

@@ -500,18 +500,18 @@ type ReactionStore interface {
}
type JobStore interface {
Save(job *model.Job) StoreChannel
UpdateOptimistically(job *model.Job, currentStatus string) StoreChannel
UpdateStatus(id string, status string) StoreChannel
UpdateStatusOptimistically(id string, currentStatus string, newStatus string) StoreChannel
Get(id string) StoreChannel
GetAllPage(offset int, limit int) StoreChannel
GetAllByType(jobType string) StoreChannel
GetAllByTypePage(jobType string, offset int, limit int) StoreChannel
GetAllByStatus(status string) StoreChannel
GetNewestJobByStatusAndType(status string, jobType string) StoreChannel
GetCountByStatusAndType(status string, jobType string) StoreChannel
Delete(id string) StoreChannel
Save(job *model.Job) (*model.Job, *model.AppError)
UpdateOptimistically(job *model.Job, currentStatus string) (bool, *model.AppError)
UpdateStatus(id string, status string) (*model.Job, *model.AppError)
UpdateStatusOptimistically(id string, currentStatus string, newStatus string) (bool, *model.AppError)
Get(id string) (*model.Job, *model.AppError)
GetAllPage(offset int, limit int) ([]*model.Job, *model.AppError)
GetAllByType(jobType string) ([]*model.Job, *model.AppError)
GetAllByTypePage(jobType string, offset int, limit int) ([]*model.Job, *model.AppError)
GetAllByStatus(status string) ([]*model.Job, *model.AppError)
GetNewestJobByStatusAndType(status string, jobType string) (*model.Job, *model.AppError)
GetCountByStatusAndType(status string, jobType string) (int64, *model.AppError)
Delete(id string) (string, *model.AppError)
}
type UserAccessTokenStore interface {

View File

@@ -11,6 +11,7 @@ import (
"github.com/mattermost/mattermost-server/model"
"github.com/mattermost/mattermost-server/store"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestJobStore(t *testing.T, ss store.Store) {
@@ -38,17 +39,14 @@ func testJobSaveGet(t *testing.T, ss store.Store) {
},
}
if result := <-ss.Job().Save(job); result.Err != nil {
t.Fatal(result.Err)
}
_, err := ss.Job().Save(job)
require.Nil(t, err)
defer func() {
<-ss.Job().Delete(job.Id)
}()
defer ss.Job().Delete(job.Id)
if result := <-ss.Job().Get(job.Id); result.Err != nil {
t.Fatal(result.Err)
} else if received := result.Data.(*model.Job); received.Id != job.Id {
if received, err := ss.Job().Get(job.Id); err != nil {
t.Fatal(err)
} else if received.Id != job.Id {
t.Fatal("received incorrect job after save")
} else if received.Data["Total"] != "12345" {
t.Fatal("data field was not retrieved successfully:", received.Data)
@@ -74,13 +72,14 @@ func testJobGetAllByType(t *testing.T, ss store.Store) {
}
for _, job := range jobs {
store.Must(ss.Job().Save(job))
_, err := ss.Job().Save(job)
require.Nil(t, err)
defer ss.Job().Delete(job.Id)
}
if result := <-ss.Job().GetAllByType(jobType); result.Err != nil {
t.Fatal(result.Err)
} else if received := result.Data.([]*model.Job); len(received) != 2 {
if received, err := ss.Job().GetAllByType(jobType); err != nil {
t.Fatal(err)
} else if len(received) != 2 {
t.Fatal("received wrong number of jobs")
} else if received[0].Id != jobs[0].Id && received[1].Id != jobs[0].Id {
t.Fatal("should've received first jobs")
@@ -116,13 +115,14 @@ func testJobGetAllByTypePage(t *testing.T, ss store.Store) {
}
for _, job := range jobs {
store.Must(ss.Job().Save(job))
_, err := ss.Job().Save(job)
require.Nil(t, err)
defer ss.Job().Delete(job.Id)
}
if result := <-ss.Job().GetAllByTypePage(jobType, 0, 2); result.Err != nil {
t.Fatal(result.Err)
} else if received := result.Data.([]*model.Job); len(received) != 2 {
if received, err := ss.Job().GetAllByTypePage(jobType, 0, 2); err != nil {
t.Fatal(err)
} else if len(received) != 2 {
t.Fatal("received wrong number of jobs")
} else if received[0].Id != jobs[2].Id {
t.Fatal("should've received newest job first")
@@ -130,9 +130,9 @@ func testJobGetAllByTypePage(t *testing.T, ss store.Store) {
t.Fatal("should've received second newest job second")
}
if result := <-ss.Job().GetAllByTypePage(jobType, 2, 2); result.Err != nil {
t.Fatal(result.Err)
} else if received := result.Data.([]*model.Job); len(received) != 1 {
if received, err := ss.Job().GetAllByTypePage(jobType, 2, 2); err != nil {
t.Fatal(err)
} else if len(received) != 1 {
t.Fatal("received wrong number of jobs")
} else if received[0].Id != jobs[1].Id {
t.Fatal("should've received oldest job last")
@@ -162,13 +162,14 @@ func testJobGetAllPage(t *testing.T, ss store.Store) {
}
for _, job := range jobs {
store.Must(ss.Job().Save(job))
_, err := ss.Job().Save(job)
require.Nil(t, err)
defer ss.Job().Delete(job.Id)
}
if result := <-ss.Job().GetAllPage(0, 2); result.Err != nil {
t.Fatal(result.Err)
} else if received := result.Data.([]*model.Job); len(received) != 2 {
if received, err := ss.Job().GetAllPage(0, 2); err != nil {
t.Fatal(err)
} else if len(received) != 2 {
t.Fatal("received wrong number of jobs")
} else if received[0].Id != jobs[2].Id {
t.Fatal("should've received newest job first")
@@ -176,9 +177,9 @@ func testJobGetAllPage(t *testing.T, ss store.Store) {
t.Fatal("should've received second newest job second")
}
if result := <-ss.Job().GetAllPage(2, 2); result.Err != nil {
t.Fatal(result.Err)
} else if received := result.Data.([]*model.Job); len(received) < 1 {
if received, err := ss.Job().GetAllPage(2, 2); err != nil {
t.Fatal(err)
} else if len(received) < 1 {
t.Fatal("received wrong number of jobs")
} else if received[0].Id != jobs[1].Id {
t.Fatal("should've received oldest job last")
@@ -220,13 +221,14 @@ func testJobGetAllByStatus(t *testing.T, ss store.Store) {
}
for _, job := range jobs {
store.Must(ss.Job().Save(job))
_, err := ss.Job().Save(job)
require.Nil(t, err)
defer ss.Job().Delete(job.Id)
}
if result := <-ss.Job().GetAllByStatus(status); result.Err != nil {
t.Fatal(result.Err)
} else if received := result.Data.([]*model.Job); len(received) != 3 {
if received, err := ss.Job().GetAllByStatus(status); err != nil {
t.Fatal(err)
} else if len(received) != 3 {
t.Fatal("received wrong number of jobs")
} else if received[0].Id != jobs[1].Id || received[1].Id != jobs[0].Id || received[2].Id != jobs[2].Id {
t.Fatal("should've received jobs ordered by CreateAt time")
@@ -269,17 +271,18 @@ func testJobStoreGetNewestJobByStatusAndType(t *testing.T, ss store.Store) {
}
for _, job := range jobs {
store.Must(ss.Job().Save(job))
_, err := ss.Job().Save(job)
require.Nil(t, err)
defer ss.Job().Delete(job.Id)
}
result := <-ss.Job().GetNewestJobByStatusAndType(status1, jobType1)
assert.Nil(t, result.Err)
assert.EqualValues(t, jobs[0].Id, result.Data.(*model.Job).Id)
received, err := ss.Job().GetNewestJobByStatusAndType(status1, jobType1)
assert.Nil(t, err)
assert.EqualValues(t, jobs[0].Id, received.Id)
result = <-ss.Job().GetNewestJobByStatusAndType(model.NewId(), model.NewId())
assert.Nil(t, result.Err)
assert.Nil(t, result.Data.(*model.Job))
received, err = ss.Job().GetNewestJobByStatusAndType(model.NewId(), model.NewId())
assert.Nil(t, err)
assert.Nil(t, received)
}
func testJobStoreGetCountByStatusAndType(t *testing.T, ss store.Store) {
@@ -316,25 +319,26 @@ func testJobStoreGetCountByStatusAndType(t *testing.T, ss store.Store) {
}
for _, job := range jobs {
store.Must(ss.Job().Save(job))
_, err := ss.Job().Save(job)
require.Nil(t, err)
defer ss.Job().Delete(job.Id)
}
result := <-ss.Job().GetCountByStatusAndType(status1, jobType1)
assert.Nil(t, result.Err)
assert.EqualValues(t, 2, result.Data.(int64))
count, err := ss.Job().GetCountByStatusAndType(status1, jobType1)
assert.Nil(t, err)
assert.EqualValues(t, 2, count)
result = <-ss.Job().GetCountByStatusAndType(status2, jobType2)
assert.Nil(t, result.Err)
assert.EqualValues(t, 0, result.Data.(int64))
count, err = ss.Job().GetCountByStatusAndType(status2, jobType2)
assert.Nil(t, err)
assert.EqualValues(t, 0, count)
result = <-ss.Job().GetCountByStatusAndType(status1, jobType2)
assert.Nil(t, result.Err)
assert.EqualValues(t, 1, result.Data.(int64))
count, err = ss.Job().GetCountByStatusAndType(status1, jobType2)
assert.Nil(t, err)
assert.EqualValues(t, 1, count)
result = <-ss.Job().GetCountByStatusAndType(status2, jobType1)
assert.Nil(t, result.Err)
assert.EqualValues(t, 1, result.Data.(int64))
count, err = ss.Job().GetCountByStatusAndType(status2, jobType1)
assert.Nil(t, err)
assert.EqualValues(t, 1, count)
}
func testJobUpdateOptimistically(t *testing.T, ss store.Store) {
@@ -345,9 +349,8 @@ func testJobUpdateOptimistically(t *testing.T, ss store.Store) {
Status: model.JOB_STATUS_PENDING,
}
if result := <-ss.Job().Save(job); result.Err != nil {
t.Fatal(result.Err)
}
_, err := ss.Job().Save(job)
require.Nil(t, err)
defer ss.Job().Delete(job.Id)
job.LastActivityAt = model.GetMillis()
@@ -357,34 +360,24 @@ func testJobUpdateOptimistically(t *testing.T, ss store.Store) {
"Foo": "Bar",
}
if result := <-ss.Job().UpdateOptimistically(job, model.JOB_STATUS_SUCCESS); result.Err != nil {
if result.Data.(bool) {
if updated, err2 := ss.Job().UpdateOptimistically(job, model.JOB_STATUS_SUCCESS); err2 != nil {
if updated {
t.Fatal("should have failed due to incorrect old status")
}
}
time.Sleep(2 * time.Millisecond)
if result := <-ss.Job().UpdateOptimistically(job, model.JOB_STATUS_PENDING); result.Err != nil {
t.Fatal(result.Err)
} else {
if !result.Data.(bool) {
t.Fatal("Should have successfully updated")
}
updated, err := ss.Job().UpdateOptimistically(job, model.JOB_STATUS_PENDING)
require.Nil(t, err)
require.True(t, updated)
var updatedJob *model.Job
updatedJob, err := ss.Job().Get(job.Id)
require.Nil(t, err)
if result := <-ss.Job().Get(job.Id); result.Err != nil {
t.Fatal(result.Err)
} else {
updatedJob = result.Data.(*model.Job)
}
if updatedJob.Type != job.Type || updatedJob.CreateAt != job.CreateAt || updatedJob.Status != job.Status || updatedJob.LastActivityAt <= job.LastActivityAt || updatedJob.Progress != job.Progress || updatedJob.Data["Foo"] != job.Data["Foo"] {
t.Fatal("Some update property was not as expected")
}
if updatedJob.Type != job.Type || updatedJob.CreateAt != job.CreateAt || updatedJob.Status != job.Status || updatedJob.LastActivityAt <= job.LastActivityAt || updatedJob.Progress != job.Progress || updatedJob.Data["Foo"] != job.Data["Foo"] {
t.Fatal("Some update property was not as expected")
}
}
func testJobUpdateStatusUpdateStatusOptimistically(t *testing.T, ss store.Store) {
@@ -396,111 +389,85 @@ func testJobUpdateStatusUpdateStatusOptimistically(t *testing.T, ss store.Store)
}
var lastUpdateAt int64
if result := <-ss.Job().Save(job); result.Err != nil {
t.Fatal(result.Err)
} else {
lastUpdateAt = result.Data.(*model.Job).LastActivityAt
}
received, err := ss.Job().Save(job)
require.Nil(t, err)
lastUpdateAt = received.LastActivityAt
defer ss.Job().Delete(job.Id)
time.Sleep(2 * time.Millisecond)
if result := <-ss.Job().UpdateStatus(job.Id, model.JOB_STATUS_PENDING); result.Err != nil {
t.Fatal(result.Err)
} else {
received := result.Data.(*model.Job)
if received.Status != model.JOB_STATUS_PENDING {
t.Fatal("status wasn't updated")
}
if received.LastActivityAt <= lastUpdateAt {
t.Fatal("lastActivityAt wasn't updated")
}
lastUpdateAt = received.LastActivityAt
received, err = ss.Job().UpdateStatus(job.Id, model.JOB_STATUS_PENDING)
require.Nil(t, err)
if received.Status != model.JOB_STATUS_PENDING {
t.Fatal("status wasn't updated")
}
if received.LastActivityAt <= lastUpdateAt {
t.Fatal("lastActivityAt wasn't updated")
}
lastUpdateAt = received.LastActivityAt
time.Sleep(2 * time.Millisecond)
updated, err := ss.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_IN_PROGRESS, model.JOB_STATUS_SUCCESS)
require.Nil(t, err)
require.False(t, updated)
received, err = ss.Job().Get(job.Id)
require.Nil(t, err)
if received.Status != model.JOB_STATUS_PENDING {
t.Fatal("should still be pending")
}
if received.LastActivityAt != lastUpdateAt {
t.Fatal("last activity at shouldn't have changed")
}
time.Sleep(2 * time.Millisecond)
if result := <-ss.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_IN_PROGRESS, model.JOB_STATUS_SUCCESS); result.Err != nil {
t.Fatal(result.Err)
} else {
if result.Data.(bool) {
t.Fatal("should be false due to incorrect original status")
}
}
if result := <-ss.Job().Get(job.Id); result.Err != nil {
t.Fatal(result.Err)
} else {
received := result.Data.(*model.Job)
if received.Status != model.JOB_STATUS_PENDING {
t.Fatal("should still be pending")
}
if received.LastActivityAt != lastUpdateAt {
t.Fatal("last activity at shouldn't have changed")
}
}
time.Sleep(2 * time.Millisecond)
if result := <-ss.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_PENDING, model.JOB_STATUS_IN_PROGRESS); result.Err != nil {
t.Fatal(result.Err)
} else {
if !result.Data.(bool) {
t.Fatal("should have succeeded")
}
}
updated, err = ss.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_PENDING, model.JOB_STATUS_IN_PROGRESS)
require.Nil(t, err)
require.True(t, updated, "should have succeeded")
var startAtSet int64
if result := <-ss.Job().Get(job.Id); result.Err != nil {
t.Fatal(result.Err)
} else {
received := result.Data.(*model.Job)
if received.Status != model.JOB_STATUS_IN_PROGRESS {
t.Fatal("should be in progress")
}
if received.StartAt == 0 {
t.Fatal("received should have start at set")
}
if received.LastActivityAt <= lastUpdateAt {
t.Fatal("lastActivityAt wasn't updated")
}
lastUpdateAt = received.LastActivityAt
startAtSet = received.StartAt
received, err = ss.Job().Get(job.Id)
require.Nil(t, err)
if received.Status != model.JOB_STATUS_IN_PROGRESS {
t.Fatal("should be in progress")
}
if received.StartAt == 0 {
t.Fatal("received should have start at set")
}
if received.LastActivityAt <= lastUpdateAt {
t.Fatal("lastActivityAt wasn't updated")
}
lastUpdateAt = received.LastActivityAt
startAtSet = received.StartAt
time.Sleep(2 * time.Millisecond)
if result := <-ss.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_IN_PROGRESS, model.JOB_STATUS_SUCCESS); result.Err != nil {
t.Fatal(result.Err)
} else {
if !result.Data.(bool) {
t.Fatal("should have succeeded")
}
}
updated, err = ss.Job().UpdateStatusOptimistically(job.Id, model.JOB_STATUS_IN_PROGRESS, model.JOB_STATUS_SUCCESS)
require.Nil(t, err)
require.True(t, updated, "should have succeeded")
if result := <-ss.Job().Get(job.Id); result.Err != nil {
t.Fatal(result.Err)
} else {
received := result.Data.(*model.Job)
if received.Status != model.JOB_STATUS_SUCCESS {
t.Fatal("should be success status")
}
if received.StartAt != startAtSet {
t.Fatal("startAt should not have changed")
}
if received.LastActivityAt <= lastUpdateAt {
t.Fatal("lastActivityAt wasn't updated")
}
received, err = ss.Job().Get(job.Id)
require.Nil(t, err)
if received.Status != model.JOB_STATUS_SUCCESS {
t.Fatal("should be success status")
}
if received.StartAt != startAtSet {
t.Fatal("startAt should not have changed")
}
if received.LastActivityAt <= lastUpdateAt {
t.Fatal("lastActivityAt wasn't updated")
}
}
func testJobDelete(t *testing.T, ss store.Store) {
job := store.Must(ss.Job().Save(&model.Job{
Id: model.NewId(),
})).(*model.Job)
job, err := ss.Job().Save(&model.Job{Id: model.NewId()})
require.Nil(t, err)
if result := <-ss.Job().Delete(job.Id); result.Err != nil {
t.Fatal(result.Err)
}
_, err = ss.Job().Delete(job.Id)
assert.Nil(t, err)
}

View File

@@ -6,7 +6,6 @@ package mocks
import mock "github.com/stretchr/testify/mock"
import model "github.com/mattermost/mattermost-server/model"
import store "github.com/mattermost/mattermost-server/store"
// JobStore is an autogenerated mock type for the JobStore type
type JobStore struct {
@@ -14,193 +13,293 @@ type JobStore struct {
}
// Delete provides a mock function with given fields: id
func (_m *JobStore) Delete(id string) store.StoreChannel {
func (_m *JobStore) Delete(id string) (string, *model.AppError) {
ret := _m.Called(id)
var r0 store.StoreChannel
if rf, ok := ret.Get(0).(func(string) store.StoreChannel); ok {
var r0 string
if rf, ok := ret.Get(0).(func(string) string); ok {
r0 = rf(id)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(store.StoreChannel)
r0 = ret.Get(0).(string)
}
var r1 *model.AppError
if rf, ok := ret.Get(1).(func(string) *model.AppError); ok {
r1 = rf(id)
} else {
if ret.Get(1) != nil {
r1 = ret.Get(1).(*model.AppError)
}
}
return r0
return r0, r1
}
// Get provides a mock function with given fields: id
func (_m *JobStore) Get(id string) store.StoreChannel {
func (_m *JobStore) Get(id string) (*model.Job, *model.AppError) {
ret := _m.Called(id)
var r0 store.StoreChannel
if rf, ok := ret.Get(0).(func(string) store.StoreChannel); ok {
var r0 *model.Job
if rf, ok := ret.Get(0).(func(string) *model.Job); ok {
r0 = rf(id)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(store.StoreChannel)
r0 = ret.Get(0).(*model.Job)
}
}
return r0
var r1 *model.AppError
if rf, ok := ret.Get(1).(func(string) *model.AppError); ok {
r1 = rf(id)
} else {
if ret.Get(1) != nil {
r1 = ret.Get(1).(*model.AppError)
}
}
return r0, r1
}
// GetAllByStatus provides a mock function with given fields: status
func (_m *JobStore) GetAllByStatus(status string) store.StoreChannel {
func (_m *JobStore) GetAllByStatus(status string) ([]*model.Job, *model.AppError) {
ret := _m.Called(status)
var r0 store.StoreChannel
if rf, ok := ret.Get(0).(func(string) store.StoreChannel); ok {
var r0 []*model.Job
if rf, ok := ret.Get(0).(func(string) []*model.Job); ok {
r0 = rf(status)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(store.StoreChannel)
r0 = ret.Get(0).([]*model.Job)
}
}
return r0
var r1 *model.AppError
if rf, ok := ret.Get(1).(func(string) *model.AppError); ok {
r1 = rf(status)
} else {
if ret.Get(1) != nil {
r1 = ret.Get(1).(*model.AppError)
}
}
return r0, r1
}
// GetAllByType provides a mock function with given fields: jobType
func (_m *JobStore) GetAllByType(jobType string) store.StoreChannel {
func (_m *JobStore) GetAllByType(jobType string) ([]*model.Job, *model.AppError) {
ret := _m.Called(jobType)
var r0 store.StoreChannel
if rf, ok := ret.Get(0).(func(string) store.StoreChannel); ok {
var r0 []*model.Job
if rf, ok := ret.Get(0).(func(string) []*model.Job); ok {
r0 = rf(jobType)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(store.StoreChannel)
r0 = ret.Get(0).([]*model.Job)
}
}
return r0
var r1 *model.AppError
if rf, ok := ret.Get(1).(func(string) *model.AppError); ok {
r1 = rf(jobType)
} else {
if ret.Get(1) != nil {
r1 = ret.Get(1).(*model.AppError)
}
}
return r0, r1
}
// GetAllByTypePage provides a mock function with given fields: jobType, offset, limit
func (_m *JobStore) GetAllByTypePage(jobType string, offset int, limit int) store.StoreChannel {
func (_m *JobStore) GetAllByTypePage(jobType string, offset int, limit int) ([]*model.Job, *model.AppError) {
ret := _m.Called(jobType, offset, limit)
var r0 store.StoreChannel
if rf, ok := ret.Get(0).(func(string, int, int) store.StoreChannel); ok {
var r0 []*model.Job
if rf, ok := ret.Get(0).(func(string, int, int) []*model.Job); ok {
r0 = rf(jobType, offset, limit)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(store.StoreChannel)
r0 = ret.Get(0).([]*model.Job)
}
}
return r0
var r1 *model.AppError
if rf, ok := ret.Get(1).(func(string, int, int) *model.AppError); ok {
r1 = rf(jobType, offset, limit)
} else {
if ret.Get(1) != nil {
r1 = ret.Get(1).(*model.AppError)
}
}
return r0, r1
}
// GetAllPage provides a mock function with given fields: offset, limit
func (_m *JobStore) GetAllPage(offset int, limit int) store.StoreChannel {
func (_m *JobStore) GetAllPage(offset int, limit int) ([]*model.Job, *model.AppError) {
ret := _m.Called(offset, limit)
var r0 store.StoreChannel
if rf, ok := ret.Get(0).(func(int, int) store.StoreChannel); ok {
var r0 []*model.Job
if rf, ok := ret.Get(0).(func(int, int) []*model.Job); ok {
r0 = rf(offset, limit)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(store.StoreChannel)
r0 = ret.Get(0).([]*model.Job)
}
}
return r0
var r1 *model.AppError
if rf, ok := ret.Get(1).(func(int, int) *model.AppError); ok {
r1 = rf(offset, limit)
} else {
if ret.Get(1) != nil {
r1 = ret.Get(1).(*model.AppError)
}
}
return r0, r1
}
// GetCountByStatusAndType provides a mock function with given fields: status, jobType
func (_m *JobStore) GetCountByStatusAndType(status string, jobType string) store.StoreChannel {
func (_m *JobStore) GetCountByStatusAndType(status string, jobType string) (int64, *model.AppError) {
ret := _m.Called(status, jobType)
var r0 store.StoreChannel
if rf, ok := ret.Get(0).(func(string, string) store.StoreChannel); ok {
var r0 int64
if rf, ok := ret.Get(0).(func(string, string) int64); ok {
r0 = rf(status, jobType)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(store.StoreChannel)
r0 = ret.Get(0).(int64)
}
var r1 *model.AppError
if rf, ok := ret.Get(1).(func(string, string) *model.AppError); ok {
r1 = rf(status, jobType)
} else {
if ret.Get(1) != nil {
r1 = ret.Get(1).(*model.AppError)
}
}
return r0
return r0, r1
}
// GetNewestJobByStatusAndType provides a mock function with given fields: status, jobType
func (_m *JobStore) GetNewestJobByStatusAndType(status string, jobType string) store.StoreChannel {
func (_m *JobStore) GetNewestJobByStatusAndType(status string, jobType string) (*model.Job, *model.AppError) {
ret := _m.Called(status, jobType)
var r0 store.StoreChannel
if rf, ok := ret.Get(0).(func(string, string) store.StoreChannel); ok {
var r0 *model.Job
if rf, ok := ret.Get(0).(func(string, string) *model.Job); ok {
r0 = rf(status, jobType)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(store.StoreChannel)
r0 = ret.Get(0).(*model.Job)
}
}
return r0
var r1 *model.AppError
if rf, ok := ret.Get(1).(func(string, string) *model.AppError); ok {
r1 = rf(status, jobType)
} else {
if ret.Get(1) != nil {
r1 = ret.Get(1).(*model.AppError)
}
}
return r0, r1
}
// Save provides a mock function with given fields: job
func (_m *JobStore) Save(job *model.Job) store.StoreChannel {
func (_m *JobStore) Save(job *model.Job) (*model.Job, *model.AppError) {
ret := _m.Called(job)
var r0 store.StoreChannel
if rf, ok := ret.Get(0).(func(*model.Job) store.StoreChannel); ok {
var r0 *model.Job
if rf, ok := ret.Get(0).(func(*model.Job) *model.Job); ok {
r0 = rf(job)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(store.StoreChannel)
r0 = ret.Get(0).(*model.Job)
}
}
return r0
var r1 *model.AppError
if rf, ok := ret.Get(1).(func(*model.Job) *model.AppError); ok {
r1 = rf(job)
} else {
if ret.Get(1) != nil {
r1 = ret.Get(1).(*model.AppError)
}
}
return r0, r1
}
// UpdateOptimistically provides a mock function with given fields: job, currentStatus
func (_m *JobStore) UpdateOptimistically(job *model.Job, currentStatus string) store.StoreChannel {
func (_m *JobStore) UpdateOptimistically(job *model.Job, currentStatus string) (bool, *model.AppError) {
ret := _m.Called(job, currentStatus)
var r0 store.StoreChannel
if rf, ok := ret.Get(0).(func(*model.Job, string) store.StoreChannel); ok {
var r0 bool
if rf, ok := ret.Get(0).(func(*model.Job, string) bool); ok {
r0 = rf(job, currentStatus)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(store.StoreChannel)
r0 = ret.Get(0).(bool)
}
var r1 *model.AppError
if rf, ok := ret.Get(1).(func(*model.Job, string) *model.AppError); ok {
r1 = rf(job, currentStatus)
} else {
if ret.Get(1) != nil {
r1 = ret.Get(1).(*model.AppError)
}
}
return r0
return r0, r1
}
// UpdateStatus provides a mock function with given fields: id, status
func (_m *JobStore) UpdateStatus(id string, status string) store.StoreChannel {
func (_m *JobStore) UpdateStatus(id string, status string) (*model.Job, *model.AppError) {
ret := _m.Called(id, status)
var r0 store.StoreChannel
if rf, ok := ret.Get(0).(func(string, string) store.StoreChannel); ok {
var r0 *model.Job
if rf, ok := ret.Get(0).(func(string, string) *model.Job); ok {
r0 = rf(id, status)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(store.StoreChannel)
r0 = ret.Get(0).(*model.Job)
}
}
return r0
var r1 *model.AppError
if rf, ok := ret.Get(1).(func(string, string) *model.AppError); ok {
r1 = rf(id, status)
} else {
if ret.Get(1) != nil {
r1 = ret.Get(1).(*model.AppError)
}
}
return r0, r1
}
// UpdateStatusOptimistically provides a mock function with given fields: id, currentStatus, newStatus
func (_m *JobStore) UpdateStatusOptimistically(id string, currentStatus string, newStatus string) store.StoreChannel {
func (_m *JobStore) UpdateStatusOptimistically(id string, currentStatus string, newStatus string) (bool, *model.AppError) {
ret := _m.Called(id, currentStatus, newStatus)
var r0 store.StoreChannel
if rf, ok := ret.Get(0).(func(string, string, string) store.StoreChannel); ok {
var r0 bool
if rf, ok := ret.Get(0).(func(string, string, string) bool); ok {
r0 = rf(id, currentStatus, newStatus)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(store.StoreChannel)
r0 = ret.Get(0).(bool)
}
var r1 *model.AppError
if rf, ok := ret.Get(1).(func(string, string, string) *model.AppError); ok {
r1 = rf(id, currentStatus, newStatus)
} else {
if ret.Get(1) != nil {
r1 = ret.Get(1).(*model.AppError)
}
}
return r0
return r0, r1
}