diff --git a/src/xo-mixins/jobs.js b/src/xo-mixins/jobs.js index ab31e47aa..6845710cf 100644 --- a/src/xo-mixins/jobs.js +++ b/src/xo-mixins/jobs.js @@ -1,20 +1,22 @@ -import assign from 'lodash/assign' +import { assign } from 'lodash' +import { lastly } from 'promise-toolbox' +import { noSuchObject } from 'xo-common/api-errors' import JobExecutor from '../job-executor' -import { Jobs } from '../models/job' +import { Jobs as JobsDb } from '../models/job' import { mapToArray } from '../utils' -import { noSuchObject } from 'xo-common/api-errors' // =================================================================== -export default class { +export default class Jobs { constructor (xo) { this._executor = new JobExecutor(xo) - const jobsDb = this._jobs = new Jobs({ + const jobsDb = this._jobs = new JobsDb({ connection: xo._redis, prefix: 'xo:job', indexes: ['user_id', 'key'] }) + this._runningJobs = Object.create(null) xo.on('start', () => { xo.addConfigManager('jobs', @@ -61,11 +63,23 @@ export default class { return /* await */ this._jobs.remove(id) } + _runJob (job) { + const { id } = job + const runningJobs = this._runningJobs + if (runningJobs[id]) { + throw new Error(`job ${id} is already running`) + } + runningJobs[id] = true + return this._executor.exec(job)::lastly(() => { + delete runningJobs[id] + }) + } + async runJobSequence (idSequence) { const jobs = await Promise.all(mapToArray(idSequence, id => this.getJob(id))) for (const job of jobs) { - await this._executor.exec(job) + await this._runJob(job) } } }