chore(xo-server): pluggable job executors (#2707)

This commit is contained in:
Julien Fontanet 2018-03-01 12:10:08 +01:00 committed by GitHub
parent dccddd78a6
commit 488eed046e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 252 additions and 244 deletions

View File

@ -1,190 +0,0 @@
import { BaseError } from 'make-error'
import { createPredicate } from 'value-matcher'
import { timeout } from 'promise-toolbox'
import { assign, filter, find, isEmpty, map, mapValues } from 'lodash'
import { crossProduct } from './math'
import { asyncMap, serializeError, thunkToArray } from './utils'
export class JobExecutorError extends BaseError {}
export class UnsupportedJobType extends JobExecutorError {
constructor (job) {
super('Unknown job type: ' + job.type)
}
}
export class UnsupportedVectorType extends JobExecutorError {
constructor (vector) {
super('Unknown vector type: ' + vector.type)
}
}
// ===================================================================
const paramsVectorActionsMap = {
extractProperties ({ mapping, value }) {
return mapValues(mapping, key => value[key])
},
crossProduct ({ items }) {
return thunkToArray(
crossProduct(map(items, value => resolveParamsVector.call(this, value)))
)
},
fetchObjects ({ pattern }) {
const objects = filter(this.xo.getObjects(), createPredicate(pattern))
if (isEmpty(objects)) {
throw new Error('no objects match this pattern')
}
return objects
},
map ({ collection, iteratee, paramName = 'value' }) {
return map(resolveParamsVector.call(this, collection), value => {
return resolveParamsVector.call(this, {
...iteratee,
[paramName]: value,
})
})
},
set: ({ values }) => values,
}
export function resolveParamsVector (paramsVector) {
const visitor = paramsVectorActionsMap[paramsVector.type]
if (!visitor) {
throw new Error(`Unsupported function '${paramsVector.type}'.`)
}
return visitor.call(this, paramsVector)
}
// ===================================================================
export default class JobExecutor {
constructor (xo) {
this.xo = xo
// The logger is not available until Xo has started.
xo.on('start', () =>
xo.getLogger('jobs').then(logger => {
this._logger = logger
})
)
}
async exec (job, onStart, extraParams) {
const runJobId = this._logger.notice(`Starting execution of ${job.id}.`, {
event: 'job.start',
userId: job.userId,
jobId: job.id,
key: job.key,
})
if (onStart !== undefined) {
onStart(runJobId)
}
try {
if (job.type === 'call') {
const execStatus = await this._execCall(job, runJobId, extraParams)
this.xo.emit('job:terminated', execStatus)
} else {
throw new UnsupportedJobType(job)
}
this._logger.notice(`Execution terminated for ${job.id}.`, {
event: 'job.end',
runJobId,
})
} catch (error) {
this._logger.error(`The execution of ${job.id} has failed.`, {
event: 'job.end',
runJobId,
error: serializeError(error),
})
throw error
}
}
async _execCall (job, runJobId, extraParams) {
const { paramsVector } = job
const paramsFlatVector = paramsVector
? resolveParamsVector.call(this, paramsVector)
: [{}] // One call with no parameters
const connection = this.xo.createUserConnection()
connection.set('user_id', job.userId)
const schedule = find(await this.xo.getAllSchedules(), { jobId: job.id })
const execStatus = {
calls: {},
runJobId,
start: Date.now(),
timezone: schedule !== undefined ? schedule.timezone : undefined,
}
await asyncMap(paramsFlatVector, params => {
Object.assign(params, extraParams)
const runCallId = this._logger.notice(
`Starting ${job.method} call. (${job.id})`,
{
event: 'jobCall.start',
runJobId,
method: job.method,
params,
}
)
const call = (execStatus.calls[runCallId] = {
method: job.method,
params,
start: Date.now(),
})
let promise = this.xo.callApiMethod(
connection,
job.method,
assign({}, params)
)
if (job.timeout) {
promise = promise::timeout(job.timeout)
}
return promise.then(
value => {
this._logger.notice(
`Call ${job.method} (${runCallId}) is a success. (${job.id})`,
{
event: 'jobCall.end',
runJobId,
runCallId,
returnedValue: value,
}
)
call.returnedValue = value
call.end = Date.now()
},
reason => {
this._logger.notice(
`Call ${job.method} (${runCallId}) has failed. (${job.id})`,
{
event: 'jobCall.end',
runJobId,
runCallId,
error: serializeError(reason),
}
)
call.error = reason
call.end = Date.now()
}
)
})
connection.close()
execStatus.end = Date.now()
return execStatus
}
}

View File

@ -0,0 +1,127 @@
import { createPredicate } from 'value-matcher'
import { timeout } from 'promise-toolbox'
import { assign, filter, find, isEmpty, map, mapValues } from 'lodash'
import { crossProduct } from '../../math'
import { asyncMap, serializeError, thunkToArray } from '../../utils'
// ===================================================================
const paramsVectorActionsMap = {
extractProperties ({ mapping, value }) {
return mapValues(mapping, key => value[key])
},
crossProduct ({ items }) {
return thunkToArray(
crossProduct(map(items, value => resolveParamsVector.call(this, value)))
)
},
fetchObjects ({ pattern }) {
const objects = filter(this.getObjects(), createPredicate(pattern))
if (isEmpty(objects)) {
throw new Error('no objects match this pattern')
}
return objects
},
map ({ collection, iteratee, paramName = 'value' }) {
return map(resolveParamsVector.call(this, collection), value => {
return resolveParamsVector.call(this, {
...iteratee,
[paramName]: value,
})
})
},
set: ({ values }) => values,
}
export function resolveParamsVector (paramsVector) {
const visitor = paramsVectorActionsMap[paramsVector.type]
if (!visitor) {
throw new Error(`Unsupported function '${paramsVector.type}'.`)
}
return visitor.call(this, paramsVector)
}
// ===================================================================
export default async function executeJobCall ({
app,
data,
job,
logger,
runJobId,
session,
}) {
const { paramsVector } = job
const paramsFlatVector = paramsVector
? resolveParamsVector.call(app, paramsVector)
: [{}] // One call with no parameters
const schedule = find(await app.getAllSchedules(), { jobId: job.id })
const execStatus = {
calls: {},
runJobId,
start: Date.now(),
timezone: schedule !== undefined ? schedule.timezone : undefined,
}
await asyncMap(paramsFlatVector, params => {
Object.assign(params, data)
const runCallId = logger.notice(
`Starting ${job.method} call. (${job.id})`,
{
event: 'jobCall.start',
runJobId,
method: job.method,
params,
}
)
const call = (execStatus.calls[runCallId] = {
method: job.method,
params,
start: Date.now(),
})
let promise = app.callApiMethod(session, job.method, assign({}, params))
if (job.timeout) {
promise = promise::timeout(job.timeout)
}
return promise.then(
value => {
logger.notice(
`Call ${job.method} (${runCallId}) is a success. (${job.id})`,
{
event: 'jobCall.end',
runJobId,
runCallId,
returnedValue: value,
}
)
call.returnedValue = value
call.end = Date.now()
},
reason => {
logger.notice(
`Call ${job.method} (${runCallId}) has failed. (${job.id})`,
{
event: 'jobCall.end',
runJobId,
runCallId,
error: serializeError(reason),
}
)
call.error = reason
call.end = Date.now()
}
)
})
execStatus.end = Date.now()
return execStatus
}

View File

@ -1,7 +1,7 @@
/* eslint-env jest */
import { forEach } from 'lodash'
import { resolveParamsVector } from './job-executor'
import { resolveParamsVector } from './execute-call'
describe('resolveParamsVector', function () {
forEach(
@ -68,7 +68,6 @@ describe('resolveParamsVector', function () {
// Context.
{
xo: {
getObjects: function () {
return [
{
@ -100,7 +99,6 @@ describe('resolveParamsVector', function () {
]
},
},
},
],
},
([expectedResult, entry, context], name) => {

View File

@ -5,12 +5,15 @@ import type { Pattern } from 'value-matcher'
// $FlowFixMe
import { assign } from 'lodash'
// $FlowFixMe
import { finally as pFinally } from 'promise-toolbox'
import { cancelable } from 'promise-toolbox'
import { noSuchObject } from 'xo-common/api-errors'
import JobExecutor from '../job-executor'
import { Jobs as JobsDb } from '../models/job'
import { mapToArray } from '../utils'
import { Jobs as JobsDb } from '../../models/job'
import { mapToArray, serializeError } from '../../utils'
import type Logger from '../logs/loggers/abstract'
import executeCall from './execute-call'
// ===================================================================
@ -42,30 +45,48 @@ type ParamsVector =
export type Job = {
id: string,
name: string,
type: string,
userId: string
}
export type CallJob = Job & {|
export type CallJob = {|
...$Exact<Job>,
method: string,
paramsVector: ParamsVector,
timeout?: number,
type: 'call'
|}
type Executor = ({|
app: Object,
cancelToken: any,
data: Object,
job: Job,
logger: Logger,
runJobId: string,
session: Object
|}) => Promise<void>
export default class Jobs {
_executor: JobExecutor
_app: any
_executors: { __proto__: null, [string]: Executor }
_jobs: JobsDb
_logger: Logger
_runningJobs: { __proto__: null, [string]: boolean }
constructor (xo: any) {
this._executor = new JobExecutor(xo)
this._app = xo
const executors = (this._executors = Object.create(null))
const jobsDb = (this._jobs = new JobsDb({
connection: xo._redis,
prefix: 'xo:job',
indexes: ['user_id', 'key'],
}))
this._logger = undefined
this._runningJobs = Object.create(null)
executors.call = executeCall
xo.on('clean', () => jobsDb.rebuildIndexes())
xo.on('start', () => {
xo.addConfigManager(
@ -74,6 +95,10 @@ export default class Jobs {
jobs => Promise.all(mapToArray(jobs, job => jobsDb.save(job))),
['users']
)
xo.getLogger('jobs').then(logger => {
this._logger = logger
})
})
}
@ -116,38 +141,86 @@ export default class Jobs {
return /* await */ this._jobs.save(job)
}
registerJobExecutor (type: string, executor: Executor): void {
const executors = this._executors
if (type in executor) {
throw new Error(`there is already a job executor for type ${type}`)
}
executors[type] = executor
}
async removeJob (id: string) {
return /* await */ this._jobs.remove(id)
}
_runJob (job: Job, extraParams: {}) {
async _runJob (cancelToken: any, job: Job, data: {}) {
const { id } = job
const runningJobs = this._runningJobs
if (id in runningJobs) {
throw new Error(`job ${id} is already running`)
}
runningJobs[id] = true
return pFinally.call(
this._executor.exec(
job,
runJobId => {
runningJobs[id] = runJobId
},
extraParams
),
() => {
delete runningJobs[id]
}
)
const executor = this._executors[job.type]
if (executor === undefined) {
throw new Error(`cannot run job ${id}: no executor for type ${job.type}`)
}
async runJobSequence (idSequence: Array<string>, extraParams: {}) {
const logger = this._logger
const runJobId = logger.notice(`Starting execution of ${id}.`, {
event: 'job.start',
userId: job.userId,
jobId: id,
// $FlowFixMe only defined for CallJob
key: job.key,
})
runningJobs[id] = runJobId
try {
const app = this._app
const session = app.createUserConnection()
session.set('user_id', job.userId)
const status = await executor({
app,
cancelToken,
data,
job,
logger,
runJobId,
session,
})
logger.notice(`Execution terminated for ${job.id}.`, {
event: 'job.end',
runJobId,
})
session.close()
app.emit('job:terminated', status)
} catch (error) {
logger.error(`The execution of ${id} has failed.`, {
event: 'job.end',
runJobId,
error: serializeError(error),
})
throw error
} finally {
delete runningJobs[id]
}
}
@cancelable
async runJobSequence ($cancelToken: any, idSequence: Array<string>, data: {}) {
const jobs = await Promise.all(
mapToArray(idSequence, id => this.getJob(id))
)
for (const job of jobs) {
await this._runJob(job, extraParams)
if ($cancelToken.requested) {
break
}
await this._runJob($cancelToken, job, data)
}
}
}