mirror of
https://github.com/mattermost/mattermost.git
synced 2025-02-25 18:55:24 -06:00
* MM-9728: Online migration for advanced permissions phase 2 * Add unit tests for new store functions. * Move migration specific code to own file. * Add migration state function test. * Style fixes. * Add i18n strings. * Fix mocks. * Add TestMain to migrations package tests. * Fix typo. * Fix review comments. * Fix up the "Check if migration is done" check to actually work.
174 lines
4.9 KiB
Go
174 lines
4.9 KiB
Go
// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
|
|
// See License.txt for license information.
|
|
|
|
package jobs
|
|
|
|
import (
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/mattermost/mattermost-server/mlog"
|
|
"github.com/mattermost/mattermost-server/model"
|
|
)
|
|
|
|
type Schedulers struct {
|
|
stop chan bool
|
|
stopped chan bool
|
|
configChanged chan *model.Config
|
|
listenerId string
|
|
startOnce sync.Once
|
|
jobs *JobServer
|
|
|
|
schedulers []model.Scheduler
|
|
nextRunTimes []*time.Time
|
|
}
|
|
|
|
func (srv *JobServer) InitSchedulers() *Schedulers {
|
|
mlog.Debug("Initialising schedulers.")
|
|
|
|
schedulers := &Schedulers{
|
|
stop: make(chan bool),
|
|
stopped: make(chan bool),
|
|
configChanged: make(chan *model.Config),
|
|
jobs: srv,
|
|
}
|
|
|
|
if srv.DataRetentionJob != nil {
|
|
schedulers.schedulers = append(schedulers.schedulers, srv.DataRetentionJob.MakeScheduler())
|
|
}
|
|
|
|
if srv.MessageExportJob != nil {
|
|
schedulers.schedulers = append(schedulers.schedulers, srv.MessageExportJob.MakeScheduler())
|
|
}
|
|
|
|
if elasticsearchAggregatorInterface := srv.ElasticsearchAggregator; elasticsearchAggregatorInterface != nil {
|
|
schedulers.schedulers = append(schedulers.schedulers, elasticsearchAggregatorInterface.MakeScheduler())
|
|
}
|
|
|
|
if ldapSyncInterface := srv.LdapSync; ldapSyncInterface != nil {
|
|
schedulers.schedulers = append(schedulers.schedulers, ldapSyncInterface.MakeScheduler())
|
|
}
|
|
|
|
if migrationsInterface := srv.Migrations; migrationsInterface != nil {
|
|
schedulers.schedulers = append(schedulers.schedulers, migrationsInterface.MakeScheduler())
|
|
}
|
|
|
|
schedulers.nextRunTimes = make([]*time.Time, len(schedulers.schedulers))
|
|
return schedulers
|
|
}
|
|
|
|
func (schedulers *Schedulers) Start() *Schedulers {
|
|
schedulers.listenerId = schedulers.jobs.ConfigService.AddConfigListener(schedulers.handleConfigChange)
|
|
|
|
go func() {
|
|
schedulers.startOnce.Do(func() {
|
|
mlog.Info("Starting schedulers.")
|
|
|
|
defer func() {
|
|
mlog.Info("Schedulers stopped.")
|
|
close(schedulers.stopped)
|
|
}()
|
|
|
|
now := time.Now()
|
|
for idx, scheduler := range schedulers.schedulers {
|
|
if !scheduler.Enabled(schedulers.jobs.Config()) {
|
|
schedulers.nextRunTimes[idx] = nil
|
|
} else {
|
|
schedulers.setNextRunTime(schedulers.jobs.Config(), idx, now, false)
|
|
}
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-schedulers.stop:
|
|
mlog.Debug("Schedulers received stop signal.")
|
|
return
|
|
case now = <-time.After(1 * time.Minute):
|
|
cfg := schedulers.jobs.Config()
|
|
|
|
for idx, nextTime := range schedulers.nextRunTimes {
|
|
if nextTime == nil {
|
|
continue
|
|
}
|
|
|
|
if time.Now().After(*nextTime) {
|
|
scheduler := schedulers.schedulers[idx]
|
|
if scheduler != nil {
|
|
if scheduler.Enabled(cfg) {
|
|
if _, err := schedulers.scheduleJob(cfg, scheduler); err != nil {
|
|
mlog.Warn(fmt.Sprintf("Failed to schedule job with scheduler: %v", scheduler.Name()))
|
|
mlog.Error(fmt.Sprint(err))
|
|
} else {
|
|
schedulers.setNextRunTime(cfg, idx, now, true)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
case newCfg := <-schedulers.configChanged:
|
|
for idx, scheduler := range schedulers.schedulers {
|
|
if !scheduler.Enabled(newCfg) {
|
|
schedulers.nextRunTimes[idx] = nil
|
|
} else {
|
|
schedulers.setNextRunTime(newCfg, idx, now, false)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
})
|
|
}()
|
|
|
|
return schedulers
|
|
}
|
|
|
|
func (schedulers *Schedulers) Stop() *Schedulers {
|
|
mlog.Info("Stopping schedulers.")
|
|
close(schedulers.stop)
|
|
<-schedulers.stopped
|
|
return schedulers
|
|
}
|
|
|
|
func (schedulers *Schedulers) setNextRunTime(cfg *model.Config, idx int, now time.Time, pendingJobs bool) {
|
|
scheduler := schedulers.schedulers[idx]
|
|
|
|
if !pendingJobs {
|
|
if pj, err := schedulers.jobs.CheckForPendingJobsByType(scheduler.JobType()); err != nil {
|
|
mlog.Error("Failed to set next job run time: " + err.Error())
|
|
schedulers.nextRunTimes[idx] = nil
|
|
return
|
|
} else {
|
|
pendingJobs = pj
|
|
}
|
|
}
|
|
|
|
lastSuccessfulJob, err := schedulers.jobs.GetLastSuccessfulJobByType(scheduler.JobType())
|
|
if err != nil {
|
|
mlog.Error("Failed to set next job run time: " + err.Error())
|
|
schedulers.nextRunTimes[idx] = nil
|
|
return
|
|
}
|
|
|
|
schedulers.nextRunTimes[idx] = scheduler.NextScheduleTime(cfg, now, pendingJobs, lastSuccessfulJob)
|
|
mlog.Debug(fmt.Sprintf("Next run time for scheduler %v: %v", scheduler.Name(), schedulers.nextRunTimes[idx]))
|
|
}
|
|
|
|
func (schedulers *Schedulers) scheduleJob(cfg *model.Config, scheduler model.Scheduler) (*model.Job, *model.AppError) {
|
|
pendingJobs, err := schedulers.jobs.CheckForPendingJobsByType(scheduler.JobType())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
lastSuccessfulJob, err2 := schedulers.jobs.GetLastSuccessfulJobByType(scheduler.JobType())
|
|
if err2 != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return scheduler.ScheduleJob(cfg, pendingJobs, lastSuccessfulJob)
|
|
}
|
|
|
|
func (schedulers *Schedulers) handleConfigChange(oldConfig *model.Config, newConfig *model.Config) {
|
|
mlog.Debug("Schedulers received config change.")
|
|
schedulers.configChanged <- newConfig
|
|
}
|