chore(xo-server): improve jobs code (#2696)

- add type filtering (default to `call`)
- support passing extra params to the call
- Flow typing
This commit is contained in:
Julien Fontanet 2018-02-28 16:22:41 +01:00 committed by GitHub
parent e25d6b712d
commit 4257cbb618
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 82 additions and 23 deletions

View File

@ -70,7 +70,7 @@ export default class JobExecutor {
) )
} }
async exec (job, onStart) { async exec (job, onStart, extraParams) {
const runJobId = this._logger.notice(`Starting execution of ${job.id}.`, { const runJobId = this._logger.notice(`Starting execution of ${job.id}.`, {
event: 'job.start', event: 'job.start',
userId: job.userId, userId: job.userId,
@ -83,7 +83,7 @@ export default class JobExecutor {
try { try {
if (job.type === 'call') { if (job.type === 'call') {
const execStatus = await this._execCall(job, runJobId) const execStatus = await this._execCall(job, runJobId, extraParams)
this.xo.emit('job:terminated', execStatus) this.xo.emit('job:terminated', execStatus)
} else { } else {
@ -105,7 +105,7 @@ export default class JobExecutor {
} }
} }
async _execCall (job, runJobId) { async _execCall (job, runJobId, extraParams) {
const { paramsVector } = job const { paramsVector } = job
const paramsFlatVector = paramsVector const paramsFlatVector = paramsVector
? resolveParamsVector.call(this, paramsVector) ? resolveParamsVector.call(this, paramsVector)
@ -125,6 +125,7 @@ export default class JobExecutor {
} }
await asyncMap(paramsFlatVector, params => { await asyncMap(paramsFlatVector, params => {
Object.assign(params, extraParams)
const runCallId = this._logger.notice( const runCallId = this._logger.notice(
`Starting ${job.method} call. (${job.id})`, `Starting ${job.method} call. (${job.id})`,
{ {

View File

@ -1,5 +1,11 @@
// @flow
import type { Pattern } from 'value-matcher'
// $FlowFixMe
import { assign } from 'lodash' import { assign } from 'lodash'
import { lastly } from 'promise-toolbox' // $FlowFixMe
import { finally as pFinally } from 'promise-toolbox'
import { noSuchObject } from 'xo-common/api-errors' import { noSuchObject } from 'xo-common/api-errors'
import JobExecutor from '../job-executor' import JobExecutor from '../job-executor'
@ -8,8 +14,50 @@ import { mapToArray } from '../utils'
// =================================================================== // ===================================================================
type ParamsVector =
| {|
items: Array<Object>,
type: 'crossProduct'
|}
| {|
mapping: Object,
type: 'extractProperties',
value: Object
|}
| {|
pattern: Pattern,
type: 'fetchObjects'
|}
| {|
collection: Object,
iteratee: Function,
paramName?: string,
type: 'map'
|}
| {|
type: 'set',
values: any
|}
export type Job = {
id: string,
name: string,
userId: string
}
export type CallJob = Job & {|
method: string,
paramsVector: ParamsVector,
timeout?: number,
type: 'call'
|}
export default class Jobs { export default class Jobs {
constructor (xo) { _executor: JobExecutor
_jobs: JobsDb
_runningJobs: { __proto__: null, [string]: boolean }
constructor (xo: any) {
this._executor = new JobExecutor(xo) this._executor = new JobExecutor(xo)
const jobsDb = (this._jobs = new JobsDb({ const jobsDb = (this._jobs = new JobsDb({
connection: xo._redis, connection: xo._redis,
@ -29,31 +77,35 @@ export default class Jobs {
}) })
} }
async getAllJobs () { async getAllJobs (type: string = 'call'): Promise<Array<Job>> {
const jobs = await this._jobs.get() const jobs = await this._jobs.get()
const runningJobs = this._runningJobs const runningJobs = this._runningJobs
const result = []
jobs.forEach(job => { jobs.forEach(job => {
if (job.type === type) {
job.runId = runningJobs[job.id] job.runId = runningJobs[job.id]
result.push(job)
}
}) })
return jobs return result
} }
async getJob (id) { async getJob (id: string, type: string = 'call'): Promise<Job> {
const job = await this._jobs.first(id) const job = await this._jobs.first(id)
if (!job) { if (job === null || job.type !== type) {
throw noSuchObject(id, 'job') throw noSuchObject(id, 'job')
} }
return job.properties return job.properties
} }
async createJob (job) { async createJob (job: $Diff<Job, {| id: string |}>): Promise<Job> {
// TODO: use plain objects // TODO: use plain objects
const job_ = await this._jobs.create(job) const job_ = await this._jobs.create(job)
return job_.properties return job_.properties
} }
async updateJob ({ id, ...props }) { async updateJob ({ id, ...props }: $Shape<Job>) {
const job = await this.getJob(id) const job = await this.getJob(id)
assign(job, props) assign(job, props)
@ -64,32 +116,38 @@ export default class Jobs {
return /* await */ this._jobs.save(job) return /* await */ this._jobs.save(job)
} }
async removeJob (id) { async removeJob (id: string) {
return /* await */ this._jobs.remove(id) return /* await */ this._jobs.remove(id)
} }
_runJob (job) { _runJob (job: Job, extraParams: {}) {
const { id } = job const { id } = job
const runningJobs = this._runningJobs const runningJobs = this._runningJobs
if (runningJobs[id]) { if (id in runningJobs) {
throw new Error(`job ${id} is already running`) throw new Error(`job ${id} is already running`)
} }
return this._executor runningJobs[id] = true
.exec(job, runJobId => { return pFinally.call(
this._executor.exec(
job,
runJobId => {
runningJobs[id] = runJobId runningJobs[id] = runJobId
}) },
::lastly(() => { extraParams
),
() => {
delete runningJobs[id] delete runningJobs[id]
}) }
)
} }
async runJobSequence (idSequence) { async runJobSequence (idSequence: Array<string>, extraParams: {}) {
const jobs = await Promise.all( const jobs = await Promise.all(
mapToArray(idSequence, id => this.getJob(id)) mapToArray(idSequence, id => this.getJob(id))
) )
for (const job of jobs) { for (const job of jobs) {
await this._runJob(job) await this._runJob(job, extraParams)
} }
} }
} }