Compare commits

...

1 Commits

Author SHA1 Message Date
Julien Fontanet
a4a879ad44 WiP 2024-01-11 09:57:42 +01:00
29 changed files with 191 additions and 320 deletions

View File

@@ -1,4 +1,4 @@
import { Task } from './Task.mjs' import { Task } from '@vates/task'
export class HealthCheckVmBackup { export class HealthCheckVmBackup {
#restoredVm #restoredVm
@@ -14,7 +14,7 @@ export class HealthCheckVmBackup {
async run() { async run() {
return Task.run( return Task.run(
{ {
name: 'vmstart', properties: { name: 'vmstart' },
}, },
async () => { async () => {
let restoredVm = this.#restoredVm let restoredVm = this.#restoredVm

View File

@@ -1,8 +1,8 @@
import assert from 'node:assert' import assert from 'node:assert'
import { Task } from '@vates/task'
import { formatFilenameDate } from './_filenameDate.mjs' import { formatFilenameDate } from './_filenameDate.mjs'
import { importIncrementalVm } from './_incrementalVm.mjs' import { importIncrementalVm } from './_incrementalVm.mjs'
import { Task } from './Task.mjs'
import { watchStreamSize } from './_watchStreamSize.mjs' import { watchStreamSize } from './_watchStreamSize.mjs'
import { VhdNegative, VhdSynthetic } from 'vhd-lib' import { VhdNegative, VhdSynthetic } from 'vhd-lib'
import { decorateClass } from '@vates/decorate-with' import { decorateClass } from '@vates/decorate-with'
@@ -240,7 +240,7 @@ export class ImportVmBackup {
return Task.run( return Task.run(
{ {
name: 'transfer', properties: { name: 'transfer' },
}, },
async () => { async () => {
const xapi = this._xapi const xapi = this._xapi

View File

@@ -1,155 +0,0 @@
import CancelToken from 'promise-toolbox/CancelToken'
import Zone from 'node-zone'
// Replacement logger installed once a task has ended: any late log attempt
// throws, with the offending entry attached as `error.log` for debugging.
const logAfterEnd = log => {
  throw Object.assign(new Error('task has already ended'), { log })
}
// Default logger for root tasks (no parent, no explicit onLog): logs are discarded.
const noop = Function.prototype
// Create a serializable plain object from an error.
//
// A bare spread would miss the standard Error fields (they are
// non-enumerable), so they are copied explicitly; `errors` is recursed
// into to support AggregateError. Non-Error values pass through unchanged.
const serializeError = error => {
  if (!(error instanceof Error)) {
    return error
  }
  return {
    ...error, // copy enumerable own properties
    code: error.code,
    errors: serializeErrors(error.errors), // supports AggregateError
    message: error.message,
    name: error.name,
    stack: error.stack,
  }
}
const serializeErrors = errors => (Array.isArray(errors) ? errors.map(serializeError) : errors)
// Zone-data key under which the currently running Task instance is stored.
const $$task = Symbol('@xen-orchestra/backups/Task')
/**
 * Hierarchical task with structured logging and cancelation.
 *
 * Each task runs its function inside a dedicated node-zone; a Task created
 * while another task's zone is active becomes its child: it forwards its
 * logs to the parent's logger and chains its cancel token to the parent's.
 * Lifecycle events (start/info/warning/end) are emitted through `onLog`.
 */
export class Task {
  // Cancel token of the task active in the current zone, or a token that can
  // never be canceled when called outside of any task.
  static get cancelToken() {
    const task = Zone.current.data[$$task]
    return task !== undefined ? task.#cancelToken : CancelToken.none
  }
  // Create a task and immediately run `fn` in its context; the task is
  // automatically ended (success/failure) when `fn` settles.
  static run(opts, fn) {
    return new this(opts).run(fn, true)
  }
  // Returns a plain function which runs `fn` wrapped in a new task.
  //
  // `opts` may itself be a function, in which case it is called with the
  // wrapper's arguments to compute the task options at call time.
  static wrapFn(opts, fn) {
    // compatibility with @decorateWith
    if (typeof fn !== 'function') {
      ;[fn, opts] = [opts, fn]
    }
    return function () {
      return Task.run(typeof opts === 'function' ? opts.apply(this, arguments) : opts, () => fn.apply(this, arguments))
    }
  }
  #cancelToken
  // random identifier used to correlate log entries belonging to this task
  #id = Math.random().toString(36).slice(2)
  #onLog
  #zone
  constructor({ name, data, onLog }) {
    let parentCancelToken, parentId
    if (onLog === undefined) {
      // no explicit logger: inherit from the enclosing task if any,
      // otherwise discard logs entirely
      const parent = Zone.current.data[$$task]
      if (parent === undefined) {
        onLog = noop
      } else {
        onLog = log => parent.#onLog(log)
        parentCancelToken = parent.#cancelToken
        parentId = parent.#id
      }
    }
    const zone = Zone.current.fork('@xen-orchestra/backups/Task')
    zone.data[$$task] = this
    this.#zone = zone
    // chain to the parent token (if any) so that canceling the parent also
    // cancels this task
    const { cancel, token } = CancelToken.source(parentCancelToken && [parentCancelToken])
    this.#cancelToken = token
    this.cancel = cancel
    this.#onLog = onLog
    this.#log('start', {
      data,
      message: name,
      parentId,
    })
  }
  // End the task with a failure status; `error` is serialized into the log.
  failure(error) {
    this.#end('failure', serializeError(error))
  }
  // Emit an informational log entry attached to this task.
  info(message, data) {
    this.#log('info', { data, message })
  }
  /**
   * Run a function in the context of this task
   *
   * In case of error, the task will be failed.
   *
   * @typedef Result
   * @param {() => Result} fn
   * @param {boolean} last - Whether the task should succeed if there is no error
   * @returns Result
   */
  run(fn, last = false) {
    return this.#zone.run(() => {
      try {
        const result = fn()
        let then
        // thenable detection: handles any promise-like result, not only
        // native Promises
        if (result != null && typeof (then = result.then) === 'function') {
          then.call(result, last && (value => this.success(value)), error => this.failure(error))
        } else if (last) {
          this.success(result)
        }
        return result
      } catch (error) {
        this.failure(error)
        throw error
      }
    })
  }
  // End the task with a success status and an optional result value.
  success(value) {
    this.#end('success', value)
  }
  // Emit a warning log entry attached to this task.
  warning(message, data) {
    this.#log('warning', { data, message })
  }
  // Returns a plain function which runs `fn` in the context of this task.
  wrapFn(fn, last) {
    const task = this
    return function () {
      return task.run(() => fn.apply(this, arguments), last)
    }
  }
  #end(status, result) {
    this.#log('end', { result, status })
    // any log after the end is a programming error: fail loudly
    this.#onLog = logAfterEnd
  }
  #log(event, props) {
    this.#onLog({
      ...props,
      event,
      taskId: this.#id,
      timestamp: Date.now(),
    })
  }
}
// Expose `info` and `warning` as statics: they forward to the task active in
// the current zone and are silently ignored when there is none.
for (const method of ['info', 'warning']) {
  Task[method] = function (...args) {
    const task = Zone.current.data[$$task]
    return task == null ? undefined : task[method](...args)
  }
}

View File

@@ -11,10 +11,10 @@ import { decorateMethodsWith } from '@vates/decorate-with'
import { deduped } from '@vates/disposable/deduped.js' import { deduped } from '@vates/disposable/deduped.js'
import { getHandler } from '@xen-orchestra/fs' import { getHandler } from '@xen-orchestra/fs'
import { parseDuration } from '@vates/parse-duration' import { parseDuration } from '@vates/parse-duration'
import { Task } from '@vates/task'
import { Xapi } from '@xen-orchestra/xapi' import { Xapi } from '@xen-orchestra/xapi'
import { RemoteAdapter } from './RemoteAdapter.mjs' import { RemoteAdapter } from './RemoteAdapter.mjs'
import { Task } from './Task.mjs'
createCachedLookup().patchGlobal() createCachedLookup().patchGlobal()
@@ -154,8 +154,8 @@ process.on('message', async message => {
const result = message.runWithLogs const result = message.runWithLogs
? await Task.run( ? await Task.run(
{ {
name: 'backup run', properties: { name: 'backup run' },
onLog: data => onProgress: data =>
emitMessage({ emitMessage({
data, data,
type: 'log', type: 'log',

View File

@@ -36,32 +36,34 @@ const computeVhdsSize = (handler, vhdPaths) =>
) )
// chain is [ ancestor, child_1, ..., child_n ] // chain is [ ancestor, child_1, ..., child_n ]
async function _mergeVhdChain(handler, chain, { logInfo, remove, mergeBlockConcurrency }) { async function _mergeVhdChain(handler, chain, { logInfo, remove, merge, mergeBlockConcurrency }) {
logInfo(`merging VHD chain`, { chain }) if (merge) {
logInfo(`merging VHD chain`, { chain })
let done, total let done, total
const handle = setInterval(() => { const handle = setInterval(() => {
if (done !== undefined) { if (done !== undefined) {
logInfo('merge in progress', { logInfo('merge in progress', {
done, done,
parent: chain[0], parent: chain[0],
progress: Math.round((100 * done) / total), progress: Math.round((100 * done) / total),
total, total,
})
}
}, 10e3)
try {
return await mergeVhdChain(handler, chain, {
logInfo,
mergeBlockConcurrency,
onProgress({ done: d, total: t }) {
done = d
total = t
},
removeUnused: remove,
}) })
} finally {
clearInterval(handle)
} }
}, 10e3)
try {
return await mergeVhdChain(handler, chain, {
logInfo,
mergeBlockConcurrency,
onProgress({ done: d, total: t }) {
done = d
total = t
},
removeUnused: remove,
})
} finally {
clearInterval(handle)
} }
} }
@@ -469,20 +471,23 @@ export async function cleanVm(
const metadataWithMergedVhd = {} const metadataWithMergedVhd = {}
const doMerge = async () => { const doMerge = async () => {
await asyncMap(toMerge, async chain => { await asyncMap(toMerge, async chain => {
const { finalVhdSize } = await limitedMergeVhdChain(handler, chain, { const merged = await limitedMergeVhdChain(handler, chain, {
logInfo, logInfo,
logWarn, logWarn,
remove, remove,
merge,
mergeBlockConcurrency, mergeBlockConcurrency,
}) })
const metadataPath = vhdsToJSons[chain[chain.length - 1]] // all the chain should have the same metada file if (merged !== undefined) {
metadataWithMergedVhd[metadataPath] = (metadataWithMergedVhd[metadataPath] ?? 0) + finalVhdSize const metadataPath = vhdsToJSons[chain[chain.length - 1]] // all the chain should have the same metada file
metadataWithMergedVhd[metadataPath] = true
}
}) })
} }
await Promise.all([ await Promise.all([
...unusedVhdsDeletion, ...unusedVhdsDeletion,
toMerge.length !== 0 && (merge ? Task.run({ name: 'merge' }, doMerge) : () => Promise.resolve()), toMerge.length !== 0 && (merge ? Task.run({ properties: { name: 'merge' } }, doMerge) : () => Promise.resolve()),
asyncMap(unusedXvas, path => { asyncMap(unusedXvas, path => {
logWarn('unused XVA', { path }) logWarn('unused XVA', { path })
if (remove) { if (remove) {
@@ -504,11 +509,12 @@ export async function cleanVm(
// update size for delta metadata with merged VHD // update size for delta metadata with merged VHD
// check for the other that the size is the same as the real file size // check for the other that the size is the same as the real file size
await asyncMap(jsons, async metadataPath => { await asyncMap(jsons, async metadataPath => {
const metadata = backups.get(metadataPath) const metadata = backups.get(metadataPath)
let fileSystemSize let fileSystemSize
const mergedSize = metadataWithMergedVhd[metadataPath] const merged = metadataWithMergedVhd[metadataPath] !== undefined
const { mode, size, vhds, xva } = metadata const { mode, size, vhds, xva } = metadata
@@ -518,29 +524,26 @@ export async function cleanVm(
const linkedXva = resolve('/', vmDir, xva) const linkedXva = resolve('/', vmDir, xva)
try { try {
fileSystemSize = await handler.getSize(linkedXva) fileSystemSize = await handler.getSize(linkedXva)
if (fileSystemSize !== size && fileSystemSize !== undefined) {
logWarn('cleanVm: incorrect backup size in metadata', {
path: metadataPath,
actual: size ?? 'none',
expected: fileSystemSize,
})
}
} catch (error) { } catch (error) {
// can fail with encrypted remote // can fail with encrypted remote
} }
} else if (mode === 'delta') { } else if (mode === 'delta') {
const linkedVhds = Object.keys(vhds).map(key => resolve('/', vmDir, vhds[key]))
fileSystemSize = await computeVhdsSize(handler, linkedVhds)
// the size is not computed in some cases (e.g. VhdDirectory)
if (fileSystemSize === undefined) {
return
}
// don't warn if the size has changed after a merge // don't warn if the size has changed after a merge
if (mergedSize === undefined) { if (!merged && fileSystemSize !== size) {
const linkedVhds = Object.keys(vhds).map(key => resolve('/', vmDir, vhds[key])) // FIXME: figure out why it occurs so often and, once fixed, log the real problems with `logWarn`
fileSystemSize = await computeVhdsSize(handler, linkedVhds) console.warn('cleanVm: incorrect backup size in metadata', {
// the size is not computed in some cases (e.g. VhdDirectory) path: metadataPath,
if (fileSystemSize !== undefined && fileSystemSize !== size) { actual: size ?? 'none',
logWarn('cleanVm: incorrect backup size in metadata', { expected: fileSystemSize,
path: metadataPath, })
actual: size ?? 'none',
expected: fileSystemSize,
})
}
} }
} }
} catch (error) { } catch (error) {
@@ -548,19 +551,9 @@ export async function cleanVm(
return return
} }
// systematically update size and differentials after a merge // systematically update size after a merge
if ((merged || fixMetadata) && size !== fileSystemSize) {
// @todo : after 2024-04-01 remove the fixmetadata options since the size computation is fixed metadata.size = fileSystemSize
if (mergedSize || (fixMetadata && fileSystemSize !== size)) {
metadata.size = mergedSize ?? fileSystemSize ?? size
if (mergedSize) {
// all disks are now key disk
metadata.isVhdDifferencing = {}
for (const id of Object.values(metadata.vdis ?? {})) {
metadata.isVhdDifferencing[`${id}.vhd`] = false
}
}
mustRegenerateCache = true mustRegenerateCache = true
try { try {
await handler.writeFile(metadataPath, JSON.stringify(metadata), { flags: 'w' }) await handler.writeFile(metadataPath, JSON.stringify(metadata), { flags: 'w' })

View File

@@ -6,9 +6,9 @@ import { CancelToken } from 'promise-toolbox'
import { compareVersions } from 'compare-versions' import { compareVersions } from 'compare-versions'
import { createVhdStreamWithLength } from 'vhd-lib' import { createVhdStreamWithLength } from 'vhd-lib'
import { defer } from 'golike-defer' import { defer } from 'golike-defer'
import { Task } from '@vates/task'
import { cancelableMap } from './_cancelableMap.mjs' import { cancelableMap } from './_cancelableMap.mjs'
import { Task } from './Task.mjs'
import pick from 'lodash/pick.js' import pick from 'lodash/pick.js'
// in `other_config` of an incrementally replicated VM, contains the UUID of the source VM // in `other_config` of an incrementally replicated VM, contains the UUID of the source VM

View File

@@ -1,4 +1,5 @@
import { asyncMap } from '@xen-orchestra/async-map' import { asyncMap } from '@xen-orchestra/async-map'
import { Task } from '@vates/task'
import Disposable from 'promise-toolbox/Disposable' import Disposable from 'promise-toolbox/Disposable'
import ignoreErrors from 'promise-toolbox/ignoreErrors' import ignoreErrors from 'promise-toolbox/ignoreErrors'
@@ -6,7 +7,6 @@ import { extractIdsFromSimplePattern } from '../extractIdsFromSimplePattern.mjs'
import { PoolMetadataBackup } from './_PoolMetadataBackup.mjs' import { PoolMetadataBackup } from './_PoolMetadataBackup.mjs'
import { XoMetadataBackup } from './_XoMetadataBackup.mjs' import { XoMetadataBackup } from './_XoMetadataBackup.mjs'
import { DEFAULT_SETTINGS, Abstract } from './_Abstract.mjs' import { DEFAULT_SETTINGS, Abstract } from './_Abstract.mjs'
import { runTask } from './_runTask.mjs'
import { getAdaptersByRemote } from './_getAdaptersByRemote.mjs' import { getAdaptersByRemote } from './_getAdaptersByRemote.mjs'
const DEFAULT_METADATA_SETTINGS = { const DEFAULT_METADATA_SETTINGS = {
@@ -14,6 +14,8 @@ const DEFAULT_METADATA_SETTINGS = {
retentionXoMetadata: 0, retentionXoMetadata: 0,
} }
const noop = Function.prototype
export const Metadata = class MetadataBackupRunner extends Abstract { export const Metadata = class MetadataBackupRunner extends Abstract {
_computeBaseSettings(config, job) { _computeBaseSettings(config, job) {
const baseSettings = { ...DEFAULT_SETTINGS } const baseSettings = { ...DEFAULT_SETTINGS }
@@ -55,13 +57,16 @@ export const Metadata = class MetadataBackupRunner extends Abstract {
poolIds.map(id => poolIds.map(id =>
this._getRecord('pool', id).catch(error => { this._getRecord('pool', id).catch(error => {
// See https://github.com/vatesfr/xen-orchestra/commit/6aa6cfba8ec939c0288f0fa740f6dfad98c43cbb // See https://github.com/vatesfr/xen-orchestra/commit/6aa6cfba8ec939c0288f0fa740f6dfad98c43cbb
runTask( new Task(
{ {
name: 'get pool record', properties: {
data: { type: 'pool', id }, id,
name: 'get pool record',
type: 'pool',
},
}, },
() => Promise.reject(error) () => Promise.reject(error)
) ).catch(noop)
}) })
) )
), ),
@@ -81,11 +86,11 @@ export const Metadata = class MetadataBackupRunner extends Abstract {
if (pools.length !== 0 && settings.retentionPoolMetadata !== 0) { if (pools.length !== 0 && settings.retentionPoolMetadata !== 0) {
promises.push( promises.push(
asyncMap(pools, async pool => asyncMap(pools, async pool =>
runTask( new Task(
{ {
name: `Starting metadata backup for the pool (${pool.$id}). (${job.id})`, properties: {
data: {
id: pool.$id, id: pool.$id,
name: `Starting metadata backup for the pool (${pool.$id}). (${job.id})`,
pool, pool,
poolMaster: await ignoreErrors.call(pool.$xapi.getRecord('host', pool.master)), poolMaster: await ignoreErrors.call(pool.$xapi.getRecord('host', pool.master)),
type: 'pool', type: 'pool',
@@ -100,17 +105,17 @@ export const Metadata = class MetadataBackupRunner extends Abstract {
schedule, schedule,
settings, settings,
}).run() }).run()
) ).catch(noop)
) )
) )
} }
if (job.xoMetadata !== undefined && settings.retentionXoMetadata !== 0) { if (job.xoMetadata !== undefined && settings.retentionXoMetadata !== 0) {
promises.push( promises.push(
runTask( new Task(
{ {
name: `Starting XO metadata backup. (${job.id})`, properties: {
data: { name: `Starting XO metadata backup. (${job.id})`,
type: 'xo', type: 'xo',
}, },
}, },
@@ -122,7 +127,7 @@ export const Metadata = class MetadataBackupRunner extends Abstract {
schedule, schedule,
settings, settings,
}).run() }).run()
) ).catch(noop)
) )
} }
await Promise.all(promises) await Promise.all(promises)

View File

@@ -1,12 +1,11 @@
import { asyncMapSettled } from '@xen-orchestra/async-map' import { asyncMapSettled } from '@xen-orchestra/async-map'
import Disposable from 'promise-toolbox/Disposable' import Disposable from 'promise-toolbox/Disposable'
import { limitConcurrency } from 'limit-concurrency-decorator' import { limitConcurrency } from 'limit-concurrency-decorator'
import { Task } from '@vates/task'
import { extractIdsFromSimplePattern } from '../extractIdsFromSimplePattern.mjs' import { extractIdsFromSimplePattern } from '../extractIdsFromSimplePattern.mjs'
import { Task } from '../Task.mjs'
import createStreamThrottle from './_createStreamThrottle.mjs' import createStreamThrottle from './_createStreamThrottle.mjs'
import { DEFAULT_SETTINGS, Abstract } from './_Abstract.mjs' import { DEFAULT_SETTINGS, Abstract } from './_Abstract.mjs'
import { runTask } from './_runTask.mjs'
import { getAdaptersByRemote } from './_getAdaptersByRemote.mjs' import { getAdaptersByRemote } from './_getAdaptersByRemote.mjs'
import { FullRemote } from './_vmRunners/FullRemote.mjs' import { FullRemote } from './_vmRunners/FullRemote.mjs'
import { IncrementalRemote } from './_vmRunners/IncrementalRemote.mjs' import { IncrementalRemote } from './_vmRunners/IncrementalRemote.mjs'
@@ -25,6 +24,8 @@ const DEFAULT_REMOTE_VM_SETTINGS = {
vmTimeout: 0, vmTimeout: 0,
} }
const noop = Function.prototype
export const VmsRemote = class RemoteVmsBackupRunner extends Abstract { export const VmsRemote = class RemoteVmsBackupRunner extends Abstract {
_computeBaseSettings(config, job) { _computeBaseSettings(config, job) {
const baseSettings = { ...DEFAULT_SETTINGS } const baseSettings = { ...DEFAULT_SETTINGS }
@@ -63,7 +64,13 @@ export const VmsRemote = class RemoteVmsBackupRunner extends Abstract {
const baseSettings = this._baseSettings const baseSettings = this._baseSettings
const handleVm = vmUuid => { const handleVm = vmUuid => {
const taskStart = { name: 'backup VM', data: { type: 'VM', id: vmUuid } } const taskStart = {
properties: {
id: vmUuid,
name: 'backup VM',
type: 'VM',
},
}
const opts = { const opts = {
baseSettings, baseSettings,
@@ -86,7 +93,7 @@ export const VmsRemote = class RemoteVmsBackupRunner extends Abstract {
throw new Error(`Job mode ${job.mode} not implemented for mirror backup`) throw new Error(`Job mode ${job.mode} not implemented for mirror backup`)
} }
return runTask(taskStart, () => vmBackup.run()) return new Task(taskStart, () => vmBackup.run()).catch(noop)
} }
const { concurrency } = settings const { concurrency } = settings
await asyncMapSettled(vmsUuids, !concurrency ? handleVm : limitConcurrency(concurrency)(handleVm)) await asyncMapSettled(vmsUuids, !concurrency ? handleVm : limitConcurrency(concurrency)(handleVm))

View File

@@ -1,12 +1,11 @@
import { asyncMapSettled } from '@xen-orchestra/async-map' import { asyncMapSettled } from '@xen-orchestra/async-map'
import Disposable from 'promise-toolbox/Disposable' import Disposable from 'promise-toolbox/Disposable'
import { limitConcurrency } from 'limit-concurrency-decorator' import { limitConcurrency } from 'limit-concurrency-decorator'
import { Task } from '@vates/task'
import { extractIdsFromSimplePattern } from '../extractIdsFromSimplePattern.mjs' import { extractIdsFromSimplePattern } from '../extractIdsFromSimplePattern.mjs'
import { Task } from '../Task.mjs'
import createStreamThrottle from './_createStreamThrottle.mjs' import createStreamThrottle from './_createStreamThrottle.mjs'
import { DEFAULT_SETTINGS, Abstract } from './_Abstract.mjs' import { DEFAULT_SETTINGS, Abstract } from './_Abstract.mjs'
import { runTask } from './_runTask.mjs'
import { getAdaptersByRemote } from './_getAdaptersByRemote.mjs' import { getAdaptersByRemote } from './_getAdaptersByRemote.mjs'
import { IncrementalXapi } from './_vmRunners/IncrementalXapi.mjs' import { IncrementalXapi } from './_vmRunners/IncrementalXapi.mjs'
import { FullXapi } from './_vmRunners/FullXapi.mjs' import { FullXapi } from './_vmRunners/FullXapi.mjs'
@@ -34,6 +33,8 @@ const DEFAULT_XAPI_VM_SETTINGS = {
vmTimeout: 0, vmTimeout: 0,
} }
const noop = Function.prototype
export const VmsXapi = class VmsXapiBackupRunner extends Abstract { export const VmsXapi = class VmsXapiBackupRunner extends Abstract {
_computeBaseSettings(config, job) { _computeBaseSettings(config, job) {
const baseSettings = { ...DEFAULT_SETTINGS } const baseSettings = { ...DEFAULT_SETTINGS }
@@ -57,13 +58,16 @@ export const VmsXapi = class VmsXapiBackupRunner extends Abstract {
Disposable.all( Disposable.all(
extractIdsFromSimplePattern(job.srs).map(id => extractIdsFromSimplePattern(job.srs).map(id =>
this._getRecord('SR', id).catch(error => { this._getRecord('SR', id).catch(error => {
runTask( new Task(
{ {
name: 'get SR record', properties: {
data: { type: 'SR', id }, id,
name: 'get SR record',
type: 'SR',
},
}, },
() => Promise.reject(error) () => Promise.reject(error)
) ).catch(noop)
}) })
) )
), ),
@@ -90,13 +94,19 @@ export const VmsXapi = class VmsXapiBackupRunner extends Abstract {
const baseSettings = this._baseSettings const baseSettings = this._baseSettings
const handleVm = vmUuid => { const handleVm = vmUuid => {
const taskStart = { name: 'backup VM', data: { type: 'VM', id: vmUuid } } const taskStart = {
properties: {
id: vmUuid,
name: 'backup VM',
type: 'VM',
},
}
return this._getRecord('VM', vmUuid).then( return this._getRecord('VM', vmUuid).then(
disposableVm => disposableVm =>
Disposable.use(disposableVm, vm => { Disposable.use(disposableVm, vm => {
taskStart.data.name_label = vm.name_label taskStart.data.name_label = vm.name_label
return runTask(taskStart, () => { return new Task()(taskStart, () => {
const opts = { const opts = {
baseSettings, baseSettings,
config, config,
@@ -121,12 +131,12 @@ export const VmsXapi = class VmsXapiBackupRunner extends Abstract {
} }
} }
return vmBackup.run() return vmBackup.run()
}) }).catch(noop)
}), }),
error => error =>
runTask(taskStart, () => { new Task(taskStart, () => {
throw error throw error
}) }).catch(noop)
) )
} }
const { concurrency } = settings const { concurrency } = settings

View File

@@ -1,9 +1,12 @@
import Disposable from 'promise-toolbox/Disposable' import Disposable from 'promise-toolbox/Disposable'
import pTimeout from 'promise-toolbox/timeout' import pTimeout from 'promise-toolbox/timeout'
import { compileTemplate } from '@xen-orchestra/template' import { compileTemplate } from '@xen-orchestra/template'
import { runTask } from './_runTask.mjs' import { Task } from '@vates/task'
import { RemoteTimeoutError } from './_RemoteTimeoutError.mjs' import { RemoteTimeoutError } from './_RemoteTimeoutError.mjs'
const noop = Function.prototype
export const DEFAULT_SETTINGS = { export const DEFAULT_SETTINGS = {
getRemoteTimeout: 300e3, getRemoteTimeout: 300e3,
reportWhen: 'failure', reportWhen: 'failure',
@@ -36,13 +39,16 @@ export const Abstract = class AbstractRunner {
}) })
} catch (error) { } catch (error) {
// See https://github.com/vatesfr/xen-orchestra/commit/6aa6cfba8ec939c0288f0fa740f6dfad98c43cbb // See https://github.com/vatesfr/xen-orchestra/commit/6aa6cfba8ec939c0288f0fa740f6dfad98c43cbb
runTask( Task.run(
{ {
name: 'get remote adapter', properties: {
data: { type: 'remote', id: remoteId }, id: remoteId,
name: 'get remote adapter',
type: 'remote',
},
}, },
() => Promise.reject(error) () => Promise.reject(error)
) ).catch(noop)
} }
} }
} }

View File

@@ -1,9 +1,9 @@
import { asyncMap } from '@xen-orchestra/async-map' import { asyncMap } from '@xen-orchestra/async-map'
import { Task } from '@vates/task'
import { DIR_XO_POOL_METADATA_BACKUPS } from '../RemoteAdapter.mjs' import { DIR_XO_POOL_METADATA_BACKUPS } from '../RemoteAdapter.mjs'
import { forkStreamUnpipe } from './_forkStreamUnpipe.mjs' import { forkStreamUnpipe } from './_forkStreamUnpipe.mjs'
import { formatFilenameDate } from '../_filenameDate.mjs' import { formatFilenameDate } from '../_filenameDate.mjs'
import { Task } from '../Task.mjs'
export const PATH_DB_DUMP = '/pool/xmldbdump' export const PATH_DB_DUMP = '/pool/xmldbdump'
@@ -54,8 +54,8 @@ export class PoolMetadataBackup {
([remoteId, adapter]) => ([remoteId, adapter]) =>
Task.run( Task.run(
{ {
name: `Starting metadata backup for the pool (${pool.$id}) for the remote (${remoteId}). (${job.id})`, properties: {
data: { name: `Starting metadata backup for the pool (${pool.$id}) for the remote (${remoteId}). (${job.id})`,
id: remoteId, id: remoteId,
type: 'remote', type: 'remote',
}, },

View File

@@ -1,9 +1,9 @@
import { asyncMap } from '@xen-orchestra/async-map' import { asyncMap } from '@xen-orchestra/async-map'
import { join } from '@xen-orchestra/fs/path' import { join } from '@xen-orchestra/fs/path'
import { Task } from '@vates/task'
import { DIR_XO_CONFIG_BACKUPS } from '../RemoteAdapter.mjs' import { DIR_XO_CONFIG_BACKUPS } from '../RemoteAdapter.mjs'
import { formatFilenameDate } from '../_filenameDate.mjs' import { formatFilenameDate } from '../_filenameDate.mjs'
import { Task } from '../Task.mjs'
export class XoMetadataBackup { export class XoMetadataBackup {
constructor({ config, job, remoteAdapters, schedule, settings }) { constructor({ config, job, remoteAdapters, schedule, settings }) {
@@ -51,8 +51,8 @@ export class XoMetadataBackup {
([remoteId, adapter]) => ([remoteId, adapter]) =>
Task.run( Task.run(
{ {
name: `Starting XO metadata backup for the remote (${remoteId}). (${job.id})`, properties: {
data: { name: `Starting XO metadata backup for the remote (${remoteId}). (${job.id})`,
id: remoteId, id: remoteId,
type: 'remote', type: 'remote',
}, },

View File

@@ -1,5 +0,0 @@
import { Task } from '../Task.mjs'
const noop = Function.prototype
// Fire-and-forget wrapper around Task.run: failures are already recorded in
// the task logs, so the rejection is intentionally discarded.
export const runTask = function (...args) {
  return Task.run(...args).catch(noop)
}

View File

@@ -1,10 +1,11 @@
import { decorateMethodsWith } from '@vates/decorate-with' import { decorateMethodsWith } from '@vates/decorate-with'
import { defer } from 'golike-defer' import { defer } from 'golike-defer'
import { Task } from '@vates/task'
import { AbstractRemote } from './_AbstractRemote.mjs' import { AbstractRemote } from './_AbstractRemote.mjs'
import { FullRemoteWriter } from '../_writers/FullRemoteWriter.mjs' import { FullRemoteWriter } from '../_writers/FullRemoteWriter.mjs'
import { forkStreamUnpipe } from '../_forkStreamUnpipe.mjs' import { forkStreamUnpipe } from '../_forkStreamUnpipe.mjs'
import { watchStreamSize } from '../../_watchStreamSize.mjs' import { watchStreamSize } from '../../_watchStreamSize.mjs'
import { Task } from '../../Task.mjs'
export const FullRemote = class FullRemoteVmBackupRunner extends AbstractRemote { export const FullRemote = class FullRemoteVmBackupRunner extends AbstractRemote {
_getRemoteWriter() { _getRemoteWriter() {

View File

@@ -1,6 +1,7 @@
import { asyncEach } from '@vates/async-each' import { asyncEach } from '@vates/async-each'
import { decorateMethodsWith } from '@vates/decorate-with' import { decorateMethodsWith } from '@vates/decorate-with'
import { defer } from 'golike-defer' import { defer } from 'golike-defer'
import { Task } from '@vates/task'
import assert from 'node:assert' import assert from 'node:assert'
import isVhdDifferencingDisk from 'vhd-lib/isVhdDifferencingDisk.js' import isVhdDifferencingDisk from 'vhd-lib/isVhdDifferencingDisk.js'
import mapValues from 'lodash/mapValues.js' import mapValues from 'lodash/mapValues.js'
@@ -8,7 +9,6 @@ import mapValues from 'lodash/mapValues.js'
import { AbstractRemote } from './_AbstractRemote.mjs' import { AbstractRemote } from './_AbstractRemote.mjs'
import { forkDeltaExport } from './_forkDeltaExport.mjs' import { forkDeltaExport } from './_forkDeltaExport.mjs'
import { IncrementalRemoteWriter } from '../_writers/IncrementalRemoteWriter.mjs' import { IncrementalRemoteWriter } from '../_writers/IncrementalRemoteWriter.mjs'
import { Task } from '../../Task.mjs'
class IncrementalRemoteVmBackupRunner extends AbstractRemote { class IncrementalRemoteVmBackupRunner extends AbstractRemote {
_getRemoteWriter() { _getRemoteWriter() {

View File

@@ -2,6 +2,7 @@ import { asyncEach } from '@vates/async-each'
import { asyncMap } from '@xen-orchestra/async-map' import { asyncMap } from '@xen-orchestra/async-map'
import { createLogger } from '@xen-orchestra/log' import { createLogger } from '@xen-orchestra/log'
import { pipeline } from 'node:stream' import { pipeline } from 'node:stream'
import { Task } from '@vates/task'
import findLast from 'lodash/findLast.js' import findLast from 'lodash/findLast.js'
import isVhdDifferencingDisk from 'vhd-lib/isVhdDifferencingDisk.js' import isVhdDifferencingDisk from 'vhd-lib/isVhdDifferencingDisk.js'
import keyBy from 'lodash/keyBy.js' import keyBy from 'lodash/keyBy.js'
@@ -13,7 +14,6 @@ import { exportIncrementalVm } from '../../_incrementalVm.mjs'
import { forkDeltaExport } from './_forkDeltaExport.mjs' import { forkDeltaExport } from './_forkDeltaExport.mjs'
import { IncrementalRemoteWriter } from '../_writers/IncrementalRemoteWriter.mjs' import { IncrementalRemoteWriter } from '../_writers/IncrementalRemoteWriter.mjs'
import { IncrementalXapiWriter } from '../_writers/IncrementalXapiWriter.mjs' import { IncrementalXapiWriter } from '../_writers/IncrementalXapiWriter.mjs'
import { Task } from '../../Task.mjs'
import { watchStreamSize } from '../../_watchStreamSize.mjs' import { watchStreamSize } from '../../_watchStreamSize.mjs'
const { debug } = createLogger('xo:backups:IncrementalXapiVmBackup') const { debug } = createLogger('xo:backups:IncrementalXapiVmBackup')

View File

@@ -1,6 +1,6 @@
import { asyncMap } from '@xen-orchestra/async-map' import { asyncMap } from '@xen-orchestra/async-map'
import { createLogger } from '@xen-orchestra/log' import { createLogger } from '@xen-orchestra/log'
import { Task } from '../../Task.mjs' import { Task } from '@vates/task'
const { debug, warn } = createLogger('xo:backups:AbstractVmRunner') const { debug, warn } = createLogger('xo:backups:AbstractVmRunner')
@@ -80,7 +80,7 @@ export const Abstract = class AbstractVmBackupRunner {
// create a task to have an info in the logs and reports // create a task to have an info in the logs and reports
return Task.run( return Task.run(
{ {
name: 'health check', properties: { name: 'health check' },
}, },
() => { () => {
Task.info(`This VM doesn't match the health check's tags for this schedule`) Task.info(`This VM doesn't match the health check's tags for this schedule`)

View File

@@ -5,9 +5,9 @@ import { asyncMap } from '@xen-orchestra/async-map'
import { decorateMethodsWith } from '@vates/decorate-with' import { decorateMethodsWith } from '@vates/decorate-with'
import { defer } from 'golike-defer' import { defer } from 'golike-defer'
import { formatDateTime } from '@xen-orchestra/xapi' import { formatDateTime } from '@xen-orchestra/xapi'
import { Task } from '@vates/task'
import { getOldEntries } from '../../_getOldEntries.mjs' import { getOldEntries } from '../../_getOldEntries.mjs'
import { Task } from '../../Task.mjs'
import { Abstract } from './_Abstract.mjs' import { Abstract } from './_Abstract.mjs'
export const AbstractXapi = class AbstractXapiVmBackupRunner extends Abstract { export const AbstractXapi = class AbstractXapiVmBackupRunner extends Abstract {
@@ -142,7 +142,7 @@ export const AbstractXapi = class AbstractXapiVmBackupRunner extends Abstract {
const settings = this._settings const settings = this._settings
if (this._mustDoSnapshot()) { if (this._mustDoSnapshot()) {
await Task.run({ name: 'snapshot' }, async () => { await Task.run({ properties: { name: 'snapshot' } }, async () => {
if (!settings.bypassVdiChainsCheck) { if (!settings.bypassVdiChainsCheck) {
await vm.$assertHealthyVdiChains() await vm.$assertHealthyVdiChains()
} }

View File

@@ -1,6 +1,7 @@
import { Task } from '@vates/task'
import { formatFilenameDate } from '../../_filenameDate.mjs' import { formatFilenameDate } from '../../_filenameDate.mjs'
import { getOldEntries } from '../../_getOldEntries.mjs' import { getOldEntries } from '../../_getOldEntries.mjs'
import { Task } from '../../Task.mjs'
import { MixinRemoteWriter } from './_MixinRemoteWriter.mjs' import { MixinRemoteWriter } from './_MixinRemoteWriter.mjs'
import { AbstractFullWriter } from './_AbstractFullWriter.mjs' import { AbstractFullWriter } from './_AbstractFullWriter.mjs'
@@ -9,10 +10,10 @@ export class FullRemoteWriter extends MixinRemoteWriter(AbstractFullWriter) {
constructor(props) { constructor(props) {
super(props) super(props)
this.run = Task.wrapFn( this.run = Task.wrap(
{ {
name: 'export', properties: {
data: { name: 'export',
id: props.remoteId, id: props.remoteId,
type: 'remote', type: 'remote',
@@ -63,7 +64,7 @@ export class FullRemoteWriter extends MixinRemoteWriter(AbstractFullWriter) {
await deleteOldBackups() await deleteOldBackups()
} }
await Task.run({ name: 'transfer' }, async () => { await Task.run({ properties: { name: 'transfer' } }, async () => {
await adapter.outputStream(dataFilename, stream, { await adapter.outputStream(dataFilename, stream, {
maxStreamLength, maxStreamLength,
streamLength, streamLength,

View File

@@ -1,10 +1,10 @@
import ignoreErrors from 'promise-toolbox/ignoreErrors' import ignoreErrors from 'promise-toolbox/ignoreErrors'
import { asyncMap, asyncMapSettled } from '@xen-orchestra/async-map' import { asyncMap, asyncMapSettled } from '@xen-orchestra/async-map'
import { formatDateTime } from '@xen-orchestra/xapi' import { formatDateTime } from '@xen-orchestra/xapi'
import { Task } from '@vates/task'
import { formatFilenameDate } from '../../_filenameDate.mjs' import { formatFilenameDate } from '../../_filenameDate.mjs'
import { getOldEntries } from '../../_getOldEntries.mjs' import { getOldEntries } from '../../_getOldEntries.mjs'
import { Task } from '../../Task.mjs'
import { AbstractFullWriter } from './_AbstractFullWriter.mjs' import { AbstractFullWriter } from './_AbstractFullWriter.mjs'
import { MixinXapiWriter } from './_MixinXapiWriter.mjs' import { MixinXapiWriter } from './_MixinXapiWriter.mjs'
@@ -14,10 +14,10 @@ export class FullXapiWriter extends MixinXapiWriter(AbstractFullWriter) {
constructor(props) { constructor(props) {
super(props) super(props)
this.run = Task.wrapFn( this.run = Task.wrap(
{ {
name: 'export', properties: {
data: { name: 'export',
id: props.sr.uuid, id: props.sr.uuid,
name_label: this._sr.name_label, name_label: this._sr.name_label,
type: 'SR', type: 'SR',
@@ -52,7 +52,7 @@ export class FullXapiWriter extends MixinXapiWriter(AbstractFullWriter) {
} }
let targetVmRef let targetVmRef
await Task.run({ name: 'transfer' }, async () => { await Task.run({ properties: { name: 'transfer' } }, async () => {
targetVmRef = await xapi.VM_import(stream, sr.$ref, vm => targetVmRef = await xapi.VM_import(stream, sr.$ref, vm =>
Promise.all([ Promise.all([
!_warmMigration && vm.add_tags('Disaster Recovery'), !_warmMigration && vm.add_tags('Disaster Recovery'),

View File

@@ -8,11 +8,11 @@ import { createLogger } from '@xen-orchestra/log'
import { decorateClass } from '@vates/decorate-with' import { decorateClass } from '@vates/decorate-with'
import { defer } from 'golike-defer' import { defer } from 'golike-defer'
import { dirname } from 'node:path' import { dirname } from 'node:path'
import { Task } from '@vates/task'
import { formatFilenameDate } from '../../_filenameDate.mjs' import { formatFilenameDate } from '../../_filenameDate.mjs'
import { getOldEntries } from '../../_getOldEntries.mjs' import { getOldEntries } from '../../_getOldEntries.mjs'
import { TAG_BASE_DELTA } from '../../_incrementalVm.mjs' import { TAG_BASE_DELTA } from '../../_incrementalVm.mjs'
import { Task } from '../../Task.mjs'
import { MixinRemoteWriter } from './_MixinRemoteWriter.mjs' import { MixinRemoteWriter } from './_MixinRemoteWriter.mjs'
import { AbstractIncrementalWriter } from './_AbstractIncrementalWriter.mjs' import { AbstractIncrementalWriter } from './_AbstractIncrementalWriter.mjs'
@@ -71,17 +71,17 @@ export class IncrementalRemoteWriter extends MixinRemoteWriter(AbstractIncrement
prepare({ isFull }) { prepare({ isFull }) {
// create the task related to this export and ensure all methods are called in this context // create the task related to this export and ensure all methods are called in this context
const task = new Task({ const task = new Task({
name: 'export', properties: {
data: { name: 'export',
id: this._remoteId, id: this._remoteId,
isFull, isFull,
type: 'remote', type: 'remote',
}, },
}) })
this.transfer = task.wrapFn(this.transfer) this.transfer = task.wrapInside(this.transfer)
this.healthCheck = task.wrapFn(this.healthCheck) this.healthCheck = task.wrapInside(this.healthCheck)
this.cleanup = task.wrapFn(this.cleanup) this.cleanup = task.wrapInside(this.cleanup)
this.afterBackup = task.wrapFn(this.afterBackup, true) this.afterBackup = task.wrap(this.afterBackup)
return task.run(() => this._prepare()) return task.run(() => this._prepare())
} }
@@ -174,7 +174,7 @@ export class IncrementalRemoteWriter extends MixinRemoteWriter(AbstractIncrement
vm, vm,
vmSnapshot, vmSnapshot,
} }
const { size } = await Task.run({ name: 'transfer' }, async () => { const { size } = await Task.run({ properties: { name: 'transfer' } }, async () => {
let transferSize = 0 let transferSize = 0
await asyncEach( await asyncEach(
Object.entries(deltaExport.vdis), Object.entries(deltaExport.vdis),

View File

@@ -1,11 +1,11 @@
import { asyncMap, asyncMapSettled } from '@xen-orchestra/async-map' import { asyncMap, asyncMapSettled } from '@xen-orchestra/async-map'
import ignoreErrors from 'promise-toolbox/ignoreErrors' import ignoreErrors from 'promise-toolbox/ignoreErrors'
import { formatDateTime } from '@xen-orchestra/xapi' import { formatDateTime } from '@xen-orchestra/xapi'
import { Task } from '@vates/task'
import { formatFilenameDate } from '../../_filenameDate.mjs' import { formatFilenameDate } from '../../_filenameDate.mjs'
import { getOldEntries } from '../../_getOldEntries.mjs' import { getOldEntries } from '../../_getOldEntries.mjs'
import { importIncrementalVm, TAG_BACKUP_SR, TAG_BASE_DELTA, TAG_COPY_SRC } from '../../_incrementalVm.mjs' import { importIncrementalVm, TAG_BACKUP_SR, TAG_BASE_DELTA, TAG_COPY_SRC } from '../../_incrementalVm.mjs'
import { Task } from '../../Task.mjs'
import { AbstractIncrementalWriter } from './_AbstractIncrementalWriter.mjs' import { AbstractIncrementalWriter } from './_AbstractIncrementalWriter.mjs'
import { MixinXapiWriter } from './_MixinXapiWriter.mjs' import { MixinXapiWriter } from './_MixinXapiWriter.mjs'
@@ -40,18 +40,21 @@ export class IncrementalXapiWriter extends MixinXapiWriter(AbstractIncrementalWr
prepare({ isFull }) { prepare({ isFull }) {
// create the task related to this export and ensure all methods are called in this context // create the task related to this export and ensure all methods are called in this context
const task = new Task({ const task = new Task({
name: 'export', properties: {
data: { name: 'export',
id: this._sr.uuid, id: this._sr.uuid,
isFull, isFull,
name_label: this._sr.name_label, name_label: this._sr.name_label,
type: 'SR', type: 'SR',
}, },
}) })
const hasHealthCheckSr = this._healthCheckSr !== undefined this.transfer = task.wrapInside(this.transfer)
this.transfer = task.wrapFn(this.transfer) if (this._healthCheckSr !== undefined) {
this.cleanup = task.wrapFn(this.cleanup, !hasHealthCheckSr) this.cleanup = task.wrapInside(this.cleanup)
this.healthCheck = task.wrapFn(this.healthCheck, hasHealthCheckSr) this.healthCheck = task.wrap(this.healthCheck)
} else {
this.cleanup = task.wrap(this.cleanup)
}
return task.run(() => this._prepare()) return task.run(() => this._prepare())
} }
@@ -139,7 +142,7 @@ export class IncrementalXapiWriter extends MixinXapiWriter(AbstractIncrementalWr
const { uuid: srUuid, $xapi: xapi } = sr const { uuid: srUuid, $xapi: xapi } = sr
let targetVmRef let targetVmRef
await Task.run({ name: 'transfer' }, async () => { await Task.run({ properties: { name: 'transfer' } }, async () => {
targetVmRef = await importIncrementalVm(this.#decorateVmMetadata(deltaExport), sr) targetVmRef = await importIncrementalVm(this.#decorateVmMetadata(deltaExport), sr)
return { return {
size: Object.values(sizeContainers).reduce((sum, { size }) => sum + size, 0), size: Object.values(sizeContainers).reduce((sum, { size }) => sum + size, 0),

View File

@@ -1,12 +1,12 @@
import { createLogger } from '@xen-orchestra/log' import { createLogger } from '@xen-orchestra/log'
import { join } from 'node:path' import { join } from 'node:path'
import { Task } from '@vates/task'
import assert from 'node:assert' import assert from 'node:assert'
import { formatFilenameDate } from '../../_filenameDate.mjs' import { formatFilenameDate } from '../../_filenameDate.mjs'
import { getVmBackupDir } from '../../_getVmBackupDir.mjs' import { getVmBackupDir } from '../../_getVmBackupDir.mjs'
import { HealthCheckVmBackup } from '../../HealthCheckVmBackup.mjs' import { HealthCheckVmBackup } from '../../HealthCheckVmBackup.mjs'
import { ImportVmBackup } from '../../ImportVmBackup.mjs' import { ImportVmBackup } from '../../ImportVmBackup.mjs'
import { Task } from '../../Task.mjs'
import * as MergeWorker from '../../merge-worker/index.mjs' import * as MergeWorker from '../../merge-worker/index.mjs'
const { info, warn } = createLogger('xo:backups:MixinBackupWriter') const { info, warn } = createLogger('xo:backups:MixinBackupWriter')
@@ -26,7 +26,7 @@ export const MixinRemoteWriter = (BaseClass = Object) =>
async _cleanVm(options) { async _cleanVm(options) {
try { try {
return await Task.run({ name: 'clean-vm' }, () => { return await Task.run({ properties: { name: 'clean-vm' } }, () => {
return this._adapter.cleanVm(this._vmBackupDir, { return this._adapter.cleanVm(this._vmBackupDir, {
...options, ...options,
fixMetadata: true, fixMetadata: true,
@@ -84,7 +84,7 @@ export const MixinRemoteWriter = (BaseClass = Object) =>
) )
return Task.run( return Task.run(
{ {
name: 'health check', properties: { name: 'health check' },
}, },
async () => { async () => {
const xapi = sr.$xapi const xapi = sr.$xapi

View File

@@ -1,8 +1,8 @@
import { extractOpaqueRef } from '@xen-orchestra/xapi' import { extractOpaqueRef } from '@xen-orchestra/xapi'
import { Task } from '@vates/task'
import assert from 'node:assert/strict' import assert from 'node:assert/strict'
import { HealthCheckVmBackup } from '../../HealthCheckVmBackup.mjs' import { HealthCheckVmBackup } from '../../HealthCheckVmBackup.mjs'
import { Task } from '../../Task.mjs'
export const MixinXapiWriter = (BaseClass = Object) => export const MixinXapiWriter = (BaseClass = Object) =>
class MixinXapiWriter extends BaseClass { class MixinXapiWriter extends BaseClass {
@@ -32,7 +32,7 @@ export const MixinXapiWriter = (BaseClass = Object) =>
// copy VM // copy VM
return Task.run( return Task.run(
{ {
name: 'health check', properties: { name: 'health check' },
}, },
async () => { async () => {
const { $xapi: xapi } = sr const { $xapi: xapi } = sr
@@ -42,7 +42,7 @@ export const MixinXapiWriter = (BaseClass = Object) =>
if (await this.#isAlreadyOnHealthCheckSr(baseVm)) { if (await this.#isAlreadyOnHealthCheckSr(baseVm)) {
healthCheckVmRef = await Task.run( healthCheckVmRef = await Task.run(
{ name: 'cloning-vm' }, { properties: { name: 'cloning-vm' } },
async () => async () =>
await xapi await xapi
.callAsync('VM.clone', this._targetVmRef, `Health Check - ${baseVm.name_label}`) .callAsync('VM.clone', this._targetVmRef, `Health Check - ${baseVm.name_label}`)
@@ -50,7 +50,7 @@ export const MixinXapiWriter = (BaseClass = Object) =>
) )
} else { } else {
healthCheckVmRef = await Task.run( healthCheckVmRef = await Task.run(
{ name: 'copying-vm' }, { properties: { name: 'copying-vm' } },
async () => async () =>
await xapi await xapi
.callAsync('VM.copy', this._targetVmRef, `Health Check - ${baseVm.name_label}`, sr.$ref) .callAsync('VM.copy', this._targetVmRef, `Health Check - ${baseVm.name_label}`, sr.$ref)

View File

@@ -27,6 +27,7 @@
"@vates/fuse-vhd": "^2.0.0", "@vates/fuse-vhd": "^2.0.0",
"@vates/nbd-client": "^3.0.0", "@vates/nbd-client": "^3.0.0",
"@vates/parse-duration": "^0.1.1", "@vates/parse-duration": "^0.1.1",
"@vates/task": "^0.2.0",
"@xen-orchestra/async-map": "^0.1.2", "@xen-orchestra/async-map": "^0.1.2",
"@xen-orchestra/fs": "^4.1.3", "@xen-orchestra/fs": "^4.1.3",
"@xen-orchestra/log": "^0.6.0", "@xen-orchestra/log": "^0.6.0",

View File

@@ -15,7 +15,7 @@ import { Readable } from 'stream'
import { RemoteAdapter } from '@xen-orchestra/backups/RemoteAdapter.mjs' import { RemoteAdapter } from '@xen-orchestra/backups/RemoteAdapter.mjs'
import { RestoreMetadataBackup } from '@xen-orchestra/backups/RestoreMetadataBackup.mjs' import { RestoreMetadataBackup } from '@xen-orchestra/backups/RestoreMetadataBackup.mjs'
import { runBackupWorker } from '@xen-orchestra/backups/runBackupWorker.mjs' import { runBackupWorker } from '@xen-orchestra/backups/runBackupWorker.mjs'
import { Task } from '@xen-orchestra/backups/Task.mjs' import { Task } from '@vates/task'
import { Xapi } from '@xen-orchestra/xapi' import { Xapi } from '@xen-orchestra/xapi'
const noop = Function.prototype const noop = Function.prototype
@@ -122,15 +122,15 @@ export default class Backups {
try { try {
await Task.run( await Task.run(
{ {
name: 'backup run', properties: {
data: {
jobId: job.id, jobId: job.id,
jobName: job.name, jobName: job.name,
mode: job.mode, mode: job.mode,
name: 'backup run',
reportWhen: job.settings['']?.reportWhen, reportWhen: job.settings['']?.reportWhen,
scheduleId: schedule.id, scheduleId: schedule.id,
}, },
onLog, onProgress: onLog,
}, },
() => run(params) () => run(params)
) )
@@ -205,14 +205,14 @@ export default class Backups {
async (args, onLog) => async (args, onLog) =>
Task.run( Task.run(
{ {
data: { properties: {
backupId, backupId,
jobId: metadata.jobId, jobId: metadata.jobId,
name: 'restore',
srId: srUuid, srId: srUuid,
time: metadata.timestamp, time: metadata.timestamp,
}, },
name: 'restore', onProgress: onLog,
onLog,
}, },
run run
).catch(() => {}), // errors are handled by logs, ).catch(() => {}), // errors are handled by logs,
@@ -344,12 +344,14 @@ export default class Backups {
({ backupId, remote, xapi: xapiOptions }) => ({ backupId, remote, xapi: xapiOptions }) =>
Disposable.use(app.remotes.getHandler(remote), xapiOptions && this.getXapi(xapiOptions), (handler, xapi) => Disposable.use(app.remotes.getHandler(remote), xapiOptions && this.getXapi(xapiOptions), (handler, xapi) =>
runWithLogs( runWithLogs(
async (args, onLog) => async (args, onProgress) =>
Task.run( Task.run(
{ {
name: 'metadataRestore', properties: {
data: JSON.parse(String(await handler.readFile(`${backupId}/metadata.json`))), metadata: JSON.parse(String(await handler.readFile(`${backupId}/metadata.json`))),
onLog, name: 'metadataRestore',
},
onProgress,
}, },
() => () =>
new RestoreMetadataBackup({ new RestoreMetadataBackup({

View File

@@ -31,6 +31,7 @@
"@vates/compose": "^2.1.0", "@vates/compose": "^2.1.0",
"@vates/decorate-with": "^2.0.0", "@vates/decorate-with": "^2.0.0",
"@vates/disposable": "^0.1.5", "@vates/disposable": "^0.1.5",
"@vates/task": "^0.2.0",
"@xen-orchestra/async-map": "^0.1.2", "@xen-orchestra/async-map": "^0.1.2",
"@xen-orchestra/backups": "^0.44.3", "@xen-orchestra/backups": "^0.44.3",
"@xen-orchestra/fs": "^4.1.3", "@xen-orchestra/fs": "^4.1.3",

View File

@@ -40,6 +40,7 @@
"@vates/parse-duration": "^0.1.1", "@vates/parse-duration": "^0.1.1",
"@vates/predicates": "^1.1.0", "@vates/predicates": "^1.1.0",
"@vates/read-chunk": "^1.2.0", "@vates/read-chunk": "^1.2.0",
"@vates/task": "^0.2.0",
"@xen-orchestra/async-map": "^0.1.2", "@xen-orchestra/async-map": "^0.1.2",
"@xen-orchestra/backups": "^0.44.3", "@xen-orchestra/backups": "^0.44.3",
"@xen-orchestra/cron": "^1.0.6", "@xen-orchestra/cron": "^1.0.6",

View File

@@ -5,7 +5,7 @@ import { createLogger } from '@xen-orchestra/log'
import { createRunner } from '@xen-orchestra/backups/Backup.mjs' import { createRunner } from '@xen-orchestra/backups/Backup.mjs'
import { parseMetadataBackupId } from '@xen-orchestra/backups/parseMetadataBackupId.mjs' import { parseMetadataBackupId } from '@xen-orchestra/backups/parseMetadataBackupId.mjs'
import { RestoreMetadataBackup } from '@xen-orchestra/backups/RestoreMetadataBackup.mjs' import { RestoreMetadataBackup } from '@xen-orchestra/backups/RestoreMetadataBackup.mjs'
import { Task } from '@xen-orchestra/backups/Task.mjs' import { Task } from '@vates/task'
import { debounceWithKey, REMOVE_CACHE_ENTRY } from '../_pDebounceWithKey.mjs' import { debounceWithKey, REMOVE_CACHE_ENTRY } from '../_pDebounceWithKey.mjs'
import { handleBackupLog } from '../_handleBackupLog.mjs' import { handleBackupLog } from '../_handleBackupLog.mjs'
@@ -124,8 +124,8 @@ export default class metadataBackup {
const localTaskIds = { __proto__: null } const localTaskIds = { __proto__: null }
return Task.run( return Task.run(
{ {
name: 'backup run', properties: { name: 'backup run' },
onLog: log => onProgress: log =>
handleBackupLog(log, { handleBackupLog(log, {
localTaskIds, localTaskIds,
logger, logger,