Compare commits

...

1 Commit

Author SHA1 Message Date
Julien Fontanet
a4a879ad44 WiP 2024-01-11 09:57:42 +01:00
29 changed files with 191 additions and 320 deletions

View File

@@ -1,4 +1,4 @@
import { Task } from './Task.mjs'
import { Task } from '@vates/task'
export class HealthCheckVmBackup {
#restoredVm
@@ -14,7 +14,7 @@ export class HealthCheckVmBackup {
async run() {
return Task.run(
{
name: 'vmstart',
properties: { name: 'vmstart' },
},
async () => {
let restoredVm = this.#restoredVm

View File

@@ -1,8 +1,8 @@
import assert from 'node:assert'
import { Task } from '@vates/task'
import { formatFilenameDate } from './_filenameDate.mjs'
import { importIncrementalVm } from './_incrementalVm.mjs'
import { Task } from './Task.mjs'
import { watchStreamSize } from './_watchStreamSize.mjs'
import { VhdNegative, VhdSynthetic } from 'vhd-lib'
import { decorateClass } from '@vates/decorate-with'
@@ -240,7 +240,7 @@ export class ImportVmBackup {
return Task.run(
{
name: 'transfer',
properties: { name: 'transfer' },
},
async () => {
const xapi = this._xapi

View File

@@ -1,155 +0,0 @@
import CancelToken from 'promise-toolbox/CancelToken'
import Zone from 'node-zone'
// Installed as the `onLog` handler once a task has ended: any attempt to log
// afterwards is a programming error, so it throws, carrying the offending
// entry on the error's `log` property.
const logAfterEnd = log => {
  throw Object.assign(new Error('task has already ended'), { log })
}
const noop = Function.prototype

// Serialize each entry of an `AggregateError`'s `errors` array; any non-array
// value is passed through untouched.
const serializeErrors = errors => (Array.isArray(errors) ? errors.map(serializeError) : errors)

// Create a serializable object from an error.
//
// Otherwise some fields might be non-enumerable and missing from logs.
const serializeError = error => {
  if (!(error instanceof Error)) {
    // non-Error values (strings, plain objects…) are already serializable
    return error
  }
  return {
    ...error, // Copy enumerable properties.
    code: error.code,
    errors: serializeErrors(error.errors), // supports AggregateError
    message: error.message,
    name: error.name,
    stack: error.stack,
  }
}
// Zone-local key under which the current Task is stored; a Symbol avoids
// collisions with other zone data.
const $$task = Symbol('@xen-orchestra/backups/Task')
// Records the lifecycle of a unit of work as structured log entries
// (start/info/warning/end) forwarded to an `onLog` callback.
//
// Nesting is implicit: a Task created while another task's zone is active
// inherits that parent's logger, id and cancel token through zone-local data
// (node-zone).
export class Task {
// Cancel token of the task bound to the current zone, or `CancelToken.none`
// when called outside of any task.
static get cancelToken() {
const task = Zone.current.data[$$task]
return task !== undefined ? task.#cancelToken : CancelToken.none
}
// Create a task from `opts` and immediately run `fn` in its context; the
// task is ended (success/failure) when `fn` settles.
static run(opts, fn) {
return new this(opts).run(fn, true)
}
// Wrap `fn` so that every call runs inside a fresh task; `opts` may itself
// be a function, evaluated per call with the same `this` and arguments.
static wrapFn(opts, fn) {
// compatibility with @decorateWith
if (typeof fn !== 'function') {
;[fn, opts] = [opts, fn]
}
return function () {
return Task.run(typeof opts === 'function' ? opts.apply(this, arguments) : opts, () => fn.apply(this, arguments))
}
}
#cancelToken
// Random base36 identifier used to correlate this task's log entries.
#id = Math.random().toString(36).slice(2)
#onLog
// Forked zone carrying this task as zone-local data.
#zone
// `name` becomes the `message` of the `start` log entry and `data` is
// attached to it. When `onLog` is omitted, entries are forwarded to the
// enclosing task if any (also inheriting its cancel token and id as parent),
// otherwise dropped.
constructor({ name, data, onLog }) {
let parentCancelToken, parentId
if (onLog === undefined) {
const parent = Zone.current.data[$$task]
if (parent === undefined) {
onLog = noop
} else {
onLog = log => parent.#onLog(log)
parentCancelToken = parent.#cancelToken
parentId = parent.#id
}
}
const zone = Zone.current.fork('@xen-orchestra/backups/Task')
zone.data[$$task] = this
this.#zone = zone
// this task's token cancels either via `this.cancel` or when the parent
// token (if any) is canceled
const { cancel, token } = CancelToken.source(parentCancelToken && [parentCancelToken])
this.#cancelToken = token
this.cancel = cancel
this.#onLog = onLog
this.#log('start', {
data,
message: name,
parentId,
})
}
// End the task with a serialized form of `error` as result.
failure(error) {
this.#end('failure', serializeError(error))
}
// Emit an `info` log entry for this task.
info(message, data) {
this.#log('info', { data, message })
}
/**
* Run a function in the context of this task
*
* In case of error, the task will be failed.
*
* @typedef Result
* @param {() => Result} fn
* @param {boolean} last - Whether the task should succeed if there is no error
* @returns Result
*/
run(fn, last = false) {
return this.#zone.run(() => {
try {
const result = fn()
let then
// duck-typed thenable detection: end the task when the promise
// settles instead of synchronously
if (result != null && typeof (then = result.then) === 'function') {
then.call(result, last && (value => this.success(value)), error => this.failure(error))
} else if (last) {
this.success(result)
}
return result
} catch (error) {
this.failure(error)
throw error
}
})
}
// End the task successfully with `value` as result.
success(value) {
this.#end('success', value)
}
// Emit a `warning` log entry for this task.
warning(message, data) {
this.#log('warning', { data, message })
}
// Bind `fn` to this task's context; if `last` is true, the task ends when
// the wrapped call settles.
wrapFn(fn, last) {
const task = this
return function () {
return task.run(() => fn.apply(this, arguments), last)
}
}
// Log the `end` entry, then disable further logging: any later log throws
// via `logAfterEnd`.
#end(status, result) {
this.#log('end', { result, status })
this.#onLog = logAfterEnd
}
// Forward a log entry to `onLog`, stamped with this task's id and the
// current timestamp.
#log(event, props) {
this.#onLog({
...props,
event,
taskId: this.#id,
timestamp: Date.now(),
})
}
}
// Static conveniences: `Task.info(…)` / `Task.warning(…)` apply to the task
// bound to the current zone and are no-ops outside of any task.
for (const method of ['info', 'warning']) {
Task[method] = (...args) => Zone.current.data[$$task]?.[method](...args)
}

