mirror of
https://github.com/mattermost/mattermost.git
synced 2025-02-25 18:55:24 -06:00
PLT-7644: Improve job scheduler architecture. (#7532)
This commit is contained in:
committed by
Corey Hulen
parent
f263d2b951
commit
a06830b2f8
@@ -5679,6 +5679,14 @@
|
||||
"id": "store.sql_job.update.app_error",
|
||||
"translation": "We couldn't update the job"
|
||||
},
|
||||
{
|
||||
"id": "store.sql_job.get_newest_job_by_status_and_type.app_error",
|
||||
"translation": "We couldn't get the newest job by status and type"
|
||||
},
|
||||
{
|
||||
"id": "store.sql_job.get_count_by_status_and_type.app_erro",
|
||||
"translation": "We couldn't get the job count by status and type"
|
||||
},
|
||||
{
|
||||
"id": "store.sql_license.get.app_error",
|
||||
"translation": "We encountered an error getting the license"
|
||||
|
||||
26
jobs/jobs.go
26
jobs/jobs.go
@@ -141,3 +141,29 @@ func CancellationWatcher(ctx context.Context, jobId string, cancelChan chan inte
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func GenerateNextStartDateTime(now time.Time, nextStartTime time.Time) *time.Time {
|
||||
nextTime := time.Date(now.Year(), now.Month(), now.Day(), nextStartTime.Hour(), nextStartTime.Minute(), 0, 0, time.Local)
|
||||
|
||||
if !now.Before(nextTime) {
|
||||
nextTime = nextTime.AddDate(0, 0, 1)
|
||||
}
|
||||
|
||||
return &nextTime
|
||||
}
|
||||
|
||||
func CheckForPendingJobsByType(jobType string) (bool, *model.AppError) {
|
||||
if result := <-Srv.Store.Job().GetCountByStatusAndType(model.JOB_STATUS_PENDING, jobType); result.Err != nil {
|
||||
return false, result.Err
|
||||
} else {
|
||||
return result.Data.(int64) > 0, nil
|
||||
}
|
||||
}
|
||||
|
||||
func GetLastSuccessfulJobByType(jobType string) (*model.Job, *model.AppError) {
|
||||
if result := <-Srv.Store.Job().GetNewestJobByStatusAndType(model.JOB_STATUS_SUCCESS, jobType); result.Err != nil {
|
||||
return nil, result.Err
|
||||
} else {
|
||||
return result.Data.(*model.Job), nil
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,106 +5,160 @@ package jobs
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
l4g "github.com/alecthomas/log4go"
|
||||
ejobs "github.com/mattermost/mattermost-server/einterfaces/jobs"
|
||||
|
||||
ejobs "github.com/mattermost/mattermost-server/einterfaces/jobs"
|
||||
"github.com/mattermost/mattermost-server/model"
|
||||
"github.com/mattermost/mattermost-server/utils"
|
||||
)
|
||||
|
||||
type Schedulers struct {
|
||||
startOnce sync.Once
|
||||
stop chan bool
|
||||
stopped chan bool
|
||||
configChanged chan *model.Config
|
||||
listenerId string
|
||||
startOnce sync.Once
|
||||
|
||||
DataRetention model.Scheduler
|
||||
ElasticsearchAggregation model.Scheduler
|
||||
LdapSync model.Scheduler
|
||||
|
||||
listenerId string
|
||||
schedulers []model.Scheduler
|
||||
nextRunTimes []*time.Time
|
||||
}
|
||||
|
||||
func InitSchedulers() *Schedulers {
|
||||
schedulers := &Schedulers{}
|
||||
l4g.Debug("Initialising schedulers.")
|
||||
schedulers := &Schedulers{
|
||||
stop: make(chan bool),
|
||||
stopped: make(chan bool),
|
||||
configChanged: make(chan *model.Config),
|
||||
}
|
||||
|
||||
if dataRetentionInterface := ejobs.GetDataRetentionInterface(); dataRetentionInterface != nil {
|
||||
schedulers.DataRetention = dataRetentionInterface.MakeScheduler()
|
||||
schedulers.schedulers = append(schedulers.schedulers, dataRetentionInterface.MakeScheduler())
|
||||
}
|
||||
|
||||
if elasticsearchAggregatorInterface := ejobs.GetElasticsearchAggregatorInterface(); elasticsearchAggregatorInterface != nil {
|
||||
schedulers.ElasticsearchAggregation = elasticsearchAggregatorInterface.MakeScheduler()
|
||||
schedulers.schedulers = append(schedulers.schedulers, elasticsearchAggregatorInterface.MakeScheduler())
|
||||
}
|
||||
|
||||
if ldaySyncInterface := ejobs.GetLdapSyncInterface(); ldaySyncInterface != nil {
|
||||
schedulers.LdapSync = ldaySyncInterface.MakeScheduler()
|
||||
if ldapSyncInterface := ejobs.GetLdapSyncInterface(); ldapSyncInterface != nil {
|
||||
schedulers.schedulers = append(schedulers.schedulers, ldapSyncInterface.MakeScheduler())
|
||||
}
|
||||
|
||||
schedulers.nextRunTimes = make([]*time.Time, len(schedulers.schedulers))
|
||||
return schedulers
|
||||
}
|
||||
|
||||
func (schedulers *Schedulers) Start() *Schedulers {
|
||||
l4g.Info("Starting schedulers")
|
||||
|
||||
schedulers.startOnce.Do(func() {
|
||||
if schedulers.DataRetention != nil && (*utils.Cfg.DataRetentionSettings.EnableMessageDeletion || *utils.Cfg.DataRetentionSettings.EnableFileDeletion) {
|
||||
go schedulers.DataRetention.Run()
|
||||
}
|
||||
|
||||
if schedulers.ElasticsearchAggregation != nil && *utils.Cfg.ElasticsearchSettings.EnableIndexing {
|
||||
go schedulers.ElasticsearchAggregation.Run()
|
||||
}
|
||||
|
||||
if schedulers.LdapSync != nil && *utils.Cfg.LdapSettings.Enable {
|
||||
go schedulers.LdapSync.Run()
|
||||
}
|
||||
})
|
||||
|
||||
schedulers.listenerId = utils.AddConfigListener(schedulers.handleConfigChange)
|
||||
|
||||
go func() {
|
||||
schedulers.startOnce.Do(func() {
|
||||
l4g.Info("Starting schedulers.")
|
||||
|
||||
defer func() {
|
||||
l4g.Info("Schedulers stopped.")
|
||||
close(schedulers.stopped)
|
||||
}()
|
||||
|
||||
now := time.Now()
|
||||
for idx, scheduler := range schedulers.schedulers {
|
||||
if !scheduler.Enabled(utils.Cfg) {
|
||||
schedulers.nextRunTimes[idx] = nil
|
||||
} else {
|
||||
schedulers.setNextRunTime(utils.Cfg, idx, now, false)
|
||||
}
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-schedulers.stop:
|
||||
l4g.Debug("Schedulers received stop signal.")
|
||||
return
|
||||
case now = <-time.After(1 * time.Minute):
|
||||
cfg := utils.Cfg
|
||||
|
||||
for idx, nextTime := range schedulers.nextRunTimes {
|
||||
if nextTime == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
if time.Now().After(*nextTime) {
|
||||
scheduler := schedulers.schedulers[idx]
|
||||
if scheduler != nil {
|
||||
if scheduler.Enabled(cfg) {
|
||||
if _, err := schedulers.scheduleJob(cfg, scheduler); err != nil {
|
||||
l4g.Warn("Failed to schedule job with scheduler: %v", scheduler.Name())
|
||||
l4g.Error(err)
|
||||
} else {
|
||||
schedulers.setNextRunTime(cfg, idx, now, true)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
case newCfg := <-schedulers.configChanged:
|
||||
for idx, scheduler := range schedulers.schedulers {
|
||||
if !scheduler.Enabled(newCfg) {
|
||||
schedulers.nextRunTimes[idx] = nil
|
||||
} else {
|
||||
schedulers.setNextRunTime(newCfg, idx, now, false)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}()
|
||||
|
||||
return schedulers
|
||||
}
|
||||
|
||||
func (schedulers *Schedulers) handleConfigChange(oldConfig *model.Config, newConfig *model.Config) {
|
||||
if schedulers.DataRetention != nil {
|
||||
if (!*oldConfig.DataRetentionSettings.EnableMessageDeletion && !*oldConfig.DataRetentionSettings.EnableFileDeletion) && (*newConfig.DataRetentionSettings.EnableMessageDeletion || *newConfig.DataRetentionSettings.EnableFileDeletion) {
|
||||
go schedulers.DataRetention.Run()
|
||||
} else if (*oldConfig.DataRetentionSettings.EnableMessageDeletion || *oldConfig.DataRetentionSettings.EnableFileDeletion) && (!*newConfig.DataRetentionSettings.EnableMessageDeletion && !*newConfig.DataRetentionSettings.EnableFileDeletion) {
|
||||
schedulers.DataRetention.Stop()
|
||||
}
|
||||
}
|
||||
|
||||
if schedulers.ElasticsearchAggregation != nil {
|
||||
if !*oldConfig.ElasticsearchSettings.EnableIndexing && *newConfig.ElasticsearchSettings.EnableIndexing {
|
||||
go schedulers.ElasticsearchAggregation.Run()
|
||||
} else if *oldConfig.ElasticsearchSettings.EnableIndexing && !*newConfig.ElasticsearchSettings.EnableIndexing {
|
||||
schedulers.ElasticsearchAggregation.Stop()
|
||||
}
|
||||
}
|
||||
|
||||
if schedulers.LdapSync != nil {
|
||||
if !*oldConfig.LdapSettings.Enable && *newConfig.LdapSettings.Enable {
|
||||
go schedulers.LdapSync.Run()
|
||||
} else if *oldConfig.LdapSettings.Enable && !*newConfig.LdapSettings.Enable {
|
||||
schedulers.LdapSync.Stop()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (schedulers *Schedulers) Stop() *Schedulers {
|
||||
utils.RemoveConfigListener(schedulers.listenerId)
|
||||
|
||||
if schedulers.DataRetention != nil && (*utils.Cfg.DataRetentionSettings.EnableMessageDeletion || *utils.Cfg.DataRetentionSettings.EnableFileDeletion) {
|
||||
schedulers.DataRetention.Stop()
|
||||
}
|
||||
|
||||
if schedulers.ElasticsearchAggregation != nil && *utils.Cfg.ElasticsearchSettings.EnableIndexing {
|
||||
schedulers.ElasticsearchAggregation.Stop()
|
||||
}
|
||||
|
||||
if schedulers.LdapSync != nil && *utils.Cfg.LdapSettings.Enable {
|
||||
schedulers.LdapSync.Stop()
|
||||
}
|
||||
|
||||
l4g.Info("Stopped schedulers")
|
||||
|
||||
l4g.Info("Stopping schedulers.")
|
||||
close(schedulers.stop)
|
||||
<-schedulers.stopped
|
||||
return schedulers
|
||||
}
|
||||
|
||||
func (schedulers *Schedulers) setNextRunTime(cfg *model.Config, idx int, now time.Time, pendingJobs bool) {
|
||||
scheduler := schedulers.schedulers[idx]
|
||||
|
||||
if !pendingJobs {
|
||||
if pj, err := CheckForPendingJobsByType(scheduler.JobType()); err != nil {
|
||||
l4g.Error("Failed to set next job run time: " + err.Error())
|
||||
schedulers.nextRunTimes[idx] = nil
|
||||
return
|
||||
} else {
|
||||
pendingJobs = pj
|
||||
}
|
||||
}
|
||||
|
||||
lastSuccessfulJob, err := GetLastSuccessfulJobByType(scheduler.JobType())
|
||||
if err != nil {
|
||||
l4g.Error("Failed to set next job run time: " + err.Error())
|
||||
schedulers.nextRunTimes[idx] = nil
|
||||
return
|
||||
}
|
||||
|
||||
schedulers.nextRunTimes[idx] = scheduler.NextScheduleTime(cfg, now, pendingJobs, lastSuccessfulJob)
|
||||
l4g.Debug("Next run time for scheduler %v: %v", scheduler.Name(), schedulers.nextRunTimes[idx])
|
||||
}
|
||||
|
||||
func (schedulers *Schedulers) scheduleJob(cfg *model.Config, scheduler model.Scheduler) (*model.Job, *model.AppError) {
|
||||
pendingJobs, err := CheckForPendingJobsByType(scheduler.JobType())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
lastSuccessfulJob, err2 := GetLastSuccessfulJobByType(scheduler.JobType())
|
||||
if err2 != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return scheduler.ScheduleJob(cfg, pendingJobs, lastSuccessfulJob)
|
||||
}
|
||||
|
||||
func (schedulers *Schedulers) handleConfigChange(oldConfig *model.Config, newConfig *model.Config) {
|
||||
l4g.Debug("Schedulers received config change.")
|
||||
schedulers.configChanged <- newConfig
|
||||
}
|
||||
|
||||
@@ -1,58 +0,0 @@
|
||||
// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
|
||||
// See License.txt for license information.
|
||||
|
||||
package jobs
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
l4g "github.com/alecthomas/log4go"
|
||||
)
|
||||
|
||||
type TestScheduler struct {
|
||||
name string
|
||||
jobType string
|
||||
stop chan bool
|
||||
stopped chan bool
|
||||
}
|
||||
|
||||
func MakeTestScheduler(name string, jobType string) *TestScheduler {
|
||||
return &TestScheduler{
|
||||
name: name,
|
||||
jobType: jobType,
|
||||
stop: make(chan bool, 1),
|
||||
stopped: make(chan bool, 1),
|
||||
}
|
||||
}
|
||||
|
||||
func (scheduler *TestScheduler) Run() {
|
||||
l4g.Debug("Scheduler %v: Started", scheduler.name)
|
||||
|
||||
defer func() {
|
||||
l4g.Debug("Scheduler %v: Finished", scheduler.name)
|
||||
scheduler.stopped <- true
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-scheduler.stop:
|
||||
l4g.Debug("Scheduler %v: Received stop signal", scheduler.name)
|
||||
return
|
||||
case <-time.After(86400 * time.Second):
|
||||
l4g.Debug("Scheduler: %v: Scheduling new job", scheduler.name)
|
||||
scheduler.AddJob()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (scheduler *TestScheduler) AddJob() {
|
||||
if _, err := CreateJob(scheduler.jobType, nil); err != nil {
|
||||
l4g.Error("Scheduler %v: failed to create job: %v", scheduler.name, err)
|
||||
}
|
||||
}
|
||||
|
||||
func (scheduler *TestScheduler) Stop() {
|
||||
l4g.Debug("Scheduler %v: Stopping", scheduler.name)
|
||||
scheduler.stop <- true
|
||||
<-scheduler.stopped
|
||||
}
|
||||
@@ -1859,7 +1859,7 @@ func (o *Config) IsValid() *AppError {
|
||||
return NewAppError("Config.IsValid", "model.config.is_valid.elastic_search.aggregate_posts_after_days.app_error", nil, "", http.StatusBadRequest)
|
||||
}
|
||||
|
||||
if _, err := time.Parse("03:04", *o.ElasticsearchSettings.PostsAggregatorJobStartTime); err != nil {
|
||||
if _, err := time.Parse("15:04", *o.ElasticsearchSettings.PostsAggregatorJobStartTime); err != nil {
|
||||
return NewAppError("Config.IsValid", "model.config.is_valid.elastic_search.posts_aggregator_job_start_time.app_error", nil, err.Error(), http.StatusBadRequest)
|
||||
}
|
||||
|
||||
@@ -1871,7 +1871,7 @@ func (o *Config) IsValid() *AppError {
|
||||
return NewAppError("Config.IsValid", "model.config.is_valid.data_retention.file_retention_days_too_low.app_error", nil, "", http.StatusBadRequest)
|
||||
}
|
||||
|
||||
if _, err := time.Parse("03:04", *o.DataRetentionSettings.DeletionJobStartTime); err != nil {
|
||||
if _, err := time.Parse("15:04", *o.DataRetentionSettings.DeletionJobStartTime); err != nil {
|
||||
return NewAppError("Config.IsValid", "model.config.is_valid.data_retention.deletion_job_start_time.app_error", nil, err.Error(), http.StatusBadRequest)
|
||||
}
|
||||
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"encoding/json"
|
||||
"io"
|
||||
"net/http"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -116,6 +117,9 @@ type Worker interface {
|
||||
}
|
||||
|
||||
type Scheduler interface {
|
||||
Run()
|
||||
Stop()
|
||||
Name() string
|
||||
JobType() string
|
||||
Enabled(cfg *Config) bool
|
||||
NextScheduleTime(cfg *Config, now time.Time, pendingJobs bool, lastSuccessfulJob *Job) *time.Time
|
||||
ScheduleJob(cfg *Config, pendingJobs bool, lastSuccessfulJob *Job) (*Job, *AppError)
|
||||
}
|
||||
|
||||
@@ -325,6 +325,64 @@ func (jss SqlJobStore) GetAllByStatus(status string) store.StoreChannel {
|
||||
return storeChannel
|
||||
}
|
||||
|
||||
func (jss SqlJobStore) GetNewestJobByStatusAndType(status string, jobType string) store.StoreChannel {
|
||||
storeChannel := make(store.StoreChannel, 1)
|
||||
|
||||
go func() {
|
||||
result := store.StoreResult{}
|
||||
|
||||
var job *model.Job
|
||||
|
||||
if err := jss.GetReplica().SelectOne(&job,
|
||||
`SELECT
|
||||
*
|
||||
FROM
|
||||
Jobs
|
||||
WHERE
|
||||
Status = :Status
|
||||
AND
|
||||
Type = :Type
|
||||
ORDER BY
|
||||
CreateAt DESC
|
||||
LIMIT 1`, map[string]interface{}{"Status": status, "Type": jobType}); err != nil {
|
||||
result.Err = model.NewAppError("SqlJobStore.GetAllByStatus", "store.sql_job.get_newest_job_by_status_and_type.app_error", nil, "Status="+status+", "+err.Error(), http.StatusInternalServerError)
|
||||
} else {
|
||||
result.Data = job
|
||||
}
|
||||
|
||||
storeChannel <- result
|
||||
close(storeChannel)
|
||||
}()
|
||||
|
||||
return storeChannel
|
||||
}
|
||||
|
||||
func (jss SqlJobStore) GetCountByStatusAndType(status string, jobType string) store.StoreChannel {
|
||||
storeChannel := make(store.StoreChannel, 1)
|
||||
|
||||
go func() {
|
||||
result := store.StoreResult{}
|
||||
|
||||
if count, err := jss.GetReplica().SelectInt(`SELECT
|
||||
COUNT(*)
|
||||
FROM
|
||||
Jobs
|
||||
WHERE
|
||||
Status = :Status
|
||||
AND
|
||||
Type = :Type`, map[string]interface{}{"Status": status, "Type": jobType}); err != nil {
|
||||
result.Err = model.NewAppError("SqlJobStore.GetCountByStatusAndType", "store.sql_job.get_count_by_status_and_type.app_error", nil, "Status="+status+", "+err.Error(), http.StatusInternalServerError)
|
||||
} else {
|
||||
result.Data = count
|
||||
}
|
||||
|
||||
storeChannel <- result
|
||||
close(storeChannel)
|
||||
}()
|
||||
|
||||
return storeChannel
|
||||
}
|
||||
|
||||
func (jss SqlJobStore) Delete(id string) store.StoreChannel {
|
||||
storeChannel := make(store.StoreChannel, 1)
|
||||
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
|
||||
"github.com/mattermost/mattermost-server/model"
|
||||
"github.com/mattermost/mattermost-server/store"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestJobSaveGet(t *testing.T) {
|
||||
@@ -231,6 +232,108 @@ func TestJobGetAllByStatus(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestJobStoreGetNewestJobByStatusAndType(t *testing.T) {
|
||||
ss := Setup()
|
||||
|
||||
jobType1 := model.NewId()
|
||||
jobType2 := model.NewId()
|
||||
status1 := model.NewId()
|
||||
status2 := model.NewId()
|
||||
|
||||
jobs := []*model.Job{
|
||||
{
|
||||
Id: model.NewId(),
|
||||
Type: jobType1,
|
||||
CreateAt: 1001,
|
||||
Status: status1,
|
||||
},
|
||||
{
|
||||
Id: model.NewId(),
|
||||
Type: jobType1,
|
||||
CreateAt: 1000,
|
||||
Status: status1,
|
||||
},
|
||||
{
|
||||
Id: model.NewId(),
|
||||
Type: jobType2,
|
||||
CreateAt: 1003,
|
||||
Status: status1,
|
||||
},
|
||||
{
|
||||
Id: model.NewId(),
|
||||
Type: jobType1,
|
||||
CreateAt: 1004,
|
||||
Status: status2,
|
||||
},
|
||||
}
|
||||
|
||||
for _, job := range jobs {
|
||||
store.Must(ss.Job().Save(job))
|
||||
defer ss.Job().Delete(job.Id)
|
||||
}
|
||||
|
||||
result := <-ss.Job().GetNewestJobByStatusAndType(status1, jobType1)
|
||||
assert.Nil(t, result.Err)
|
||||
assert.EqualValues(t, jobs[0].Id, result.Data.(*model.Job).Id)
|
||||
}
|
||||
|
||||
func TestJobStoreGetCountByStatusAndType(t *testing.T) {
|
||||
ss := Setup()
|
||||
|
||||
jobType1 := model.NewId()
|
||||
jobType2 := model.NewId()
|
||||
status1 := model.NewId()
|
||||
status2 := model.NewId()
|
||||
|
||||
jobs := []*model.Job{
|
||||
{
|
||||
Id: model.NewId(),
|
||||
Type: jobType1,
|
||||
CreateAt: 1000,
|
||||
Status: status1,
|
||||
},
|
||||
{
|
||||
Id: model.NewId(),
|
||||
Type: jobType1,
|
||||
CreateAt: 999,
|
||||
Status: status1,
|
||||
},
|
||||
{
|
||||
Id: model.NewId(),
|
||||
Type: jobType2,
|
||||
CreateAt: 1001,
|
||||
Status: status1,
|
||||
},
|
||||
{
|
||||
Id: model.NewId(),
|
||||
Type: jobType1,
|
||||
CreateAt: 1002,
|
||||
Status: status2,
|
||||
},
|
||||
}
|
||||
|
||||
for _, job := range jobs {
|
||||
store.Must(ss.Job().Save(job))
|
||||
defer ss.Job().Delete(job.Id)
|
||||
}
|
||||
|
||||
result := <-ss.Job().GetCountByStatusAndType(status1, jobType1)
|
||||
assert.Nil(t, result.Err)
|
||||
assert.EqualValues(t, 2, result.Data.(int64))
|
||||
|
||||
result = <-ss.Job().GetCountByStatusAndType(status2, jobType2)
|
||||
assert.Nil(t, result.Err)
|
||||
assert.EqualValues(t, 0, result.Data.(int64))
|
||||
|
||||
result = <-ss.Job().GetCountByStatusAndType(status1, jobType2)
|
||||
assert.Nil(t, result.Err)
|
||||
assert.EqualValues(t, 1, result.Data.(int64))
|
||||
|
||||
result = <-ss.Job().GetCountByStatusAndType(status2, jobType1)
|
||||
assert.Nil(t, result.Err)
|
||||
assert.EqualValues(t, 1, result.Data.(int64))
|
||||
}
|
||||
|
||||
func TestJobUpdateOptimistically(t *testing.T) {
|
||||
ss := Setup()
|
||||
|
||||
|
||||
@@ -411,6 +411,8 @@ type JobStore interface {
|
||||
GetAllByType(jobType string) StoreChannel
|
||||
GetAllByTypePage(jobType string, offset int, limit int) StoreChannel
|
||||
GetAllByStatus(status string) StoreChannel
|
||||
GetNewestJobByStatusAndType(status string, jobType string) StoreChannel
|
||||
GetCountByStatusAndType(status string, jobType string) StoreChannel
|
||||
Delete(id string) StoreChannel
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user