feat(jobs): no concurrent runs (#570)

This commit is contained in:
Julien Fontanet 2017-06-08 09:43:19 +02:00 committed by GitHub
parent 05e75c9a26
commit 51e09ecfcc

View File

@ -1,20 +1,22 @@
import assign from 'lodash/assign' import { assign } from 'lodash'
import { lastly } from 'promise-toolbox'
import { noSuchObject } from 'xo-common/api-errors'
import JobExecutor from '../job-executor' import JobExecutor from '../job-executor'
import { Jobs } from '../models/job' import { Jobs as JobsDb } from '../models/job'
import { mapToArray } from '../utils' import { mapToArray } from '../utils'
import { noSuchObject } from 'xo-common/api-errors'
// =================================================================== // ===================================================================
export default class { export default class Jobs {
constructor (xo) { constructor (xo) {
this._executor = new JobExecutor(xo) this._executor = new JobExecutor(xo)
const jobsDb = this._jobs = new Jobs({ const jobsDb = this._jobs = new JobsDb({
connection: xo._redis, connection: xo._redis,
prefix: 'xo:job', prefix: 'xo:job',
indexes: ['user_id', 'key'] indexes: ['user_id', 'key']
}) })
this._runningJobs = Object.create(null)
xo.on('start', () => { xo.on('start', () => {
xo.addConfigManager('jobs', xo.addConfigManager('jobs',
@ -61,11 +63,23 @@ export default class {
return /* await */ this._jobs.remove(id) return /* await */ this._jobs.remove(id)
} }
_runJob (job) {
const { id } = job
const runningJobs = this._runningJobs
if (runningJobs[id]) {
throw new Error(`job ${id} is already running`)
}
runningJobs[id] = true
return this._executor.exec(job)::lastly(() => {
delete runningJobs[id]
})
}
async runJobSequence (idSequence) { async runJobSequence (idSequence) {
const jobs = await Promise.all(mapToArray(idSequence, id => this.getJob(id))) const jobs = await Promise.all(mapToArray(idSequence, id => this.getJob(id)))
for (const job of jobs) { for (const job of jobs) {
await this._executor.exec(job) await this._runJob(job)
} }
} }
} }