From 4257cbb6184687fd38969a45e836b7f977072f7a Mon Sep 17 00:00:00 2001 From: Julien Fontanet Date: Wed, 28 Feb 2018 16:22:41 +0100 Subject: [PATCH] chore(xo-server): improve jobs code (#2696) - add type filtering (default to `call`) - support passing extra params to the call - Flow typing --- packages/xo-server/src/job-executor.js | 7 +- packages/xo-server/src/xo-mixins/jobs.js | 98 +++++++++++++++++++----- 2 files changed, 82 insertions(+), 23 deletions(-) diff --git a/packages/xo-server/src/job-executor.js b/packages/xo-server/src/job-executor.js index 1f2ab8c7e..8efeb7e70 100644 --- a/packages/xo-server/src/job-executor.js +++ b/packages/xo-server/src/job-executor.js @@ -70,7 +70,7 @@ export default class JobExecutor { ) } - async exec (job, onStart) { + async exec (job, onStart, extraParams) { const runJobId = this._logger.notice(`Starting execution of ${job.id}.`, { event: 'job.start', userId: job.userId, @@ -83,7 +83,7 @@ export default class JobExecutor { try { if (job.type === 'call') { - const execStatus = await this._execCall(job, runJobId) + const execStatus = await this._execCall(job, runJobId, extraParams) this.xo.emit('job:terminated', execStatus) } else { @@ -105,7 +105,7 @@ export default class JobExecutor { } } - async _execCall (job, runJobId) { + async _execCall (job, runJobId, extraParams) { const { paramsVector } = job const paramsFlatVector = paramsVector ? resolveParamsVector.call(this, paramsVector) @@ -125,6 +125,7 @@ export default class JobExecutor { } await asyncMap(paramsFlatVector, params => { + Object.assign(params, extraParams) const runCallId = this._logger.notice( `Starting ${job.method} call. (${job.id})`, { diff --git a/packages/xo-server/src/xo-mixins/jobs.js b/packages/xo-server/src/xo-mixins/jobs.js index af5ffb3a0..1819c5550 100644 --- a/packages/xo-server/src/xo-mixins/jobs.js +++ b/packages/xo-server/src/xo-mixins/jobs.js @@ -1,5 +1,11 @@ +// @flow + +import type { Pattern } from 'value-matcher' + +// $FlowFixMe import { assign } from 'lodash' -import { lastly } from 'promise-toolbox' +// $FlowFixMe +import { finally as pFinally } from 'promise-toolbox' import { noSuchObject } from 'xo-common/api-errors' import JobExecutor from '../job-executor' @@ -8,8 +14,50 @@ import { mapToArray } from '../utils' // =================================================================== +type ParamsVector = + | {| + items: Array, + type: 'crossProduct' + |} + | {| + mapping: Object, + type: 'extractProperties', + value: Object + |} + | {| + pattern: Pattern, + type: 'fetchObjects' + |} + | {| + collection: Object, + iteratee: Function, + paramName?: string, + type: 'map' + |} + | {| + type: 'set', + values: any + |} + +export type Job = { + id: string, + name: string, + userId: string +} + +export type CallJob = Job & {| + method: string, + paramsVector: ParamsVector, + timeout?: number, + type: 'call' +|} + export default class Jobs { - constructor (xo) { + _executor: JobExecutor + _jobs: JobsDb + _runningJobs: { __proto__: null, [string]: boolean } + + constructor (xo: any) { this._executor = new JobExecutor(xo) const jobsDb = (this._jobs = new JobsDb({ connection: xo._redis, @@ -29,31 +77,35 @@ export default class Jobs { }) } - async getAllJobs () { + async getAllJobs (type: string = 'call'): Promise> { const jobs = await this._jobs.get() const runningJobs = this._runningJobs + const result = [] jobs.forEach(job => { - job.runId = runningJobs[job.id] + if (job.type === type) { + job.runId = runningJobs[job.id] + result.push(job) + } }) - return jobs + return result } - async getJob (id) { + async getJob (id: string, type: string = 'call'): Promise { const job = await this._jobs.first(id) - if (!job) { + if (job === null || job.type !== type) { throw noSuchObject(id, 'job') } return job.properties } - async createJob (job) { + async createJob (job: $Diff): Promise { // TODO: use plain objects const job_ = await this._jobs.create(job) return job_.properties } - async updateJob ({ id, ...props }) { + async updateJob ({ id, ...props }: $Shape) { const job = await this.getJob(id) assign(job, props) @@ -64,32 +116,38 @@ export default class Jobs { return /* await */ this._jobs.save(job) } - async removeJob (id) { + async removeJob (id: string) { return /* await */ this._jobs.remove(id) } - _runJob (job) { + _runJob (job: Job, extraParams: {}) { const { id } = job const runningJobs = this._runningJobs - if (runningJobs[id]) { + if (id in runningJobs) { throw new Error(`job ${id} is already running`) } - return this._executor - .exec(job, runJobId => { - runningJobs[id] = runJobId - }) - ::lastly(() => { + runningJobs[id] = true + return pFinally.call( + this._executor.exec( + job, + runJobId => { + runningJobs[id] = runJobId + }, + extraParams + ), + () => { delete runningJobs[id] - }) + } + ) } - async runJobSequence (idSequence) { + async runJobSequence (idSequence: Array, extraParams: {}) { const jobs = await Promise.all( mapToArray(idSequence, id => this.getJob(id)) ) for (const job of jobs) { - await this._runJob(job) + await this._runJob(job, extraParams) } } }