fix(xo-server/runJob): register job as soon as job.start (#5620)
This commit is contained in:
parent
b45556062d
commit
fd560c351f
@ -7,6 +7,7 @@ import createLogger from '@xen-orchestra/log'
|
||||
import emitAsync from '@xen-orchestra/emit-async'
|
||||
|
||||
import { CancelToken, ignoreErrors } from 'promise-toolbox'
|
||||
import { defer } from 'golike-defer'
|
||||
import { map as mapToArray } from 'lodash'
|
||||
import { noSuchObject } from 'xo-common/api-errors'
|
||||
|
||||
@ -244,9 +245,11 @@ export default class Jobs {
|
||||
return Promise.all(promises)
|
||||
}
|
||||
|
||||
async _runJob(job: Job, schedule?: Schedule, data_?: any) {
|
||||
@defer
|
||||
async _runJob($defer, job: Job, schedule?: Schedule, data_?: any) {
|
||||
const logger = this._logger
|
||||
const { id, type } = job
|
||||
|
||||
const runJobId = logger.notice(`Starting execution of ${id}.`, {
|
||||
data:
|
||||
type === 'backup' || type === 'metadataBackup'
|
||||
@ -266,27 +269,6 @@ export default class Jobs {
|
||||
key: job.key,
|
||||
type,
|
||||
})
|
||||
|
||||
const app = this._app
|
||||
const user = await app.getUser(job.userId).catch(error => {
|
||||
if (!noSuchObject.is(error)) {
|
||||
throw error
|
||||
}
|
||||
})
|
||||
const data = {
|
||||
callId: Math.random().toString(36).slice(2),
|
||||
method: 'backupNg.runJob',
|
||||
params: {
|
||||
id: job.id,
|
||||
proxy: job.proxy,
|
||||
schedule: schedule?.id,
|
||||
settings: job.settings,
|
||||
vms: job.vms,
|
||||
},
|
||||
timestamp: Date.now(),
|
||||
userId: job.userId,
|
||||
userName: user?.name ?? '(unknown user)',
|
||||
}
|
||||
try {
|
||||
const runningJobs = this._runningJobs
|
||||
|
||||
@ -294,7 +276,7 @@ export default class Jobs {
|
||||
throw new Error(`the job (${id}) is already running`)
|
||||
}
|
||||
|
||||
const executor = this._executors[type]
|
||||
let executor = this._executors[type]
|
||||
if (executor === undefined) {
|
||||
throw new Error(`cannot run job (${id}): no executor for type ${type}`)
|
||||
}
|
||||
@ -303,69 +285,100 @@ export default class Jobs {
|
||||
this.updateJob({ id, runId: runJobId })::ignoreErrors()
|
||||
runningJobs[id] = runJobId
|
||||
|
||||
const runs = this._runs
|
||||
let session
|
||||
try {
|
||||
const { cancel, token } = CancelToken.source()
|
||||
runs[runJobId] = { cancel }
|
||||
|
||||
session = app.createUserConnection()
|
||||
session.set('user_id', job.userId)
|
||||
if (type === 'backup') {
|
||||
await emitAsync.call(
|
||||
app,
|
||||
{
|
||||
onError(error) {
|
||||
log.warn('backup:preCall listener failure', { error })
|
||||
},
|
||||
},
|
||||
'backup:preCall',
|
||||
data
|
||||
)
|
||||
}
|
||||
|
||||
const status = await executor({
|
||||
app,
|
||||
cancelToken: token,
|
||||
data: data_,
|
||||
job,
|
||||
logger,
|
||||
runJobId,
|
||||
schedule,
|
||||
session,
|
||||
})
|
||||
|
||||
await logger.notice(
|
||||
`Execution terminated for ${job.id}.`,
|
||||
{
|
||||
event: 'job.end',
|
||||
runJobId,
|
||||
},
|
||||
true
|
||||
)
|
||||
|
||||
const now = Date.now()
|
||||
type === 'backup' &&
|
||||
app.emit('backup:postCall', {
|
||||
...data,
|
||||
duration: now - data.timestamp,
|
||||
// Result of runJobSequence()
|
||||
result: true,
|
||||
timestamp: now,
|
||||
})
|
||||
|
||||
app.emit('job:terminated', runJobId, {
|
||||
type: job.type,
|
||||
status,
|
||||
})
|
||||
} finally {
|
||||
$defer(() => {
|
||||
this.updateJob({ id, runId: null })::ignoreErrors()
|
||||
delete runningJobs[id]
|
||||
delete runs[runJobId]
|
||||
if (session !== undefined) {
|
||||
session.close()
|
||||
})
|
||||
|
||||
const app = this._app
|
||||
|
||||
if (type === 'backup') {
|
||||
const hookData = {
|
||||
callId: Math.random().toString(36).slice(2),
|
||||
method: 'backupNg.runJob',
|
||||
params: {
|
||||
id: job.id,
|
||||
proxy: job.proxy,
|
||||
schedule: schedule?.id,
|
||||
settings: job.settings,
|
||||
vms: job.vms,
|
||||
},
|
||||
timestamp: Date.now(),
|
||||
userId: job.userId,
|
||||
userName:
|
||||
(
|
||||
await app.getUser(job.userId).catch(error => {
|
||||
if (!noSuchObject.is(error)) {
|
||||
throw error
|
||||
}
|
||||
})
|
||||
)?.name ?? '(unknown user)',
|
||||
}
|
||||
|
||||
executor = (executor =>
|
||||
async function () {
|
||||
await emitAsync.call(
|
||||
app,
|
||||
{
|
||||
onError(error) {
|
||||
log.warn('backup:preCall listener failure', { error })
|
||||
},
|
||||
},
|
||||
'backup:preCall',
|
||||
hookData
|
||||
)
|
||||
|
||||
try {
|
||||
const result = await executor.apply(this, arguments)
|
||||
|
||||
// Result of runJobSequence()
|
||||
hookData.result = true
|
||||
|
||||
return result
|
||||
} catch (error) {
|
||||
hookData.error = serializeError(error)
|
||||
|
||||
throw error
|
||||
} finally {
|
||||
const now = Date.now()
|
||||
hookData.duration = now - hookData.timestamp
|
||||
hookData.timestamp = now
|
||||
app.emit('backup:postCall', hookData)
|
||||
}
|
||||
})(executor)
|
||||
}
|
||||
|
||||
const session = app.createUserConnection()
|
||||
$defer.call(session, 'close')
|
||||
session.set('user_id', job.userId)
|
||||
|
||||
const { cancel, token } = CancelToken.source()
|
||||
|
||||
const runs = this._runs
|
||||
runs[runJobId] = { cancel }
|
||||
$defer(() => delete runs[runJobId])
|
||||
|
||||
const status = await executor({
|
||||
app,
|
||||
cancelToken: token,
|
||||
data: data_,
|
||||
job,
|
||||
logger,
|
||||
runJobId,
|
||||
schedule,
|
||||
session,
|
||||
})
|
||||
|
||||
await logger.notice(
|
||||
`Execution terminated for ${job.id}.`,
|
||||
{
|
||||
event: 'job.end',
|
||||
runJobId,
|
||||
},
|
||||
true
|
||||
)
|
||||
|
||||
app.emit('job:terminated', runJobId, { status, type })
|
||||
} catch (error) {
|
||||
await logger.error(
|
||||
`The execution of ${id} has failed.`,
|
||||
@ -376,18 +389,6 @@ export default class Jobs {
|
||||
},
|
||||
true
|
||||
)
|
||||
const now = Date.now()
|
||||
type === 'backup' &&
|
||||
app.emit('backup:postCall', {
|
||||
...data,
|
||||
duration: now - data.timestamp,
|
||||
error: serializeError(error),
|
||||
timestamp: now,
|
||||
})
|
||||
app.emit('job:terminated', runJobId, {
|
||||
type: job.type,
|
||||
})
|
||||
throw error
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user