chore(xo-server): inject schedule in jobs (#2710)

This commit is contained in:
Julien Fontanet 2018-03-01 16:27:51 +01:00 committed by GitHub
parent 65e1ac2ef9
commit d99e643634
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 21 additions and 21 deletions

View File

@ -1,6 +1,6 @@
import { createPredicate } from 'value-matcher' import { createPredicate } from 'value-matcher'
import { timeout } from 'promise-toolbox' import { timeout } from 'promise-toolbox'
import { assign, filter, find, isEmpty, map, mapValues } from 'lodash' import { assign, filter, isEmpty, map, mapValues } from 'lodash'
import { crossProduct } from '../../math' import { crossProduct } from '../../math'
import { asyncMap, serializeError, thunkToArray } from '../../utils' import { asyncMap, serializeError, thunkToArray } from '../../utils'
@ -47,10 +47,10 @@ export function resolveParamsVector (paramsVector) {
export default async function executeJobCall ({ export default async function executeJobCall ({
app, app,
data,
job, job,
logger, logger,
runJobId, runJobId,
schedule,
session, session,
}) { }) {
const { paramsVector } = job const { paramsVector } = job
@ -58,8 +58,6 @@ export default async function executeJobCall ({
? resolveParamsVector.call(app, paramsVector) ? resolveParamsVector.call(app, paramsVector)
: [{}] // One call with no parameters : [{}] // One call with no parameters
const schedule = find(await app.getAllSchedules(), { jobId: job.id })
const execStatus = { const execStatus = {
calls: {}, calls: {},
runJobId, runJobId,
@ -68,7 +66,6 @@ export default async function executeJobCall ({
} }
await asyncMap(paramsFlatVector, params => { await asyncMap(paramsFlatVector, params => {
Object.assign(params, data)
const runCallId = logger.notice( const runCallId = logger.notice(
`Starting ${job.method} call. (${job.id})`, `Starting ${job.method} call. (${job.id})`,
{ {

View File

@ -12,11 +12,19 @@ import { Jobs as JobsDb } from '../../models/job'
import { mapToArray, serializeError } from '../../utils' import { mapToArray, serializeError } from '../../utils'
import type Logger from '../logs/loggers/abstract' import type Logger from '../logs/loggers/abstract'
import { type Schedule } from '../scheduling'
import executeCall from './execute-call' import executeCall from './execute-call'
// =================================================================== // ===================================================================
export type Job = {
id: string,
name: string,
type: string,
userId: string
}
type ParamsVector = type ParamsVector =
| {| | {|
items: Array<Object>, items: Array<Object>,
@ -42,13 +50,6 @@ type ParamsVector =
values: any values: any
|} |}
export type Job = {
id: string,
name: string,
type: string,
userId: string
}
export type CallJob = {| export type CallJob = {|
...$Exact<Job>, ...$Exact<Job>,
method: string, method: string,
@ -57,13 +58,13 @@ export type CallJob = {|
type: 'call' type: 'call'
|} |}
type Executor = ({| export type Executor = ({|
app: Object, app: Object,
cancelToken: any, cancelToken: any,
data: Object,
job: Job, job: Job,
logger: Logger, logger: Logger,
runJobId: string, runJobId: string,
schedule?: Schedule,
session: Object session: Object
|}) => Promise<void> |}) => Promise<void>
@ -153,7 +154,7 @@ export default class Jobs {
return /* await */ this._jobs.remove(id) return /* await */ this._jobs.remove(id)
} }
async _runJob (cancelToken: any, job: Job, data: {}) { async _runJob (cancelToken: any, job: Job, schedule?: Schedule) {
const { id } = job const { id } = job
const runningJobs = this._runningJobs const runningJobs = this._runningJobs
@ -185,10 +186,10 @@ export default class Jobs {
const status = await executor({ const status = await executor({
app, app,
cancelToken, cancelToken,
data,
job, job,
logger, logger,
runJobId, runJobId,
schedule,
session, session,
}) })
logger.notice(`Execution terminated for ${job.id}.`, { logger.notice(`Execution terminated for ${job.id}.`, {
@ -211,7 +212,11 @@ export default class Jobs {
} }
@cancelable @cancelable
async runJobSequence ($cancelToken: any, idSequence: Array<string>, data: {}) { async runJobSequence (
$cancelToken: any,
idSequence: Array<string>,
schedule?: Schedule
) {
const jobs = await Promise.all( const jobs = await Promise.all(
mapToArray(idSequence, id => this.getJob(id)) mapToArray(idSequence, id => this.getJob(id))
) )
@ -220,7 +225,7 @@ export default class Jobs {
if ($cancelToken.requested) { if ($cancelToken.requested) {
break break
} }
await this._runJob($cancelToken, job, data) await this._runJob($cancelToken, job, schedule)
} }
} }
} }

View File

@ -155,9 +155,7 @@ export default class Scheduling {
this._runs[id] = createSchedule( this._runs[id] = createSchedule(
schedule.cron, schedule.cron,
schedule.timezone schedule.timezone
).startJob(() => ).startJob(() => this._app.runJobSequence([schedule.jobId], schedule))
this._app.runJobSequence([schedule.jobId], { _schedule: schedule })
)
} }
} }