View File

@@ -11,10 +11,10 @@ import { decorateMethodsWith } from '@vates/decorate-with'
import { deduped } from '@vates/disposable/deduped.js'
import { getHandler } from '@xen-orchestra/fs'
import { parseDuration } from '@vates/parse-duration'
import { Task } from '@vates/task'
import { Xapi } from '@xen-orchestra/xapi'
import { RemoteAdapter } from './RemoteAdapter.mjs'
import { Task } from './Task.mjs'
createCachedLookup().patchGlobal()
@@ -154,8 +154,8 @@ process.on('message', async message => {
const result = message.runWithLogs
? await Task.run(
{
name: 'backup run',
onLog: data =>
properties: { name: 'backup run' },
onProgress: data =>
emitMessage({
data,
type: 'log',

View File

@@ -36,32 +36,34 @@ const computeVhdsSize = (handler, vhdPaths) =>
)
// chain is [ ancestor, child_1, ..., child_n ]
async function _mergeVhdChain(handler, chain, { logInfo, remove, mergeBlockConcurrency }) {
logInfo(`merging VHD chain`, { chain })
async function _mergeVhdChain(handler, chain, { logInfo, remove, merge, mergeBlockConcurrency }) {
if (merge) {
logInfo(`merging VHD chain`, { chain })
let done, total
const handle = setInterval(() => {
if (done !== undefined) {
logInfo('merge in progress', {
done,
parent: chain[0],
progress: Math.round((100 * done) / total),
total,
let done, total
const handle = setInterval(() => {
if (done !== undefined) {
logInfo('merge in progress', {
done,
parent: chain[0],
progress: Math.round((100 * done) / total),
total,
})
}
}, 10e3)
try {
return await mergeVhdChain(handler, chain, {
logInfo,
mergeBlockConcurrency,
onProgress({ done: d, total: t }) {
done = d
total = t
},
removeUnused: remove,
})
} finally {
clearInterval(handle)
}
}, 10e3)
try {
return await mergeVhdChain(handler, chain, {
logInfo,
mergeBlockConcurrency,
onProgress({ done: d, total: t }) {
done = d
total = t
},
removeUnused: remove,
})
} finally {
clearInterval(handle)
}
}
@@ -469,20 +471,23 @@ export async function cleanVm(
const metadataWithMergedVhd = {}
const doMerge = async () => {
await asyncMap(toMerge, async chain => {
const { finalVhdSize } = await limitedMergeVhdChain(handler, chain, {
const merged = await limitedMergeVhdChain(handler, chain, {
logInfo,
logWarn,
remove,
merge,
mergeBlockConcurrency,
})
const metadataPath = vhdsToJSons[chain[chain.length - 1]] // all the chain should have the same metada file
metadataWithMergedVhd[metadataPath] = (metadataWithMergedVhd[metadataPath] ?? 0) + finalVhdSize
if (merged !== undefined) {
const metadataPath = vhdsToJSons[chain[chain.length - 1]] // all the chain should have the same metada file
metadataWithMergedVhd[metadataPath] = true
}
})
}
await Promise.all([
...unusedVhdsDeletion,
toMerge.length !== 0 && (merge ? Task.run({ name: 'merge' }, doMerge) : () => Promise.resolve()),
toMerge.length !== 0 && (merge ? Task.run({ properties: { name: 'merge' } }, doMerge) : () => Promise.resolve()),
asyncMap(unusedXvas, path => {
logWarn('unused XVA', { path })
if (remove) {
@@ -504,11 +509,12 @@ export async function cleanVm(
// update size for delta metadata with merged VHD
// check for the other that the size is the same as the real file size
await asyncMap(jsons, async metadataPath => {
const metadata = backups.get(metadataPath)
let fileSystemSize
const mergedSize = metadataWithMergedVhd[metadataPath]
const merged = metadataWithMergedVhd[metadataPath] !== undefined
const { mode, size, vhds, xva } = metadata
@@ -518,29 +524,26 @@ export async function cleanVm(
const linkedXva = resolve('/', vmDir, xva)
try {
fileSystemSize = await handler.getSize(linkedXva)
if (fileSystemSize !== size && fileSystemSize !== undefined) {
logWarn('cleanVm: incorrect backup size in metadata', {
path: metadataPath,
actual: size ?? 'none',
expected: fileSystemSize,
})
}
} catch (error) {
// can fail with encrypted remote
}
} else if (mode === 'delta') {
const linkedVhds = Object.keys(vhds).map(key => resolve('/', vmDir, vhds[key]))
fileSystemSize = await computeVhdsSize(handler, linkedVhds)
// the size is not computed in some cases (e.g. VhdDirectory)
if (fileSystemSize === undefined) {
return
}
// don't warn if the size has changed after a merge
if (mergedSize === undefined) {
const linkedVhds = Object.keys(vhds).map(key => resolve('/', vmDir, vhds[key]))
fileSystemSize = await computeVhdsSize(handler, linkedVhds)
// the size is not computed in some cases (e.g. VhdDirectory)
if (fileSystemSize !== undefined && fileSystemSize !== size) {
logWarn('cleanVm: incorrect backup size in metadata', {
path: metadataPath,
actual: size ?? 'none',
expected: fileSystemSize,
})
}
if (!merged && fileSystemSize !== size) {
// FIXME: figure out why it occurs so often and, once fixed, log the real problems with `logWarn`
console.warn('cleanVm: incorrect backup size in metadata', {
path: metadataPath,
actual: size ?? 'none',
expected: fileSystemSize,
})
}
}
} catch (error) {
@@ -548,19 +551,9 @@ export async function cleanVm(
return
}
// systematically update size and differentials after a merge
// @todo : after 2024-04-01 remove the fixmetadata options since the size computation is fixed
if (mergedSize || (fixMetadata && fileSystemSize !== size)) {
metadata.size = mergedSize ?? fileSystemSize ?? size
if (mergedSize) {
// all disks are now key disk
metadata.isVhdDifferencing = {}
for (const id of Object.values(metadata.vdis ?? {})) {
metadata.isVhdDifferencing[`${id}.vhd`] = false
}
}
// systematically update size after a merge
if ((merged || fixMetadata) && size !== fileSystemSize) {
metadata.size = fileSystemSize
mustRegenerateCache = true
try {
await handler.writeFile(metadataPath, JSON.stringify(metadata), { flags: 'w' })

View File

@@ -6,9 +6,9 @@ import { CancelToken } from 'promise-toolbox'
import { compareVersions } from 'compare-versions'
import { createVhdStreamWithLength } from 'vhd-lib'
import { defer } from 'golike-defer'
import { Task } from '@vates/task'
import { cancelableMap } from './_cancelableMap.mjs'
import { Task } from './Task.mjs'
import pick from 'lodash/pick.js'
// in `other_config` of an incrementally replicated VM, contains the UUID of the source VM

View File

@@ -1,4 +1,5 @@
import { asyncMap } from '@xen-orchestra/async-map'
import { Task } from '@vates/task'
import Disposable from 'promise-toolbox/Disposable'
import ignoreErrors from 'promise-toolbox/ignoreErrors'
@@ -6,7 +7,6 @@ import { extractIdsFromSimplePattern } from '../extractIdsFromSimplePattern.mjs'
import { PoolMetadataBackup } from './_PoolMetadataBackup.mjs'
import { XoMetadataBackup } from './_XoMetadataBackup.mjs'
import { DEFAULT_SETTINGS, Abstract } from './_Abstract.mjs'
import { runTask } from './_runTask.mjs'
import { getAdaptersByRemote } from './_getAdaptersByRemote.mjs'
const DEFAULT_METADATA_SETTINGS = {
@@ -14,6 +14,8 @@ const DEFAULT_METADATA_SETTINGS = {
retentionXoMetadata: 0,
}
const noop = Function.prototype
export const Metadata = class MetadataBackupRunner extends Abstract {
_computeBaseSettings(config, job) {
const baseSettings = { ...DEFAULT_SETTINGS }
@@ -55,13 +57,16 @@ export const Metadata = class MetadataBackupRunner extends Abstract {
poolIds.map(id =>
this._getRecord('pool', id).catch(error => {
// See https://github.com/vatesfr/xen-orchestra/commit/6aa6cfba8ec939c0288f0fa740f6dfad98c43cbb
runTask(
new Task(
{
name: 'get pool record',
data: { type: 'pool', id },
properties: {
id,
name: 'get pool record',
type: 'pool',
},
},
() => Promise.reject(error)
)
).catch(noop)
})
)
),
@@ -81,11 +86,11 @@ export const Metadata = class MetadataBackupRunner extends Abstract {
if (pools.length !== 0 && settings.retentionPoolMetadata !== 0) {
promises.push(
asyncMap(pools, async pool =>
runTask(
new Task(
{
name: `Starting metadata backup for the pool (${pool.$id}). (${job.id})`,
data: {
properties: {
id: pool.$id,
name: `Starting metadata backup for the pool (${pool.$id}). (${job.id})`,
pool,
poolMaster: await ignoreErrors.call(pool.$xapi.getRecord('host', pool.master)),
type: 'pool',
@@ -100,17 +105,17 @@ export const Metadata = class MetadataBackupRunner extends Abstract {
schedule,
settings,
}).run()
)
).catch(noop)
)
)
}
if (job.xoMetadata !== undefined && settings.retentionXoMetadata !== 0) {
promises.push(
runTask(
new Task(
{
name: `Starting XO metadata backup. (${job.id})`,
data: {
properties: {
name: `Starting XO metadata backup. (${job.id})`,
type: 'xo',
},
},
@@ -122,7 +127,7 @@ export const Metadata = class MetadataBackupRunner extends Abstract {
schedule,
settings,
}).run()
)
).catch(noop)
)
}
await Promise.all(promises)

View File

@@ -1,12 +1,11 @@
import { asyncMapSettled } from '@xen-orchestra/async-map'
import Disposable from 'promise-toolbox/Disposable'
import { limitConcurrency } from 'limit-concurrency-decorator'
import { Task } from '@vates/task'
import { extractIdsFromSimplePattern } from '../extractIdsFromSimplePattern.mjs'
import { Task } from '../Task.mjs'
import createStreamThrottle from './_createStreamThrottle.mjs'
import { DEFAULT_SETTINGS, Abstract } from './_Abstract.mjs'
import { runTask } from './_runTask.mjs'
import { getAdaptersByRemote } from './_getAdaptersByRemote.mjs'
import { FullRemote } from './_vmRunners/FullRemote.mjs'
import { IncrementalRemote } from './_vmRunners/IncrementalRemote.mjs'
@@ -25,6 +24,8 @@ const DEFAULT_REMOTE_VM_SETTINGS = {
vmTimeout: 0,
}
const noop = Function.prototype
export const VmsRemote = class RemoteVmsBackupRunner extends Abstract {
_computeBaseSettings(config, job) {
const baseSettings = { ...DEFAULT_SETTINGS }
@@ -63,7 +64,13 @@ export const VmsRemote = class RemoteVmsBackupRunner extends Abstract {
const baseSettings = this._baseSettings
const handleVm = vmUuid => {
const taskStart = { name: 'backup VM', data: { type: 'VM', id: vmUuid } }
const taskStart = {
properties: {
id: vmUuid,
name: 'backup VM',
type: 'VM',
},
}
const opts = {
baseSettings,
@@ -86,7 +93,7 @@ export const VmsRemote = class RemoteVmsBackupRunner extends Abstract {
throw new Error(`Job mode ${job.mode} not implemented for mirror backup`)
}
return runTask(taskStart, () => vmBackup.run())
return new Task(taskStart, () => vmBackup.run()).catch(noop)
}
const { concurrency } = settings
await asyncMapSettled(vmsUuids, !concurrency ? handleVm : limitConcurrency(concurrency)(handleVm))

View File

@@ -1,12 +1,11 @@
import { asyncMapSettled } from '@xen-orchestra/async-map'
import Disposable from 'promise-toolbox/Disposable'
import { limitConcurrency } from 'limit-concurrency-decorator'
import { Task } from '@vates/task'
import { extractIdsFromSimplePattern } from '../extractIdsFromSimplePattern.mjs'
import { Task } from '../Task.mjs'
import createStreamThrottle from './_createStreamThrottle.mjs'
import { DEFAULT_SETTINGS, Abstract } from './_Abstract.mjs'
import { runTask } from './_runTask.mjs'
import { getAdaptersByRemote } from './_getAdaptersByRemote.mjs'
import { IncrementalXapi } from './_vmRunners/IncrementalXapi.mjs'
import { FullXapi } from './_vmRunners/FullXapi.mjs'
@@ -34,6 +33,8 @@ const DEFAULT_XAPI_VM_SETTINGS = {
vmTimeout: 0,
}
const noop = Function.prototype
export const VmsXapi = class VmsXapiBackupRunner extends Abstract {
_computeBaseSettings(config, job) {
const baseSettings = { ...DEFAULT_SETTINGS }
@@ -57,13 +58,16 @@ export const VmsXapi = class VmsXapiBackupRunner extends Abstract {
Disposable.all(
extractIdsFromSimplePattern(job.srs).map(id =>
this._getRecord('SR', id).catch(error => {
runTask(
new Task(
{
name: 'get SR record',
data: { type: 'SR', id },
properties: {
id,
name: 'get SR record',
type: 'SR',
},
},
() => Promise.reject(error)
)
).catch(noop)
})
)
),
@@ -90,13 +94,19 @@ export const VmsXapi = class VmsXapiBackupRunner extends Abstract {
const baseSettings = this._baseSettings
const handleVm = vmUuid => {
const taskStart = { name: 'backup VM', data: { type: 'VM', id: vmUuid } }
const taskStart = {
properties: {
id: vmUuid,
name: 'backup VM',
type: 'VM',
},
}
return this._getRecord('VM', vmUuid).then(
disposableVm =>
Disposable.use(disposableVm, vm => {
taskStart.data.name_label = vm.name_label
return runTask(taskStart, () => {
return new Task()(taskStart, () => {
const opts = {
baseSettings,
config,
@@ -121,12 +131,12 @@ export const VmsXapi = class VmsXapiBackupRunner extends Abstract {
}
}
return vmBackup.run()
})
}).catch(noop)
}),
error =>
runTask(taskStart, () => {
new Task(taskStart, () => {
throw error
})
}).catch(noop)
)
}
const { concurrency } = settings

View File

@@ -1,9 +1,12 @@
import Disposable from 'promise-toolbox/Disposable'
import pTimeout from 'promise-toolbox/timeout'
import { compileTemplate } from '@xen-orchestra/template'
import { runTask } from './_runTask.mjs'
import { Task } from '@vates/task'
import { RemoteTimeoutError } from './_RemoteTimeoutError.mjs'
const noop = Function.prototype
export const DEFAULT_SETTINGS = {
getRemoteTimeout: 300e3,
reportWhen: 'failure',
@@ -36,13 +39,16 @@ export const Abstract = class AbstractRunner {
})
} catch (error) {
// See https://github.com/vatesfr/xen-orchestra/commit/6aa6cfba8ec939c0288f0fa740f6dfad98c43cbb
runTask(
Task.run(
{
name: 'get remote adapter',
data: { type: 'remote', id: remoteId },
properties: {
id: remoteId,
name: 'get remote adapter',
type: 'remote',
},
},
() => Promise.reject(error)
)
).catch(noop)
}
}
}

View File

@@ -1,9 +1,9 @@
import { asyncMap } from '@xen-orchestra/async-map'
import { Task } from '@vates/task'
import { DIR_XO_POOL_METADATA_BACKUPS } from '../RemoteAdapter.mjs'
import { forkStreamUnpipe } from './_forkStreamUnpipe.mjs'
import { formatFilenameDate } from '../_filenameDate.mjs'
import { Task } from '../Task.mjs'
export const PATH_DB_DUMP = '/pool/xmldbdump'
@@ -54,8 +54,8 @@ export class PoolMetadataBackup {
([remoteId, adapter]) =>
Task.run(
{
name: `Starting metadata backup for the pool (${pool.$id}) for the remote (${remoteId}). (${job.id})`,
data: {
properties: {
name: `Starting metadata backup for the pool (${pool.$id}) for the remote (${remoteId}). (${job.id})`,
id: remoteId,
type: 'remote',
},

View File

@@ -1,9 +1,9 @@
import { asyncMap } from '@xen-orchestra/async-map'
import { join } from '@xen-orchestra/fs/path'
import { Task } from '@vates/task'
import { DIR_XO_CONFIG_BACKUPS } from '../RemoteAdapter.mjs'
import { formatFilenameDate } from '../_filenameDate.mjs'
import { Task } from '../Task.mjs'
export class XoMetadataBackup {
constructor({ config, job, remoteAdapters, schedule, settings }) {
@@ -51,8 +51,8 @@ export class XoMetadataBackup {
([remoteId, adapter]) =>
Task.run(
{
name: `Starting XO metadata backup for the remote (${remoteId}). (${job.id})`,
data: {
properties: {
name: `Starting XO metadata backup for the remote (${remoteId}). (${job.id})`,
id: remoteId,
type: 'remote',
},

View File

@@ -1,5 +0,0 @@
import { Task } from '../Task.mjs'
const noop = Function.prototype

// Run `Task.run(...args)` and swallow any rejection: errors are already
// handled by the task logs, so callers need not handle them again.
export function runTask(...args) {
  return Task.run(...args).catch(noop)
}

View File

@@ -1,10 +1,11 @@
import { decorateMethodsWith } from '@vates/decorate-with'
import { defer } from 'golike-defer'
import { Task } from '@vates/task'
import { AbstractRemote } from './_AbstractRemote.mjs'
import { FullRemoteWriter } from '../_writers/FullRemoteWriter.mjs'
import { forkStreamUnpipe } from '../_forkStreamUnpipe.mjs'
import { watchStreamSize } from '../../_watchStreamSize.mjs'
import { Task } from '../../Task.mjs'
export const FullRemote = class FullRemoteVmBackupRunner extends AbstractRemote {
_getRemoteWriter() {

View File

@@ -1,6 +1,7 @@
import { asyncEach } from '@vates/async-each'
import { decorateMethodsWith } from '@vates/decorate-with'
import { defer } from 'golike-defer'
import { Task } from '@vates/task'
import assert from 'node:assert'
import isVhdDifferencingDisk from 'vhd-lib/isVhdDifferencingDisk.js'
import mapValues from 'lodash/mapValues.js'
@@ -8,7 +9,6 @@ import mapValues from 'lodash/mapValues.js'
import { AbstractRemote } from './_AbstractRemote.mjs'
import { forkDeltaExport } from './_forkDeltaExport.mjs'
import { IncrementalRemoteWriter } from '../_writers/IncrementalRemoteWriter.mjs'
import { Task } from '../../Task.mjs'
class IncrementalRemoteVmBackupRunner extends AbstractRemote {
_getRemoteWriter() {

View File

@@ -2,6 +2,7 @@ import { asyncEach } from '@vates/async-each'
import { asyncMap } from '@xen-orchestra/async-map'
import { createLogger } from '@xen-orchestra/log'
import { pipeline } from 'node:stream'
import { Task } from '@vates/task'
import findLast from 'lodash/findLast.js'
import isVhdDifferencingDisk from 'vhd-lib/isVhdDifferencingDisk.js'
import keyBy from 'lodash/keyBy.js'
@@ -13,7 +14,6 @@ import { exportIncrementalVm } from '../../_incrementalVm.mjs'
import { forkDeltaExport } from './_forkDeltaExport.mjs'
import { IncrementalRemoteWriter } from '../_writers/IncrementalRemoteWriter.mjs'
import { IncrementalXapiWriter } from '../_writers/IncrementalXapiWriter.mjs'
import { Task } from '../../Task.mjs'
import { watchStreamSize } from '../../_watchStreamSize.mjs'
const { debug } = createLogger('xo:backups:IncrementalXapiVmBackup')

View File

@@ -1,6 +1,6 @@
import { asyncMap } from '@xen-orchestra/async-map'
import { createLogger } from '@xen-orchestra/log'
import { Task } from '../../Task.mjs'
import { Task } from '@vates/task'
const { debug, warn } = createLogger('xo:backups:AbstractVmRunner')
@@ -80,7 +80,7 @@ export const Abstract = class AbstractVmBackupRunner {
// create a task to have an info in the logs and reports
return Task.run(
{
name: 'health check',
properties: { name: 'health check' },
},
() => {
Task.info(`This VM doesn't match the health check's tags for this schedule`)

View File

@@ -5,9 +5,9 @@ import { asyncMap } from '@xen-orchestra/async-map'
import { decorateMethodsWith } from '@vates/decorate-with'
import { defer } from 'golike-defer'
import { formatDateTime } from '@xen-orchestra/xapi'
import { Task } from '@vates/task'
import { getOldEntries } from '../../_getOldEntries.mjs'
import { Task } from '../../Task.mjs'
import { Abstract } from './_Abstract.mjs'
export const AbstractXapi = class AbstractXapiVmBackupRunner extends Abstract {
@@ -142,7 +142,7 @@ export const AbstractXapi = class AbstractXapiVmBackupRunner extends Abstract {
const settings = this._settings
if (this._mustDoSnapshot()) {
await Task.run({ name: 'snapshot' }, async () => {
await Task.run({ properties: { name: 'snapshot' } }, async () => {
if (!settings.bypassVdiChainsCheck) {
await vm.$assertHealthyVdiChains()
}

View File

@@ -1,6 +1,7 @@
import { Task } from '@vates/task'
import { formatFilenameDate } from '../../_filenameDate.mjs'
import { getOldEntries } from '../../_getOldEntries.mjs'
import { Task } from '../../Task.mjs'
import { MixinRemoteWriter } from './_MixinRemoteWriter.mjs'
import { AbstractFullWriter } from './_AbstractFullWriter.mjs'
@@ -9,10 +10,10 @@ export class FullRemoteWriter extends MixinRemoteWriter(AbstractFullWriter) {
constructor(props) {
super(props)
this.run = Task.wrapFn(
this.run = Task.wrap(
{
name: 'export',
data: {
properties: {
name: 'export',
id: props.remoteId,
type: 'remote',
@@ -63,7 +64,7 @@ export class FullRemoteWriter extends MixinRemoteWriter(AbstractFullWriter) {
await deleteOldBackups()
}
await Task.run({ name: 'transfer' }, async () => {
await Task.run({ properties: { name: 'transfer' } }, async () => {
await adapter.outputStream(dataFilename, stream, {
maxStreamLength,
streamLength,

View File

@@ -1,10 +1,10 @@
import ignoreErrors from 'promise-toolbox/ignoreErrors'
import { asyncMap, asyncMapSettled } from '@xen-orchestra/async-map'
import { formatDateTime } from '@xen-orchestra/xapi'
import { Task } from '@vates/task'
import { formatFilenameDate } from '../../_filenameDate.mjs'
import { getOldEntries } from '../../_getOldEntries.mjs'
import { Task } from '../../Task.mjs'
import { AbstractFullWriter } from './_AbstractFullWriter.mjs'
import { MixinXapiWriter } from './_MixinXapiWriter.mjs'
@@ -14,10 +14,10 @@ export class FullXapiWriter extends MixinXapiWriter(AbstractFullWriter) {
constructor(props) {
super(props)
this.run = Task.wrapFn(
this.run = Task.wrap(
{
name: 'export',
data: {
properties: {
name: 'export',
id: props.sr.uuid,
name_label: this._sr.name_label,
type: 'SR',
@@ -52,7 +52,7 @@ export class FullXapiWriter extends MixinXapiWriter(AbstractFullWriter) {
}
let targetVmRef
await Task.run({ name: 'transfer' }, async () => {
await Task.run({ properties: { name: 'transfer' } }, async () => {
targetVmRef = await xapi.VM_import(stream, sr.$ref, vm =>
Promise.all([
!_warmMigration && vm.add_tags('Disaster Recovery'),

View File

@@ -8,11 +8,11 @@ import { createLogger } from '@xen-orchestra/log'
import { decorateClass } from '@vates/decorate-with'
import { defer } from 'golike-defer'
import { dirname } from 'node:path'
import { Task } from '@vates/task'
import { formatFilenameDate } from '../../_filenameDate.mjs'
import { getOldEntries } from '../../_getOldEntries.mjs'
import { TAG_BASE_DELTA } from '../../_incrementalVm.mjs'
import { Task } from '../../Task.mjs'
import { MixinRemoteWriter } from './_MixinRemoteWriter.mjs'
import { AbstractIncrementalWriter } from './_AbstractIncrementalWriter.mjs'
@@ -71,17 +71,17 @@ export class IncrementalRemoteWriter extends MixinRemoteWriter(AbstractIncrement
prepare({ isFull }) {
// create the task related to this export and ensure all methods are called in this context
const task = new Task({
name: 'export',
data: {
properties: {
name: 'export',
id: this._remoteId,
isFull,
type: 'remote',
},
})
this.transfer = task.wrapFn(this.transfer)
this.healthCheck = task.wrapFn(this.healthCheck)
this.cleanup = task.wrapFn(this.cleanup)
this.afterBackup = task.wrapFn(this.afterBackup, true)
this.transfer = task.wrapInside(this.transfer)
this.healthCheck = task.wrapInside(this.healthCheck)
this.cleanup = task.wrapInside(this.cleanup)
this.afterBackup = task.wrap(this.afterBackup)
return task.run(() => this._prepare())
}
@@ -174,7 +174,7 @@ export class IncrementalRemoteWriter extends MixinRemoteWriter(AbstractIncrement
vm,
vmSnapshot,
}
const { size } = await Task.run({ name: 'transfer' }, async () => {
const { size } = await Task.run({ properties: { name: 'transfer' } }, async () => {
let transferSize = 0
await asyncEach(
Object.entries(deltaExport.vdis),

View File

@@ -1,11 +1,11 @@
import { asyncMap, asyncMapSettled } from '@xen-orchestra/async-map'
import ignoreErrors from 'promise-toolbox/ignoreErrors'
import { formatDateTime } from '@xen-orchestra/xapi'
import { Task } from '@vates/task'
import { formatFilenameDate } from '../../_filenameDate.mjs'
import { getOldEntries } from '../../_getOldEntries.mjs'
import { importIncrementalVm, TAG_BACKUP_SR, TAG_BASE_DELTA, TAG_COPY_SRC } from '../../_incrementalVm.mjs'
import { Task } from '../../Task.mjs'
import { AbstractIncrementalWriter } from './_AbstractIncrementalWriter.mjs'
import { MixinXapiWriter } from './_MixinXapiWriter.mjs'
@@ -40,18 +40,21 @@ export class IncrementalXapiWriter extends MixinXapiWriter(AbstractIncrementalWr
prepare({ isFull }) {
// create the task related to this export and ensure all methods are called in this context
const task = new Task({
name: 'export',
data: {
properties: {
name: 'export',
id: this._sr.uuid,
isFull,
name_label: this._sr.name_label,
type: 'SR',
},
})
const hasHealthCheckSr = this._healthCheckSr !== undefined
this.transfer = task.wrapFn(this.transfer)
this.cleanup = task.wrapFn(this.cleanup, !hasHealthCheckSr)
this.healthCheck = task.wrapFn(this.healthCheck, hasHealthCheckSr)
this.transfer = task.wrapInside(this.transfer)
if (this._healthCheckSr !== undefined) {
this.cleanup = task.wrapInside(this.cleanup)
this.healthCheck = task.wrap(this.healthCheck)
} else {
this.cleanup = task.wrap(this.cleanup)
}
return task.run(() => this._prepare())
}
@@ -139,7 +142,7 @@ export class IncrementalXapiWriter extends MixinXapiWriter(AbstractIncrementalWr
const { uuid: srUuid, $xapi: xapi } = sr
let targetVmRef
await Task.run({ name: 'transfer' }, async () => {
await Task.run({ properties: { name: 'transfer' } }, async () => {
targetVmRef = await importIncrementalVm(this.#decorateVmMetadata(deltaExport), sr)
return {
size: Object.values(sizeContainers).reduce((sum, { size }) => sum + size, 0),

View File

@@ -1,12 +1,12 @@
import { createLogger } from '@xen-orchestra/log'
import { join } from 'node:path'
import { Task } from '@vates/task'
import assert from 'node:assert'
import { formatFilenameDate } from '../../_filenameDate.mjs'
import { getVmBackupDir } from '../../_getVmBackupDir.mjs'
import { HealthCheckVmBackup } from '../../HealthCheckVmBackup.mjs'
import { ImportVmBackup } from '../../ImportVmBackup.mjs'
import { Task } from '../../Task.mjs'
import * as MergeWorker from '../../merge-worker/index.mjs'
const { info, warn } = createLogger('xo:backups:MixinBackupWriter')
@@ -26,7 +26,7 @@ export const MixinRemoteWriter = (BaseClass = Object) =>
async _cleanVm(options) {
try {
return await Task.run({ name: 'clean-vm' }, () => {
return await Task.run({ properties: { name: 'clean-vm' } }, () => {
return this._adapter.cleanVm(this._vmBackupDir, {
...options,
fixMetadata: true,
@@ -84,7 +84,7 @@ export const MixinRemoteWriter = (BaseClass = Object) =>
)
return Task.run(
{
name: 'health check',
properties: { name: 'health check' },
},
async () => {
const xapi = sr.$xapi

View File

@@ -1,8 +1,8 @@
import { extractOpaqueRef } from '@xen-orchestra/xapi'
import { Task } from '@vates/task'
import assert from 'node:assert/strict'
import { HealthCheckVmBackup } from '../../HealthCheckVmBackup.mjs'
import { Task } from '../../Task.mjs'
export const MixinXapiWriter = (BaseClass = Object) =>
class MixinXapiWriter extends BaseClass {
@@ -32,7 +32,7 @@ export const MixinXapiWriter = (BaseClass = Object) =>
// copy VM
return Task.run(
{
name: 'health check',
properties: { name: 'health check' },
},
async () => {
const { $xapi: xapi } = sr
@@ -42,7 +42,7 @@ export const MixinXapiWriter = (BaseClass = Object) =>
if (await this.#isAlreadyOnHealthCheckSr(baseVm)) {
healthCheckVmRef = await Task.run(
{ name: 'cloning-vm' },
{ properties: { name: 'cloning-vm' } },
async () =>
await xapi
.callAsync('VM.clone', this._targetVmRef, `Health Check - ${baseVm.name_label}`)
@@ -50,7 +50,7 @@ export const MixinXapiWriter = (BaseClass = Object) =>
)
} else {
healthCheckVmRef = await Task.run(
{ name: 'copying-vm' },
{ properties: { name: 'copying-vm' } },
async () =>
await xapi
.callAsync('VM.copy', this._targetVmRef, `Health Check - ${baseVm.name_label}`, sr.$ref)

View File

@@ -27,6 +27,7 @@
"@vates/fuse-vhd": "^2.0.0",
"@vates/nbd-client": "^3.0.0",
"@vates/parse-duration": "^0.1.1",
"@vates/task": "^0.2.0",
"@xen-orchestra/async-map": "^0.1.2",
"@xen-orchestra/fs": "^4.1.3",
"@xen-orchestra/log": "^0.6.0",

View File

@@ -15,7 +15,7 @@ import { Readable } from 'stream'
import { RemoteAdapter } from '@xen-orchestra/backups/RemoteAdapter.mjs'
import { RestoreMetadataBackup } from '@xen-orchestra/backups/RestoreMetadataBackup.mjs'
import { runBackupWorker } from '@xen-orchestra/backups/runBackupWorker.mjs'
import { Task } from '@xen-orchestra/backups/Task.mjs'
import { Task } from '@vates/task'
import { Xapi } from '@xen-orchestra/xapi'
const noop = Function.prototype
@@ -122,15 +122,15 @@ export default class Backups {
try {
await Task.run(
{
name: 'backup run',
data: {
properties: {
jobId: job.id,
jobName: job.name,
mode: job.mode,
name: 'backup run',
reportWhen: job.settings['']?.reportWhen,
scheduleId: schedule.id,
},
onLog,
onProgress: onLog,
},
() => run(params)
)
@@ -205,14 +205,14 @@ export default class Backups {
async (args, onLog) =>
Task.run(
{
data: {
properties: {
backupId,
jobId: metadata.jobId,
name: 'restore',
srId: srUuid,
time: metadata.timestamp,
},
name: 'restore',
onLog,
onProgress: onLog,
},
run
).catch(() => {}), // errors are handled by logs,
@@ -344,12 +344,14 @@ export default class Backups {
({ backupId, remote, xapi: xapiOptions }) =>
Disposable.use(app.remotes.getHandler(remote), xapiOptions && this.getXapi(xapiOptions), (handler, xapi) =>
runWithLogs(
async (args, onLog) =>
async (args, onProgress) =>
Task.run(
{
name: 'metadataRestore',
data: JSON.parse(String(await handler.readFile(`${backupId}/metadata.json`))),
onLog,
properties: {
metadata: JSON.parse(String(await handler.readFile(`${backupId}/metadata.json`))),
name: 'metadataRestore',
},
onProgress,
},
() =>
new RestoreMetadataBackup({

View File

@@ -31,6 +31,7 @@
"@vates/compose": "^2.1.0",
"@vates/decorate-with": "^2.0.0",
"@vates/disposable": "^0.1.5",
"@vates/task": "^0.2.0",
"@xen-orchestra/async-map": "^0.1.2",
"@xen-orchestra/backups": "^0.44.3",
"@xen-orchestra/fs": "^4.1.3",

View File

@@ -40,6 +40,7 @@
"@vates/parse-duration": "^0.1.1",
"@vates/predicates": "^1.1.0",
"@vates/read-chunk": "^1.2.0",
"@vates/task": "^0.2.0",
"@xen-orchestra/async-map": "^0.1.2",
"@xen-orchestra/backups": "^0.44.3",
"@xen-orchestra/cron": "^1.0.6",

View File

@@ -5,7 +5,7 @@ import { createLogger } from '@xen-orchestra/log'
import { createRunner } from '@xen-orchestra/backups/Backup.mjs'
import { parseMetadataBackupId } from '@xen-orchestra/backups/parseMetadataBackupId.mjs'
import { RestoreMetadataBackup } from '@xen-orchestra/backups/RestoreMetadataBackup.mjs'
import { Task } from '@xen-orchestra/backups/Task.mjs'
import { Task } from '@vates/task'
import { debounceWithKey, REMOVE_CACHE_ENTRY } from '../_pDebounceWithKey.mjs'
import { handleBackupLog } from '../_handleBackupLog.mjs'
@@ -124,8 +124,8 @@ export default class metadataBackup {
const localTaskIds = { __proto__: null }
return Task.run(
{
name: 'backup run',
onLog: log =>
properties: { name: 'backup run' },
onProgress: log =>
handleBackupLog(log, {
localTaskIds,
logger,