fix(xo-server/runJob): register job as soon as job.start (#5620)

This commit is contained in:
Julien Fontanet 2021-02-25 17:00:07 +01:00 committed by GitHub
parent b45556062d
commit fd560c351f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -7,6 +7,7 @@ import createLogger from '@xen-orchestra/log'
import emitAsync from '@xen-orchestra/emit-async'
import { CancelToken, ignoreErrors } from 'promise-toolbox'
import { defer } from 'golike-defer'
import { map as mapToArray } from 'lodash'
import { noSuchObject } from 'xo-common/api-errors'
@ -244,9 +245,11 @@ export default class Jobs {
return Promise.all(promises)
}
async _runJob(job: Job, schedule?: Schedule, data_?: any) {
@defer
async _runJob($defer, job: Job, schedule?: Schedule, data_?: any) {
const logger = this._logger
const { id, type } = job
const runJobId = logger.notice(`Starting execution of ${id}.`, {
data:
type === 'backup' || type === 'metadataBackup'
@ -266,27 +269,6 @@ export default class Jobs {
key: job.key,
type,
})
const app = this._app
const user = await app.getUser(job.userId).catch(error => {
if (!noSuchObject.is(error)) {
throw error
}
})
const data = {
callId: Math.random().toString(36).slice(2),
method: 'backupNg.runJob',
params: {
id: job.id,
proxy: job.proxy,
schedule: schedule?.id,
settings: job.settings,
vms: job.vms,
},
timestamp: Date.now(),
userId: job.userId,
userName: user?.name ?? '(unknown user)',
}
try {
const runningJobs = this._runningJobs
@ -294,7 +276,7 @@ export default class Jobs {
throw new Error(`the job (${id}) is already running`)
}
const executor = this._executors[type]
let executor = this._executors[type]
if (executor === undefined) {
throw new Error(`cannot run job (${id}): no executor for type ${type}`)
}
@ -303,69 +285,100 @@ export default class Jobs {
this.updateJob({ id, runId: runJobId })::ignoreErrors()
runningJobs[id] = runJobId
const runs = this._runs
let session
try {
const { cancel, token } = CancelToken.source()
runs[runJobId] = { cancel }
session = app.createUserConnection()
session.set('user_id', job.userId)
if (type === 'backup') {
await emitAsync.call(
app,
{
onError(error) {
log.warn('backup:preCall listener failure', { error })
},
},
'backup:preCall',
data
)
}
const status = await executor({
app,
cancelToken: token,
data: data_,
job,
logger,
runJobId,
schedule,
session,
})
await logger.notice(
`Execution terminated for ${job.id}.`,
{
event: 'job.end',
runJobId,
},
true
)
const now = Date.now()
type === 'backup' &&
app.emit('backup:postCall', {
...data,
duration: now - data.timestamp,
// Result of runJobSequence()
result: true,
timestamp: now,
})
app.emit('job:terminated', runJobId, {
type: job.type,
status,
})
} finally {
$defer(() => {
this.updateJob({ id, runId: null })::ignoreErrors()
delete runningJobs[id]
delete runs[runJobId]
if (session !== undefined) {
session.close()
})
const app = this._app
if (type === 'backup') {
const hookData = {
callId: Math.random().toString(36).slice(2),
method: 'backupNg.runJob',
params: {
id: job.id,
proxy: job.proxy,
schedule: schedule?.id,
settings: job.settings,
vms: job.vms,
},
timestamp: Date.now(),
userId: job.userId,
userName:
(
await app.getUser(job.userId).catch(error => {
if (!noSuchObject.is(error)) {
throw error
}
})
)?.name ?? '(unknown user)',
}
executor = (executor =>
async function () {
await emitAsync.call(
app,
{
onError(error) {
log.warn('backup:preCall listener failure', { error })
},
},
'backup:preCall',
hookData
)
try {
const result = await executor.apply(this, arguments)
// Result of runJobSequence()
hookData.result = true
return result
} catch (error) {
hookData.error = serializeError(error)
throw error
} finally {
const now = Date.now()
hookData.duration = now - hookData.timestamp
hookData.timestamp = now
app.emit('backup:postCall', hookData)
}
})(executor)
}
const session = app.createUserConnection()
$defer.call(session, 'close')
session.set('user_id', job.userId)
const { cancel, token } = CancelToken.source()
const runs = this._runs
runs[runJobId] = { cancel }
$defer(() => delete runs[runJobId])
const status = await executor({
app,
cancelToken: token,
data: data_,
job,
logger,
runJobId,
schedule,
session,
})
await logger.notice(
`Execution terminated for ${job.id}.`,
{
event: 'job.end',
runJobId,
},
true
)
app.emit('job:terminated', runJobId, { status, type })
} catch (error) {
await logger.error(
`The execution of ${id} has failed.`,
@ -376,18 +389,6 @@ export default class Jobs {
},
true
)
const now = Date.now()
type === 'backup' &&
app.emit('backup:postCall', {
...data,
duration: now - data.timestamp,
error: serializeError(error),
timestamp: now,
})
app.emit('job:terminated', runJobId, {
type: job.type,
})
throw error
}
}