diff --git a/packages/xo-server/src/job-executor.js b/packages/xo-server/src/job-executor.js deleted file mode 100644 index 8efeb7e70..000000000 --- a/packages/xo-server/src/job-executor.js +++ /dev/null @@ -1,190 +0,0 @@ -import { BaseError } from 'make-error' -import { createPredicate } from 'value-matcher' -import { timeout } from 'promise-toolbox' -import { assign, filter, find, isEmpty, map, mapValues } from 'lodash' - -import { crossProduct } from './math' -import { asyncMap, serializeError, thunkToArray } from './utils' - -export class JobExecutorError extends BaseError {} -export class UnsupportedJobType extends JobExecutorError { - constructor (job) { - super('Unknown job type: ' + job.type) - } -} -export class UnsupportedVectorType extends JobExecutorError { - constructor (vector) { - super('Unknown vector type: ' + vector.type) - } -} - -// =================================================================== - -const paramsVectorActionsMap = { - extractProperties ({ mapping, value }) { - return mapValues(mapping, key => value[key]) - }, - crossProduct ({ items }) { - return thunkToArray( - crossProduct(map(items, value => resolveParamsVector.call(this, value))) - ) - }, - fetchObjects ({ pattern }) { - const objects = filter(this.xo.getObjects(), createPredicate(pattern)) - if (isEmpty(objects)) { - throw new Error('no objects match this pattern') - } - return objects - }, - map ({ collection, iteratee, paramName = 'value' }) { - return map(resolveParamsVector.call(this, collection), value => { - return resolveParamsVector.call(this, { - ...iteratee, - [paramName]: value, - }) - }) - }, - set: ({ values }) => values, -} - -export function resolveParamsVector (paramsVector) { - const visitor = paramsVectorActionsMap[paramsVector.type] - if (!visitor) { - throw new Error(`Unsupported function '${paramsVector.type}'.`) - } - - return visitor.call(this, paramsVector) -} - -// =================================================================== - -export default class JobExecutor { - constructor (xo) { - this.xo = xo - - // The logger is not available until Xo has started. - xo.on('start', () => - xo.getLogger('jobs').then(logger => { - this._logger = logger - }) - ) - } - - async exec (job, onStart, extraParams) { - const runJobId = this._logger.notice(`Starting execution of ${job.id}.`, { - event: 'job.start', - userId: job.userId, - jobId: job.id, - key: job.key, - }) - if (onStart !== undefined) { - onStart(runJobId) - } - - try { - if (job.type === 'call') { - const execStatus = await this._execCall(job, runJobId, extraParams) - - this.xo.emit('job:terminated', execStatus) - } else { - throw new UnsupportedJobType(job) - } - - this._logger.notice(`Execution terminated for ${job.id}.`, { - event: 'job.end', - runJobId, - }) - } catch (error) { - this._logger.error(`The execution of ${job.id} has failed.`, { - event: 'job.end', - runJobId, - error: serializeError(error), - }) - - throw error - } - } - - async _execCall (job, runJobId, extraParams) { - const { paramsVector } = job - const paramsFlatVector = paramsVector - ? resolveParamsVector.call(this, paramsVector) - : [{}] // One call with no parameters - - const connection = this.xo.createUserConnection() - - connection.set('user_id', job.userId) - - const schedule = find(await this.xo.getAllSchedules(), { jobId: job.id }) - - const execStatus = { - calls: {}, - runJobId, - start: Date.now(), - timezone: schedule !== undefined ? schedule.timezone : undefined, - } - - await asyncMap(paramsFlatVector, params => { - Object.assign(params, extraParams) - const runCallId = this._logger.notice( - `Starting ${job.method} call. (${job.id})`, - { - event: 'jobCall.start', - runJobId, - method: job.method, - params, - } - ) - - const call = (execStatus.calls[runCallId] = { - method: job.method, - params, - start: Date.now(), - }) - let promise = this.xo.callApiMethod( - connection, - job.method, - assign({}, params) - ) - if (job.timeout) { - promise = promise::timeout(job.timeout) - } - - return promise.then( - value => { - this._logger.notice( - `Call ${job.method} (${runCallId}) is a success. (${job.id})`, - { - event: 'jobCall.end', - runJobId, - runCallId, - returnedValue: value, - } - ) - - call.returnedValue = value - call.end = Date.now() - }, - reason => { - this._logger.notice( - `Call ${job.method} (${runCallId}) has failed. (${job.id})`, - { - event: 'jobCall.end', - runJobId, - runCallId, - error: serializeError(reason), - } - ) - - call.error = reason - call.end = Date.now() - } - ) - }) - - connection.close() - execStatus.end = Date.now() - - return execStatus - } -} diff --git a/packages/xo-server/src/xo-mixins/jobs/execute-call.js b/packages/xo-server/src/xo-mixins/jobs/execute-call.js new file mode 100644 index 000000000..94e3c9fed --- /dev/null +++ b/packages/xo-server/src/xo-mixins/jobs/execute-call.js @@ -0,0 +1,127 @@ +import { createPredicate } from 'value-matcher' +import { timeout } from 'promise-toolbox' +import { assign, filter, find, isEmpty, map, mapValues } from 'lodash' + +import { crossProduct } from '../../math' +import { asyncMap, serializeError, thunkToArray } from '../../utils' + +// =================================================================== + +const paramsVectorActionsMap = { + extractProperties ({ mapping, value }) { + return mapValues(mapping, key => value[key]) + }, + crossProduct ({ items }) { + return thunkToArray( + crossProduct(map(items, value => resolveParamsVector.call(this, value))) + ) + }, + fetchObjects ({ pattern }) { + const objects = filter(this.getObjects(), createPredicate(pattern)) + if (isEmpty(objects)) { + throw new Error('no objects match this pattern') + } + return objects + }, + map ({ collection, iteratee, paramName = 'value' }) { + return map(resolveParamsVector.call(this, collection), value => { + return resolveParamsVector.call(this, { + ...iteratee, + [paramName]: value, + }) + }) + }, + set: ({ values }) => values, +} + +export function resolveParamsVector (paramsVector) { + const visitor = paramsVectorActionsMap[paramsVector.type] + if (!visitor) { + throw new Error(`Unsupported function '${paramsVector.type}'.`) + } + + return visitor.call(this, paramsVector) +} + +// =================================================================== + +export default async function executeJobCall ({ + app, + data, + job, + logger, + runJobId, + session, +}) { + const { paramsVector } = job + const paramsFlatVector = paramsVector + ? resolveParamsVector.call(app, paramsVector) + : [{}] // One call with no parameters + + const schedule = find(await app.getAllSchedules(), { jobId: job.id }) + + const execStatus = { + calls: {}, + runJobId, + start: Date.now(), + timezone: schedule !== undefined ? schedule.timezone : undefined, + } + + await asyncMap(paramsFlatVector, params => { + Object.assign(params, data) + const runCallId = logger.notice( + `Starting ${job.method} call. (${job.id})`, + { + event: 'jobCall.start', + runJobId, + method: job.method, + params, + } + ) + + const call = (execStatus.calls[runCallId] = { + method: job.method, + params, + start: Date.now(), + }) + let promise = app.callApiMethod(session, job.method, assign({}, params)) + if (job.timeout) { + promise = promise::timeout(job.timeout) + } + + return promise.then( + value => { + logger.notice( + `Call ${job.method} (${runCallId}) is a success. (${job.id})`, + { + event: 'jobCall.end', + runJobId, + runCallId, + returnedValue: value, + } + ) + + call.returnedValue = value + call.end = Date.now() + }, + reason => { + logger.notice( + `Call ${job.method} (${runCallId}) has failed. (${job.id})`, + { + event: 'jobCall.end', + runJobId, + runCallId, + error: serializeError(reason), + } + ) + + call.error = reason + call.end = Date.now() + } + ) + }) + + execStatus.end = Date.now() + + return execStatus +} diff --git a/packages/xo-server/src/job-executor.spec.js b/packages/xo-server/src/xo-mixins/jobs/execute-call.spec.js similarity index 70% rename from packages/xo-server/src/job-executor.spec.js rename to packages/xo-server/src/xo-mixins/jobs/execute-call.spec.js index f879d0a2a..742a94249 100644 --- a/packages/xo-server/src/job-executor.spec.js +++ b/packages/xo-server/src/xo-mixins/jobs/execute-call.spec.js @@ -1,7 +1,7 @@ /* eslint-env jest */ import { forEach } from 'lodash' -import { resolveParamsVector } from './job-executor' +import { resolveParamsVector } from './execute-call' describe('resolveParamsVector', function () { forEach( @@ -68,37 +68,35 @@ describe('resolveParamsVector', function () { // Context. { - xo: { - getObjects: function () { - return [ - { - id: 'vm:1', - $pool: 'pool:1', - tags: [], - type: 'VM', - power_state: 'Halted', - }, - { - id: 'vm:2', - $pool: 'pool:1', - tags: ['foo'], - type: 'VM', - power_state: 'Running', - }, - { - id: 'host:1', - type: 'host', - power_state: 'Running', - }, - { - id: 'vm:3', - $pool: 'pool:8', - tags: ['foo'], - type: 'VM', - power_state: 'Halted', - }, - ] - }, + getObjects: function () { + return [ + { + id: 'vm:1', + $pool: 'pool:1', + tags: [], + type: 'VM', + power_state: 'Halted', + }, + { + id: 'vm:2', + $pool: 'pool:1', + tags: ['foo'], + type: 'VM', + power_state: 'Running', + }, + { + id: 'host:1', + type: 'host', + power_state: 'Running', + }, + { + id: 'vm:3', + $pool: 'pool:8', + tags: ['foo'], + type: 'VM', + power_state: 'Halted', + }, + ] }, }, ], diff --git a/packages/xo-server/src/xo-mixins/jobs.js b/packages/xo-server/src/xo-mixins/jobs/index.js similarity index 53% rename from packages/xo-server/src/xo-mixins/jobs.js rename to packages/xo-server/src/xo-mixins/jobs/index.js index 1819c5550..da413307b 100644 --- a/packages/xo-server/src/xo-mixins/jobs.js +++ b/packages/xo-server/src/xo-mixins/jobs/index.js @@ -5,12 +5,15 @@ import type { Pattern } from 'value-matcher' // $FlowFixMe import { assign } from 'lodash' // $FlowFixMe -import { finally as pFinally } from 'promise-toolbox' +import { cancelable } from 'promise-toolbox' import { noSuchObject } from 'xo-common/api-errors' -import JobExecutor from '../job-executor' -import { Jobs as JobsDb } from '../models/job' -import { mapToArray } from '../utils' +import { Jobs as JobsDb } from '../../models/job' +import { mapToArray, serializeError } from '../../utils' + +import type Logger from '../logs/loggers/abstract' + +import executeCall from './execute-call' // =================================================================== @@ -42,30 +45,48 @@ type ParamsVector = export type Job = { id: string, name: string, + type: string, userId: string } -export type CallJob = Job & {| +export type CallJob = {| + ...$Exact, method: string, paramsVector: ParamsVector, timeout?: number, type: 'call' |} +type Executor = ({| + app: Object, + cancelToken: any, + data: Object, + job: Job, + logger: Logger, + runJobId: string, + session: Object +|}) => Promise + export default class Jobs { - _executor: JobExecutor + _app: any + _executors: { __proto__: null, [string]: Executor } _jobs: JobsDb + _logger: Logger _runningJobs: { __proto__: null, [string]: boolean } constructor (xo: any) { - this._executor = new JobExecutor(xo) + this._app = xo + const executors = (this._executors = Object.create(null)) const jobsDb = (this._jobs = new JobsDb({ connection: xo._redis, prefix: 'xo:job', indexes: ['user_id', 'key'], })) + this._logger = undefined this._runningJobs = Object.create(null) + executors.call = executeCall + xo.on('clean', () => jobsDb.rebuildIndexes()) xo.on('start', () => { xo.addConfigManager( @@ -74,6 +95,10 @@ export default class Jobs { jobs => Promise.all(mapToArray(jobs, job => jobsDb.save(job))), ['users'] ) + + xo.getLogger('jobs').then(logger => { + this._logger = logger + }) }) } @@ -116,38 +141,86 @@ export default class Jobs { return /* await */ this._jobs.save(job) } + registerJobExecutor (type: string, executor: Executor): void { + const executors = this._executors + if (type in executor) { + throw new Error(`there is already a job executor for type ${type}`) + } + executors[type] = executor + } + async removeJob (id: string) { return /* await */ this._jobs.remove(id) } - _runJob (job: Job, extraParams: {}) { + async _runJob (cancelToken: any, job: Job, data: {}) { const { id } = job + const runningJobs = this._runningJobs if (id in runningJobs) { throw new Error(`job ${id} is already running`) } - runningJobs[id] = true - return pFinally.call( - this._executor.exec( + + const executor = this._executors[job.type] + if (executor === undefined) { + throw new Error(`cannot run job ${id}: no executor for type ${job.type}`) + } + + const logger = this._logger + const runJobId = logger.notice(`Starting execution of ${id}.`, { + event: 'job.start', + userId: job.userId, + jobId: id, + // $FlowFixMe only defined for CallJob + key: job.key, + }) + + runningJobs[id] = runJobId + + try { + const app = this._app + const session = app.createUserConnection() + session.set('user_id', job.userId) + + const status = await executor({ + app, + cancelToken, + data, job, - runJobId => { - runningJobs[id] = runJobId - }, - extraParams - ), - () => { - delete runningJobs[id] - } - ) + logger, + runJobId, + session, + }) + logger.notice(`Execution terminated for ${job.id}.`, { + event: 'job.end', + runJobId, + }) + + session.close() + app.emit('job:terminated', status) + } catch (error) { + logger.error(`The execution of ${id} has failed.`, { + event: 'job.end', + runJobId, + error: serializeError(error), + }) + throw error + } finally { + delete runningJobs[id] + } } - async runJobSequence (idSequence: Array, extraParams: {}) { + @cancelable + async runJobSequence ($cancelToken: any, idSequence: Array, data: {}) { const jobs = await Promise.all( mapToArray(idSequence, id => this.getJob(id)) ) for (const job of jobs) { - await this._runJob(job, extraParams) + if ($cancelToken.requested) { + break + } + await this._runJob($cancelToken, job, data) } } }