fix(xo-server/runJob): register job as soon as job.start (#5620)
This commit is contained in:
parent
b45556062d
commit
fd560c351f
@ -7,6 +7,7 @@ import createLogger from '@xen-orchestra/log'
|
|||||||
import emitAsync from '@xen-orchestra/emit-async'
|
import emitAsync from '@xen-orchestra/emit-async'
|
||||||
|
|
||||||
import { CancelToken, ignoreErrors } from 'promise-toolbox'
|
import { CancelToken, ignoreErrors } from 'promise-toolbox'
|
||||||
|
import { defer } from 'golike-defer'
|
||||||
import { map as mapToArray } from 'lodash'
|
import { map as mapToArray } from 'lodash'
|
||||||
import { noSuchObject } from 'xo-common/api-errors'
|
import { noSuchObject } from 'xo-common/api-errors'
|
||||||
|
|
||||||
@ -244,9 +245,11 @@ export default class Jobs {
|
|||||||
return Promise.all(promises)
|
return Promise.all(promises)
|
||||||
}
|
}
|
||||||
|
|
||||||
async _runJob(job: Job, schedule?: Schedule, data_?: any) {
|
@defer
|
||||||
|
async _runJob($defer, job: Job, schedule?: Schedule, data_?: any) {
|
||||||
const logger = this._logger
|
const logger = this._logger
|
||||||
const { id, type } = job
|
const { id, type } = job
|
||||||
|
|
||||||
const runJobId = logger.notice(`Starting execution of ${id}.`, {
|
const runJobId = logger.notice(`Starting execution of ${id}.`, {
|
||||||
data:
|
data:
|
||||||
type === 'backup' || type === 'metadataBackup'
|
type === 'backup' || type === 'metadataBackup'
|
||||||
@ -266,27 +269,6 @@ export default class Jobs {
|
|||||||
key: job.key,
|
key: job.key,
|
||||||
type,
|
type,
|
||||||
})
|
})
|
||||||
|
|
||||||
const app = this._app
|
|
||||||
const user = await app.getUser(job.userId).catch(error => {
|
|
||||||
if (!noSuchObject.is(error)) {
|
|
||||||
throw error
|
|
||||||
}
|
|
||||||
})
|
|
||||||
const data = {
|
|
||||||
callId: Math.random().toString(36).slice(2),
|
|
||||||
method: 'backupNg.runJob',
|
|
||||||
params: {
|
|
||||||
id: job.id,
|
|
||||||
proxy: job.proxy,
|
|
||||||
schedule: schedule?.id,
|
|
||||||
settings: job.settings,
|
|
||||||
vms: job.vms,
|
|
||||||
},
|
|
||||||
timestamp: Date.now(),
|
|
||||||
userId: job.userId,
|
|
||||||
userName: user?.name ?? '(unknown user)',
|
|
||||||
}
|
|
||||||
try {
|
try {
|
||||||
const runningJobs = this._runningJobs
|
const runningJobs = this._runningJobs
|
||||||
|
|
||||||
@ -294,7 +276,7 @@ export default class Jobs {
|
|||||||
throw new Error(`the job (${id}) is already running`)
|
throw new Error(`the job (${id}) is already running`)
|
||||||
}
|
}
|
||||||
|
|
||||||
const executor = this._executors[type]
|
let executor = this._executors[type]
|
||||||
if (executor === undefined) {
|
if (executor === undefined) {
|
||||||
throw new Error(`cannot run job (${id}): no executor for type ${type}`)
|
throw new Error(`cannot run job (${id}): no executor for type ${type}`)
|
||||||
}
|
}
|
||||||
@ -303,69 +285,100 @@ export default class Jobs {
|
|||||||
this.updateJob({ id, runId: runJobId })::ignoreErrors()
|
this.updateJob({ id, runId: runJobId })::ignoreErrors()
|
||||||
runningJobs[id] = runJobId
|
runningJobs[id] = runJobId
|
||||||
|
|
||||||
const runs = this._runs
|
$defer(() => {
|
||||||
let session
|
|
||||||
try {
|
|
||||||
const { cancel, token } = CancelToken.source()
|
|
||||||
runs[runJobId] = { cancel }
|
|
||||||
|
|
||||||
session = app.createUserConnection()
|
|
||||||
session.set('user_id', job.userId)
|
|
||||||
if (type === 'backup') {
|
|
||||||
await emitAsync.call(
|
|
||||||
app,
|
|
||||||
{
|
|
||||||
onError(error) {
|
|
||||||
log.warn('backup:preCall listener failure', { error })
|
|
||||||
},
|
|
||||||
},
|
|
||||||
'backup:preCall',
|
|
||||||
data
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
const status = await executor({
|
|
||||||
app,
|
|
||||||
cancelToken: token,
|
|
||||||
data: data_,
|
|
||||||
job,
|
|
||||||
logger,
|
|
||||||
runJobId,
|
|
||||||
schedule,
|
|
||||||
session,
|
|
||||||
})
|
|
||||||
|
|
||||||
await logger.notice(
|
|
||||||
`Execution terminated for ${job.id}.`,
|
|
||||||
{
|
|
||||||
event: 'job.end',
|
|
||||||
runJobId,
|
|
||||||
},
|
|
||||||
true
|
|
||||||
)
|
|
||||||
|
|
||||||
const now = Date.now()
|
|
||||||
type === 'backup' &&
|
|
||||||
app.emit('backup:postCall', {
|
|
||||||
...data,
|
|
||||||
duration: now - data.timestamp,
|
|
||||||
// Result of runJobSequence()
|
|
||||||
result: true,
|
|
||||||
timestamp: now,
|
|
||||||
})
|
|
||||||
|
|
||||||
app.emit('job:terminated', runJobId, {
|
|
||||||
type: job.type,
|
|
||||||
status,
|
|
||||||
})
|
|
||||||
} finally {
|
|
||||||
this.updateJob({ id, runId: null })::ignoreErrors()
|
this.updateJob({ id, runId: null })::ignoreErrors()
|
||||||
delete runningJobs[id]
|
delete runningJobs[id]
|
||||||
delete runs[runJobId]
|
})
|
||||||
if (session !== undefined) {
|
|
||||||
session.close()
|
const app = this._app
|
||||||
|
|
||||||
|
if (type === 'backup') {
|
||||||
|
const hookData = {
|
||||||
|
callId: Math.random().toString(36).slice(2),
|
||||||
|
method: 'backupNg.runJob',
|
||||||
|
params: {
|
||||||
|
id: job.id,
|
||||||
|
proxy: job.proxy,
|
||||||
|
schedule: schedule?.id,
|
||||||
|
settings: job.settings,
|
||||||
|
vms: job.vms,
|
||||||
|
},
|
||||||
|
timestamp: Date.now(),
|
||||||
|
userId: job.userId,
|
||||||
|
userName:
|
||||||
|
(
|
||||||
|
await app.getUser(job.userId).catch(error => {
|
||||||
|
if (!noSuchObject.is(error)) {
|
||||||
|
throw error
|
||||||
|
}
|
||||||
|
})
|
||||||
|
)?.name ?? '(unknown user)',
|
||||||
}
|
}
|
||||||
|
|
||||||
|
executor = (executor =>
|
||||||
|
async function () {
|
||||||
|
await emitAsync.call(
|
||||||
|
app,
|
||||||
|
{
|
||||||
|
onError(error) {
|
||||||
|
log.warn('backup:preCall listener failure', { error })
|
||||||
|
},
|
||||||
|
},
|
||||||
|
'backup:preCall',
|
||||||
|
hookData
|
||||||
|
)
|
||||||
|
|
||||||
|
try {
|
||||||
|
const result = await executor.apply(this, arguments)
|
||||||
|
|
||||||
|
// Result of runJobSequence()
|
||||||
|
hookData.result = true
|
||||||
|
|
||||||
|
return result
|
||||||
|
} catch (error) {
|
||||||
|
hookData.error = serializeError(error)
|
||||||
|
|
||||||
|
throw error
|
||||||
|
} finally {
|
||||||
|
const now = Date.now()
|
||||||
|
hookData.duration = now - hookData.timestamp
|
||||||
|
hookData.timestamp = now
|
||||||
|
app.emit('backup:postCall', hookData)
|
||||||
|
}
|
||||||
|
})(executor)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const session = app.createUserConnection()
|
||||||
|
$defer.call(session, 'close')
|
||||||
|
session.set('user_id', job.userId)
|
||||||
|
|
||||||
|
const { cancel, token } = CancelToken.source()
|
||||||
|
|
||||||
|
const runs = this._runs
|
||||||
|
runs[runJobId] = { cancel }
|
||||||
|
$defer(() => delete runs[runJobId])
|
||||||
|
|
||||||
|
const status = await executor({
|
||||||
|
app,
|
||||||
|
cancelToken: token,
|
||||||
|
data: data_,
|
||||||
|
job,
|
||||||
|
logger,
|
||||||
|
runJobId,
|
||||||
|
schedule,
|
||||||
|
session,
|
||||||
|
})
|
||||||
|
|
||||||
|
await logger.notice(
|
||||||
|
`Execution terminated for ${job.id}.`,
|
||||||
|
{
|
||||||
|
event: 'job.end',
|
||||||
|
runJobId,
|
||||||
|
},
|
||||||
|
true
|
||||||
|
)
|
||||||
|
|
||||||
|
app.emit('job:terminated', runJobId, { status, type })
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
await logger.error(
|
await logger.error(
|
||||||
`The execution of ${id} has failed.`,
|
`The execution of ${id} has failed.`,
|
||||||
@ -376,18 +389,6 @@ export default class Jobs {
|
|||||||
},
|
},
|
||||||
true
|
true
|
||||||
)
|
)
|
||||||
const now = Date.now()
|
|
||||||
type === 'backup' &&
|
|
||||||
app.emit('backup:postCall', {
|
|
||||||
...data,
|
|
||||||
duration: now - data.timestamp,
|
|
||||||
error: serializeError(error),
|
|
||||||
timestamp: now,
|
|
||||||
})
|
|
||||||
app.emit('job:terminated', runJobId, {
|
|
||||||
type: job.type,
|
|
||||||
})
|
|
||||||
throw error
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user