Compare commits

...

3 Commits

Author SHA1 Message Date
Florent BEAUCHAMP
9bfa6db64a fix(@xen-orchestra/backups): save the vm and vmSapshot metadata 2023-05-12 14:30:35 +02:00
Florent BEAUCHAMP
cfd3cf78f8 feat(@xen-orchestra/backups): implement secondary backups 2023-05-12 14:30:35 +02:00
Florent BEAUCHAMP
c85323baa4 refactor(@xen-orchestra/backups): reorganize backup job to prepare for secondary backup
* full and delta backup jobs are renamed to full and incremental
* inside an incremental job, the full and delta transfers are renamed to base and delta transver
* the baseVm used for computing the snapshot is renamed to comparisasonBasisVm
* split a lot of files to extract reusable methods
* use a factory to instantiates BackubJob, the Backup* classes are now private by convention
* the VmBackups class are instatiated by a XapiBackupJob => move them to the _backup folder
* the writers need a VmBackup in their constructor: move them to the vmBackup folder
2023-05-12 14:30:35 +02:00
49 changed files with 1524 additions and 954 deletions

View File

@@ -1,307 +0,0 @@
'use strict'
const { asyncMap, asyncMapSettled } = require('@xen-orchestra/async-map')
const Disposable = require('promise-toolbox/Disposable')
const ignoreErrors = require('promise-toolbox/ignoreErrors')
const pTimeout = require('promise-toolbox/timeout')
const { compileTemplate } = require('@xen-orchestra/template')
const { limitConcurrency } = require('limit-concurrency-decorator')
const { extractIdsFromSimplePattern } = require('./extractIdsFromSimplePattern.js')
const { PoolMetadataBackup } = require('./_PoolMetadataBackup.js')
const { Task } = require('./Task.js')
const { VmBackup } = require('./_VmBackup.js')
const { XoMetadataBackup } = require('./_XoMetadataBackup.js')
const createStreamThrottle = require('./_createStreamThrottle.js')
const noop = Function.prototype
const getAdaptersByRemote = adapters => {
const adaptersByRemote = {}
adapters.forEach(({ adapter, remoteId }) => {
adaptersByRemote[remoteId] = adapter
})
return adaptersByRemote
}
const runTask = (...args) => Task.run(...args).catch(noop) // errors are handled by logs
const DEFAULT_SETTINGS = {
getRemoteTimeout: 300e3,
reportWhen: 'failure',
}
const DEFAULT_VM_SETTINGS = {
bypassVdiChainsCheck: false,
checkpointSnapshot: false,
concurrency: 2,
copyRetention: 0,
deleteFirst: false,
exportRetention: 0,
fullInterval: 0,
healthCheckSr: undefined,
healthCheckVmsWithTags: [],
maxExportRate: 0,
maxMergedDeltasPerRun: Infinity,
offlineBackup: false,
offlineSnapshot: false,
snapshotRetention: 0,
timeout: 0,
useNbd: false,
unconditionalSnapshot: false,
validateVhdStreams: false,
vmTimeout: 0,
}
const DEFAULT_METADATA_SETTINGS = {
retentionPoolMetadata: 0,
retentionXoMetadata: 0,
}
class RemoteTimeoutError extends Error {
constructor(remoteId) {
super('timeout while getting the remote ' + remoteId)
this.remoteId = remoteId
}
}
exports.Backup = class Backup {
constructor({ config, getAdapter, getConnectedRecord, job, schedule }) {
this._config = config
this._getRecord = getConnectedRecord
this._job = job
this._schedule = schedule
this._getSnapshotNameLabel = compileTemplate(config.snapshotNameLabelTpl, {
'{job.name}': job.name,
'{vm.name_label}': vm => vm.name_label,
})
const { type } = job
const baseSettings = { ...DEFAULT_SETTINGS }
if (type === 'backup') {
Object.assign(baseSettings, DEFAULT_VM_SETTINGS, config.defaultSettings, config.vm?.defaultSettings)
this.run = this._runVmBackup
} else if (type === 'metadataBackup') {
Object.assign(baseSettings, DEFAULT_METADATA_SETTINGS, config.defaultSettings, config.metadata?.defaultSettings)
this.run = this._runMetadataBackup
} else {
throw new Error(`No runner for the backup type ${type}`)
}
Object.assign(baseSettings, job.settings[''])
this._baseSettings = baseSettings
this._settings = { ...baseSettings, ...job.settings[schedule.id] }
const { getRemoteTimeout } = this._settings
this._getAdapter = async function (remoteId) {
try {
const disposable = await pTimeout.call(getAdapter(remoteId), getRemoteTimeout, new RemoteTimeoutError(remoteId))
return new Disposable(() => disposable.dispose(), {
adapter: disposable.value,
remoteId,
})
} catch (error) {
// See https://github.com/vatesfr/xen-orchestra/commit/6aa6cfba8ec939c0288f0fa740f6dfad98c43cbb
runTask(
{
name: 'get remote adapter',
data: { type: 'remote', id: remoteId },
},
() => Promise.reject(error)
)
}
}
}
async _runMetadataBackup() {
const schedule = this._schedule
const job = this._job
const remoteIds = extractIdsFromSimplePattern(job.remotes)
if (remoteIds.length === 0) {
throw new Error('metadata backup job cannot run without remotes')
}
const config = this._config
const poolIds = extractIdsFromSimplePattern(job.pools)
const isEmptyPools = poolIds.length === 0
const isXoMetadata = job.xoMetadata !== undefined
if (!isXoMetadata && isEmptyPools) {
throw new Error('no metadata mode found')
}
const settings = this._settings
const { retentionPoolMetadata, retentionXoMetadata } = settings
if (
(retentionPoolMetadata === 0 && retentionXoMetadata === 0) ||
(!isXoMetadata && retentionPoolMetadata === 0) ||
(isEmptyPools && retentionXoMetadata === 0)
) {
throw new Error('no retentions corresponding to the metadata modes found')
}
await Disposable.use(
Disposable.all(
poolIds.map(id =>
this._getRecord('pool', id).catch(error => {
// See https://github.com/vatesfr/xen-orchestra/commit/6aa6cfba8ec939c0288f0fa740f6dfad98c43cbb
runTask(
{
name: 'get pool record',
data: { type: 'pool', id },
},
() => Promise.reject(error)
)
})
)
),
Disposable.all(remoteIds.map(id => this._getAdapter(id))),
async (pools, remoteAdapters) => {
// remove adapters that failed (already handled)
remoteAdapters = remoteAdapters.filter(_ => _ !== undefined)
if (remoteAdapters.length === 0) {
return
}
remoteAdapters = getAdaptersByRemote(remoteAdapters)
// remove pools that failed (already handled)
pools = pools.filter(_ => _ !== undefined)
const promises = []
if (pools.length !== 0 && settings.retentionPoolMetadata !== 0) {
promises.push(
asyncMap(pools, async pool =>
runTask(
{
name: `Starting metadata backup for the pool (${pool.$id}). (${job.id})`,
data: {
id: pool.$id,
pool,
poolMaster: await ignoreErrors.call(pool.$xapi.getRecord('host', pool.master)),
type: 'pool',
},
},
() =>
new PoolMetadataBackup({
config,
job,
pool,
remoteAdapters,
schedule,
settings,
}).run()
)
)
)
}
if (job.xoMetadata !== undefined && settings.retentionXoMetadata !== 0) {
promises.push(
runTask(
{
name: `Starting XO metadata backup. (${job.id})`,
data: {
type: 'xo',
},
},
() =>
new XoMetadataBackup({
config,
job,
remoteAdapters,
schedule,
settings,
}).run()
)
)
}
await Promise.all(promises)
}
)
}
async _runVmBackup() {
const job = this._job
// FIXME: proper SimpleIdPattern handling
const getSnapshotNameLabel = this._getSnapshotNameLabel
const schedule = this._schedule
const settings = this._settings
const throttleStream = createStreamThrottle(settings.maxExportRate)
const config = this._config
await Disposable.use(
Disposable.all(
extractIdsFromSimplePattern(job.srs).map(id =>
this._getRecord('SR', id).catch(error => {
runTask(
{
name: 'get SR record',
data: { type: 'SR', id },
},
() => Promise.reject(error)
)
})
)
),
Disposable.all(extractIdsFromSimplePattern(job.remotes).map(id => this._getAdapter(id))),
() => (settings.healthCheckSr !== undefined ? this._getRecord('SR', settings.healthCheckSr) : undefined),
async (srs, remoteAdapters, healthCheckSr) => {
// remove adapters that failed (already handled)
remoteAdapters = remoteAdapters.filter(_ => _ !== undefined)
// remove srs that failed (already handled)
srs = srs.filter(_ => _ !== undefined)
if (remoteAdapters.length === 0 && srs.length === 0 && settings.snapshotRetention === 0) {
return
}
const vmIds = extractIdsFromSimplePattern(job.vms)
Task.info('vms', { vms: vmIds })
remoteAdapters = getAdaptersByRemote(remoteAdapters)
const allSettings = this._job.settings
const baseSettings = this._baseSettings
const handleVm = vmUuid => {
const taskStart = { name: 'backup VM', data: { type: 'VM', id: vmUuid } }
return this._getRecord('VM', vmUuid).then(
disposableVm =>
Disposable.use(disposableVm, vm => {
taskStart.data.name_label = vm.name_label
return runTask(taskStart, () =>
new VmBackup({
baseSettings,
config,
getSnapshotNameLabel,
healthCheckSr,
job,
remoteAdapters,
schedule,
settings: { ...settings, ...allSettings[vm.uuid] },
srs,
throttleStream,
vm,
}).run()
)
}),
error =>
runTask(taskStart, () => {
throw error
})
)
}
const { concurrency } = settings
await asyncMapSettled(vmIds, concurrency === 0 ? handleVm : limitConcurrency(concurrency)(handleVm))
}
)
}
}

View File

@@ -3,14 +3,14 @@
const assert = require('assert') const assert = require('assert')
const { formatFilenameDate } = require('./_filenameDate.js') const { formatFilenameDate } = require('./_filenameDate.js')
const { importDeltaVm } = require('./_deltaVm.js') const { importIncrementalVm } = require('./_incrementalVm.js')
const { Task } = require('./Task.js') const { Task } = require('./Task.js')
const { watchStreamSize } = require('./_watchStreamSize.js') const { watchStreamSize } = require('./_watchStreamSize.js')
exports.ImportVmBackup = class ImportVmBackup { exports.ImportVmBackup = class ImportVmBackup {
constructor({ adapter, metadata, srUuid, xapi, settings: { newMacAddresses, mapVdisSrs = {} } = {} }) { constructor({ adapter, metadata, srUuid, xapi, settings: { newMacAddresses, mapVdisSrs = {} } = {} }) {
this._adapter = adapter this._adapter = adapter
this._importDeltaVmSettings = { newMacAddresses, mapVdisSrs } this._importIncrementalVmSettings = { newMacAddresses, mapVdisSrs }
this._metadata = metadata this._metadata = metadata
this._srUuid = srUuid this._srUuid = srUuid
this._xapi = xapi this._xapi = xapi
@@ -31,11 +31,11 @@ exports.ImportVmBackup = class ImportVmBackup {
assert.strictEqual(metadata.mode, 'delta') assert.strictEqual(metadata.mode, 'delta')
const ignoredVdis = new Set( const ignoredVdis = new Set(
Object.entries(this._importDeltaVmSettings.mapVdisSrs) Object.entries(this._importIncrementalVmSettings.mapVdisSrs)
.filter(([_, srUuid]) => srUuid === null) .filter(([_, srUuid]) => srUuid === null)
.map(([vdiUuid]) => vdiUuid) .map(([vdiUuid]) => vdiUuid)
) )
backup = await adapter.readDeltaVmBackup(metadata, ignoredVdis) backup = await adapter.readIncrementalVmBackup(metadata, ignoredVdis)
Object.values(backup.streams).forEach(stream => watchStreamSize(stream, sizeContainer)) Object.values(backup.streams).forEach(stream => watchStreamSize(stream, sizeContainer))
} }
@@ -49,8 +49,8 @@ exports.ImportVmBackup = class ImportVmBackup {
const vmRef = isFull const vmRef = isFull
? await xapi.VM_import(backup, srRef) ? await xapi.VM_import(backup, srRef)
: await importDeltaVm(backup, await xapi.getRecord('SR', srRef), { : await importIncrementalVm(backup, await xapi.getRecord('SR', srRef), {
...this._importDeltaVmSettings, ...this._importIncrementalVmSettings,
detectBase: false, detectBase: false,
}) })

View File

@@ -3,9 +3,9 @@
const { asyncMap } = require('@xen-orchestra/async-map') const { asyncMap } = require('@xen-orchestra/async-map')
const { DIR_XO_POOL_METADATA_BACKUPS } = require('./RemoteAdapter.js') const { DIR_XO_POOL_METADATA_BACKUPS } = require('./RemoteAdapter.js')
const { forkStreamUnpipe } = require('./_forkStreamUnpipe.js')
const { formatFilenameDate } = require('./_filenameDate.js') const { formatFilenameDate } = require('./_filenameDate.js')
const { Task } = require('./Task.js') const { Task } = require('./Task.js')
const { forkStreamUnpipe } = require('./_backupJob/forkStreamUnpipe.js')
const PATH_DB_DUMP = '/pool/xmldbdump' const PATH_DB_DUMP = '/pool/xmldbdump'
exports.PATH_DB_DUMP = PATH_DB_DUMP exports.PATH_DB_DUMP = PATH_DB_DUMP

View File

@@ -252,7 +252,7 @@ class RemoteAdapter {
) )
} }
async deleteDeltaVmBackups(backups) { async deleteIncrementalVmBackups(backups) {
const handler = this._handler const handler = this._handler
// this will delete the json, unused VHDs will be detected by `cleanVm` // this will delete the json, unused VHDs will be detected by `cleanVm`
@@ -304,7 +304,7 @@ class RemoteAdapter {
} }
await Promise.all([ await Promise.all([
delta !== undefined && this.deleteDeltaVmBackups(delta), delta !== undefined && this.deleteIncrementalVmBackups(delta),
full !== undefined && this.deleteFullVmBackups(full), full !== undefined && this.deleteFullVmBackups(full),
]) ])
@@ -717,7 +717,7 @@ class RemoteAdapter {
return stream return stream
} }
async readDeltaVmBackup(metadata, ignoredVdis) { async readIncrementalVmBackup(metadata, ignoredVdis, { useSynthetic = true } = {}) {
const handler = this._handler const handler = this._handler
const { vbds, vhds, vifs, vm, vmSnapshot } = metadata const { vbds, vhds, vifs, vm, vmSnapshot } = metadata
const dir = dirname(metadata._filename) const dir = dirname(metadata._filename)
@@ -725,7 +725,9 @@ class RemoteAdapter {
const streams = {} const streams = {}
await asyncMapSettled(Object.keys(vdis), async ref => { await asyncMapSettled(Object.keys(vdis), async ref => {
streams[`${ref}.vhd`] = await this._createSyntheticStream(handler, join(dir, vhds[ref])) streams[`${ref}.vhd`] = useSynthetic
? await this._createSyntheticStream(handler, join(dir, vhds[ref]))
: await this._handler.createReadStream(join(dir, vhds[ref]))
}) })
return { return {

View File

@@ -1,7 +1,7 @@
'use strict' 'use strict'
const { DIR_XO_POOL_METADATA_BACKUPS } = require('./RemoteAdapter.js') const { DIR_XO_POOL_METADATA_BACKUPS } = require('./RemoteAdapter.js')
const { PATH_DB_DUMP } = require('./_PoolMetadataBackup.js') const { PATH_DB_DUMP } = require('./PoolMetadataBackup.js')
exports.RestoreMetadataBackup = class RestoreMetadataBackup { exports.RestoreMetadataBackup = class RestoreMetadataBackup {
constructor({ backupId, handler, xapi }) { constructor({ backupId, handler, xapi }) {

View File

@@ -1,515 +0,0 @@
'use strict'
const assert = require('assert')
const findLast = require('lodash/findLast.js')
const groupBy = require('lodash/groupBy.js')
const ignoreErrors = require('promise-toolbox/ignoreErrors')
const keyBy = require('lodash/keyBy.js')
const mapValues = require('lodash/mapValues.js')
const vhdStreamValidator = require('vhd-lib/vhdStreamValidator.js')
const { asyncMap } = require('@xen-orchestra/async-map')
const { createLogger } = require('@xen-orchestra/log')
const { decorateMethodsWith } = require('@vates/decorate-with')
const { defer } = require('golike-defer')
const { formatDateTime } = require('@xen-orchestra/xapi')
const { pipeline } = require('node:stream')
const { DeltaBackupWriter } = require('./writers/DeltaBackupWriter.js')
const { DeltaReplicationWriter } = require('./writers/DeltaReplicationWriter.js')
const { exportDeltaVm } = require('./_deltaVm.js')
const { forkStreamUnpipe } = require('./_forkStreamUnpipe.js')
const { FullBackupWriter } = require('./writers/FullBackupWriter.js')
const { FullReplicationWriter } = require('./writers/FullReplicationWriter.js')
const { getOldEntries } = require('./_getOldEntries.js')
const { Task } = require('./Task.js')
const { watchStreamSize } = require('./_watchStreamSize.js')
const { debug, warn } = createLogger('xo:backups:VmBackup')
class AggregateError extends Error {
constructor(errors, message) {
super(message)
this.errors = errors
}
}
const asyncEach = async (iterable, fn, thisArg = iterable) => {
for (const item of iterable) {
await fn.call(thisArg, item)
}
}
const forkDeltaExport = deltaExport =>
Object.create(deltaExport, {
streams: {
value: mapValues(deltaExport.streams, forkStreamUnpipe),
},
})
const noop = Function.prototype
class VmBackup {
constructor({
config,
getSnapshotNameLabel,
healthCheckSr,
job,
remoteAdapters,
remotes,
schedule,
settings,
srs,
throttleStream,
vm,
}) {
if (vm.other_config['xo:backup:job'] === job.id && 'start' in vm.blocked_operations) {
// don't match replicated VMs created by this very job otherwise they
// will be replicated again and again
throw new Error('cannot backup a VM created by this very job')
}
this.config = config
this.job = job
this.remoteAdapters = remoteAdapters
this.scheduleId = schedule.id
this.timestamp = undefined
// VM currently backed up
this.vm = vm
const { tags } = this.vm
// VM (snapshot) that is really exported
this.exportedVm = undefined
this._fullVdisRequired = undefined
this._getSnapshotNameLabel = getSnapshotNameLabel
this._isDelta = job.mode === 'delta'
this._healthCheckSr = healthCheckSr
this._jobId = job.id
this._jobSnapshots = undefined
this._throttleStream = throttleStream
this._xapi = vm.$xapi
// Base VM for the export
this._baseVm = undefined
// Settings for this specific run (job, schedule, VM)
if (tags.includes('xo-memory-backup')) {
settings.checkpointSnapshot = true
}
if (tags.includes('xo-offline-backup')) {
settings.offlineSnapshot = true
}
this._settings = settings
// Create writers
{
const writers = new Set()
this._writers = writers
const [BackupWriter, ReplicationWriter] = this._isDelta
? [DeltaBackupWriter, DeltaReplicationWriter]
: [FullBackupWriter, FullReplicationWriter]
const allSettings = job.settings
Object.keys(remoteAdapters).forEach(remoteId => {
const targetSettings = {
...settings,
...allSettings[remoteId],
}
if (targetSettings.exportRetention !== 0) {
writers.add(new BackupWriter({ backup: this, remoteId, settings: targetSettings }))
}
})
srs.forEach(sr => {
const targetSettings = {
...settings,
...allSettings[sr.uuid],
}
if (targetSettings.copyRetention !== 0) {
writers.add(new ReplicationWriter({ backup: this, sr, settings: targetSettings }))
}
})
}
}
// calls fn for each function, warns of any errors, and throws only if there are no writers left
async _callWriters(fn, step, parallel = true) {
const writers = this._writers
const n = writers.size
if (n === 0) {
return
}
async function callWriter(writer) {
const { name } = writer.constructor
try {
debug('writer step starting', { step, writer: name })
await fn(writer)
debug('writer step succeeded', { duration: step, writer: name })
} catch (error) {
writers.delete(writer)
warn('writer step failed', { error, step, writer: name })
// these two steps are the only one that are not already in their own sub tasks
if (step === 'writer.checkBaseVdis()' || step === 'writer.beforeBackup()') {
Task.warning(
`the writer ${name} has failed the step ${step} with error ${error.message}. It won't be used anymore in this job execution.`
)
}
throw error
}
}
if (n === 1) {
const [writer] = writers
return callWriter(writer)
}
const errors = []
await (parallel ? asyncMap : asyncEach)(writers, async function (writer) {
try {
await callWriter(writer)
} catch (error) {
errors.push(error)
}
})
if (writers.size === 0) {
throw new AggregateError(errors, 'all targets have failed, step: ' + step)
}
}
// ensure the VM itself does not have any backup metadata which would be
// copied on manual snapshots and interfere with the backup jobs
async _cleanMetadata() {
const { vm } = this
if ('xo:backup:job' in vm.other_config) {
await vm.update_other_config({
'xo:backup:datetime': null,
'xo:backup:deltaChainLength': null,
'xo:backup:exported': null,
'xo:backup:job': null,
'xo:backup:schedule': null,
'xo:backup:vm': null,
})
}
}
async _snapshot() {
const { vm } = this
const xapi = this._xapi
const settings = this._settings
const doSnapshot =
settings.unconditionalSnapshot ||
this._isDelta ||
(!settings.offlineBackup && vm.power_state === 'Running') ||
settings.snapshotRetention !== 0
if (doSnapshot) {
await Task.run({ name: 'snapshot' }, async () => {
if (!settings.bypassVdiChainsCheck) {
await vm.$assertHealthyVdiChains()
}
const snapshotRef = await vm[settings.checkpointSnapshot ? '$checkpoint' : '$snapshot']({
ignoreNobakVdis: true,
name_label: this._getSnapshotNameLabel(vm),
unplugVusbs: true,
})
this.timestamp = Date.now()
await xapi.setFieldEntries('VM', snapshotRef, 'other_config', {
'xo:backup:datetime': formatDateTime(this.timestamp),
'xo:backup:job': this._jobId,
'xo:backup:schedule': this.scheduleId,
'xo:backup:vm': vm.uuid,
})
this.exportedVm = await xapi.getRecord('VM', snapshotRef)
return this.exportedVm.uuid
})
} else {
this.exportedVm = vm
this.timestamp = Date.now()
}
}
async _copyDelta() {
const { exportedVm } = this
const baseVm = this._baseVm
const fullVdisRequired = this._fullVdisRequired
const isFull = fullVdisRequired === undefined || fullVdisRequired.size !== 0
await this._callWriters(writer => writer.prepare({ isFull }), 'writer.prepare()')
const deltaExport = await exportDeltaVm(exportedVm, baseVm, {
fullVdisRequired,
})
// since NBD is network based, if one disk use nbd , all the disk use them
// except the suspended VDI
if (Object.values(deltaExport.streams).some(({ _nbd }) => _nbd)) {
Task.info('Transfer data using NBD')
}
const sizeContainers = mapValues(deltaExport.streams, stream => watchStreamSize(stream))
if (this._settings.validateVhdStreams) {
deltaExport.streams = mapValues(deltaExport.streams, stream => pipeline(stream, vhdStreamValidator, noop))
}
deltaExport.streams = mapValues(deltaExport.streams, this._throttleStream)
const timestamp = Date.now()
await this._callWriters(
writer =>
writer.transfer({
deltaExport: forkDeltaExport(deltaExport),
sizeContainers,
timestamp,
}),
'writer.transfer()'
)
this._baseVm = exportedVm
if (baseVm !== undefined) {
await exportedVm.update_other_config(
'xo:backup:deltaChainLength',
String(+(baseVm.other_config['xo:backup:deltaChainLength'] ?? 0) + 1)
)
}
// not the case if offlineBackup
if (exportedVm.is_a_snapshot) {
await exportedVm.update_other_config('xo:backup:exported', 'true')
}
const size = Object.values(sizeContainers).reduce((sum, { size }) => sum + size, 0)
const end = Date.now()
const duration = end - timestamp
debug('transfer complete', {
duration,
speed: duration !== 0 ? (size * 1e3) / 1024 / 1024 / duration : 0,
size,
})
await this._callWriters(writer => writer.cleanup(), 'writer.cleanup()')
}
async _copyFull() {
const { compression } = this.job
const stream = this._throttleStream(
await this._xapi.VM_export(this.exportedVm.$ref, {
compress: Boolean(compression) && (compression === 'native' ? 'gzip' : 'zstd'),
useSnapshot: false,
})
)
const sizeContainer = watchStreamSize(stream)
const timestamp = Date.now()
await this._callWriters(
writer =>
writer.run({
sizeContainer,
stream: forkStreamUnpipe(stream),
timestamp,
}),
'writer.run()'
)
const { size } = sizeContainer
const end = Date.now()
const duration = end - timestamp
debug('transfer complete', {
duration,
speed: duration !== 0 ? (size * 1e3) / 1024 / 1024 / duration : 0,
size,
})
}
async _fetchJobSnapshots() {
const jobId = this._jobId
const vmRef = this.vm.$ref
const xapi = this._xapi
const snapshotsRef = await xapi.getField('VM', vmRef, 'snapshots')
const snapshotsOtherConfig = await asyncMap(snapshotsRef, ref => xapi.getField('VM', ref, 'other_config'))
const snapshots = []
snapshotsOtherConfig.forEach((other_config, i) => {
if (other_config['xo:backup:job'] === jobId) {
snapshots.push({ other_config, $ref: snapshotsRef[i] })
}
})
snapshots.sort((a, b) => (a.other_config['xo:backup:datetime'] < b.other_config['xo:backup:datetime'] ? -1 : 1))
this._jobSnapshots = snapshots
}
async _removeUnusedSnapshots() {
const allSettings = this.job.settings
const baseSettings = this._baseSettings
const baseVmRef = this._baseVm?.$ref
const snapshotsPerSchedule = groupBy(this._jobSnapshots, _ => _.other_config['xo:backup:schedule'])
const xapi = this._xapi
await asyncMap(Object.entries(snapshotsPerSchedule), ([scheduleId, snapshots]) => {
const settings = {
...baseSettings,
...allSettings[scheduleId],
...allSettings[this.vm.uuid],
}
return asyncMap(getOldEntries(settings.snapshotRetention, snapshots), ({ $ref }) => {
if ($ref !== baseVmRef) {
return xapi.VM_destroy($ref)
}
})
})
}
async _selectBaseVm() {
const xapi = this._xapi
let baseVm = findLast(this._jobSnapshots, _ => 'xo:backup:exported' in _.other_config)
if (baseVm === undefined) {
debug('no base VM found')
return
}
const fullInterval = this._settings.fullInterval
const deltaChainLength = +(baseVm.other_config['xo:backup:deltaChainLength'] ?? 0) + 1
if (!(fullInterval === 0 || fullInterval > deltaChainLength)) {
debug('not using base VM becaust fullInterval reached')
return
}
const srcVdis = keyBy(await xapi.getRecords('VDI', await this.vm.$getDisks()), '$ref')
// resolve full record
baseVm = await xapi.getRecord('VM', baseVm.$ref)
const baseUuidToSrcVdi = new Map()
await asyncMap(await baseVm.$getDisks(), async baseRef => {
const [baseUuid, snapshotOf] = await Promise.all([
xapi.getField('VDI', baseRef, 'uuid'),
xapi.getField('VDI', baseRef, 'snapshot_of'),
])
const srcVdi = srcVdis[snapshotOf]
if (srcVdi !== undefined) {
baseUuidToSrcVdi.set(baseUuid, srcVdi)
} else {
debug('ignore snapshot VDI because no longer present on VM', {
vdi: baseUuid,
})
}
})
const presentBaseVdis = new Map(baseUuidToSrcVdi)
await this._callWriters(
writer => presentBaseVdis.size !== 0 && writer.checkBaseVdis(presentBaseVdis, baseVm),
'writer.checkBaseVdis()',
false
)
if (presentBaseVdis.size === 0) {
debug('no base VM found')
return
}
const fullVdisRequired = new Set()
baseUuidToSrcVdi.forEach((srcVdi, baseUuid) => {
if (presentBaseVdis.has(baseUuid)) {
debug('found base VDI', {
base: baseUuid,
vdi: srcVdi.uuid,
})
} else {
debug('missing base VDI', {
base: baseUuid,
vdi: srcVdi.uuid,
})
fullVdisRequired.add(srcVdi.uuid)
}
})
this._baseVm = baseVm
this._fullVdisRequired = fullVdisRequired
}
async _healthCheck() {
const settings = this._settings
if (this._healthCheckSr === undefined) {
return
}
// check if current VM has tags
const { tags } = this.vm
const intersect = settings.healthCheckVmsWithTags.some(t => tags.includes(t))
if (settings.healthCheckVmsWithTags.length !== 0 && !intersect) {
return
}
await this._callWriters(writer => writer.healthCheck(this._healthCheckSr), 'writer.healthCheck()')
}
async run($defer) {
const settings = this._settings
assert(
!settings.offlineBackup || settings.snapshotRetention === 0,
'offlineBackup is not compatible with snapshotRetention'
)
await this._callWriters(async writer => {
await writer.beforeBackup()
$defer(async () => {
await writer.afterBackup()
})
}, 'writer.beforeBackup()')
await this._fetchJobSnapshots()
if (this._isDelta) {
await this._selectBaseVm()
}
await this._cleanMetadata()
await this._removeUnusedSnapshots()
const { vm } = this
const isRunning = vm.power_state === 'Running'
const startAfter = isRunning && (settings.offlineBackup ? 'backup' : settings.offlineSnapshot && 'snapshot')
if (startAfter) {
await vm.$callAsync('clean_shutdown')
}
try {
await this._snapshot()
if (startAfter === 'snapshot') {
ignoreErrors.call(vm.$callAsync('start', false, false))
}
if (this._writers.size !== 0) {
await (this._isDelta ? this._copyDelta() : this._copyFull())
}
} finally {
if (startAfter) {
ignoreErrors.call(vm.$callAsync('start', false, false))
}
await this._fetchJobSnapshots()
await this._removeUnusedSnapshots()
}
await this._healthCheck()
}
}
exports.VmBackup = VmBackup
decorateMethodsWith(VmBackup, {
run: defer,
})

View File

@@ -0,0 +1,51 @@
'use strict'
const Disposable = require('promise-toolbox/Disposable')
const pTimeout = require('promise-toolbox/timeout')
const { compileTemplate } = require('@xen-orchestra/template')
const { RemoteTimeoutError } = require('./RemoteTimeoutError.js')
const { runTask } = require('./runTask.js')
exports.DEFAULT_SETTINGS = {
getRemoteTimeout: 300e3,
reportWhen: 'failure',
}
exports.AbstractBackupJob = class AbstractBackupJob {
constructor({ config, getAdapter, getConnectedRecord, job, schedule }) {
this._config = config
this._getRecord = getConnectedRecord
this._job = job
this._schedule = schedule
this._getSnapshotNameLabel = compileTemplate(config.snapshotNameLabelTpl, {
'{job.name}': job.name,
'{vm.name_label}': vm => vm.name_label,
})
const baseSettings = this._computeBaseSettings(config, job)
this._baseSettings = baseSettings
this._settings = { ...baseSettings, ...job.settings[schedule.id] }
const { getRemoteTimeout } = this._settings
this._getAdapter = async function (remoteId) {
try {
const disposable = await pTimeout.call(getAdapter(remoteId), getRemoteTimeout, new RemoteTimeoutError(remoteId))
return new Disposable(() => disposable.dispose(), {
adapter: disposable.value,
remoteId,
})
} catch (error) {
// See https://github.com/vatesfr/xen-orchestra/commit/6aa6cfba8ec939c0288f0fa740f6dfad98c43cbb
runTask(
{
name: 'get remote adapter',
data: { type: 'remote', id: remoteId },
},
() => Promise.reject(error)
)
}
}
}
}

View File

@@ -0,0 +1,134 @@
'use strict'
const { asyncMap } = require('@xen-orchestra/async-map')
const Disposable = require('promise-toolbox/Disposable')
const ignoreErrors = require('promise-toolbox/ignoreErrors')
const { extractIdsFromSimplePattern } = require('../extractIdsFromSimplePattern.js')
const { PoolMetadataBackup } = require('../PoolMetadataBackup.js')
const { XoMetadataBackup } = require('./XoMetadataBackup.js')
const { DEFAULT_SETTINGS, AbstractBackupJob } = require('./AbstractBackupJob.js')
const { runTask } = require('./runTask.js')
const { getAdaptersByRemote } = require('./getAdapterByRemote.js')
const DEFAULT_METADATA_SETTINGS = {
retentionPoolMetadata: 0,
retentionXoMetadata: 0,
}
exports.MetadatasBackupJob = class MetadatasBackupJob extends AbstractBackupJob {
_computeBaseSettings(config, job) {
const baseSettings = { ...DEFAULT_SETTINGS }
Object.assign(baseSettings, DEFAULT_METADATA_SETTINGS, config.defaultSettings, config.metadata?.defaultSettings)
Object.assign(baseSettings, job.settings[''])
return baseSettings
}
async run() {
const schedule = this._schedule
const job = this._job
const remoteIds = extractIdsFromSimplePattern(job.remotes)
if (remoteIds.length === 0) {
throw new Error('metadata backup job cannot run without remotes')
}
const config = this._config
const poolIds = extractIdsFromSimplePattern(job.pools)
const isEmptyPools = poolIds.length === 0
const isXoMetadata = job.xoMetadata !== undefined
if (!isXoMetadata && isEmptyPools) {
throw new Error('no metadata mode found')
}
const settings = this._settings
const { retentionPoolMetadata, retentionXoMetadata } = settings
if (
(retentionPoolMetadata === 0 && retentionXoMetadata === 0) ||
(!isXoMetadata && retentionPoolMetadata === 0) ||
(isEmptyPools && retentionXoMetadata === 0)
) {
throw new Error('no retentions corresponding to the metadata modes found')
}
await Disposable.use(
Disposable.all(
poolIds.map(id =>
this._getRecord('pool', id).catch(error => {
// See https://github.com/vatesfr/xen-orchestra/commit/6aa6cfba8ec939c0288f0fa740f6dfad98c43cbb
runTask(
{
name: 'get pool record',
data: { type: 'pool', id },
},
() => Promise.reject(error)
)
})
)
),
Disposable.all(remoteIds.map(id => this._getAdapter(id))),
async (pools, remoteAdapters) => {
// remove adapters that failed (already handled)
remoteAdapters = remoteAdapters.filter(_ => _ !== undefined)
if (remoteAdapters.length === 0) {
return
}
remoteAdapters = getAdaptersByRemote(remoteAdapters)
// remove pools that failed (already handled)
pools = pools.filter(_ => _ !== undefined)
const promises = []
if (pools.length !== 0 && settings.retentionPoolMetadata !== 0) {
promises.push(
asyncMap(pools, async pool =>
runTask(
{
name: `Starting metadata backup for the pool (${pool.$id}). (${job.id})`,
data: {
id: pool.$id,
pool,
poolMaster: await ignoreErrors.call(pool.$xapi.getRecord('host', pool.master)),
type: 'pool',
},
},
() =>
new PoolMetadataBackup({
config,
job,
pool,
remoteAdapters,
schedule,
settings,
}).run()
)
)
)
}
if (job.xoMetadata !== undefined && settings.retentionXoMetadata !== 0) {
promises.push(
runTask(
{
name: `Starting XO metadata backup. (${job.id})`,
data: {
type: 'xo',
},
},
() =>
new XoMetadataBackup({
config,
job,
remoteAdapters,
schedule,
settings,
}).run()
)
)
}
await Promise.all(promises)
}
)
}
}

View File

@@ -0,0 +1,7 @@
'use strict'
exports.RemoteTimeoutError = class RemoteTimeoutError extends Error {
constructor(remoteId) {
super('timeout while getting the remote ' + remoteId)
this.remoteId = remoteId
}
}

View File

@@ -0,0 +1,107 @@
'use strict'
const { asyncMapSettled, asyncMap } = require('@xen-orchestra/async-map')
const Disposable = require('promise-toolbox/Disposable')
const { limitConcurrency } = require('limit-concurrency-decorator')
const { extractIdsFromSimplePattern } = require('../extractIdsFromSimplePattern.js')
const { Task } = require('../Task.js')
const createStreamThrottle = require('./createStreamThrottle.js')
const { DEFAULT_SETTINGS, AbstractBackupJob } = require('./AbstractBackupJob.js')
const { runTask } = require('./runTask.js')
const { getAdaptersByRemote } = require('./getAdapterByRemote.js')
const { IncrementalRemoteVmBackup } = require('./VmBackup/IncrementalRemoteVmBackup.js')
const { FullRemoteVmBackup } = require('./VmBackup/FullRemoteVmBackup.js')
const DEFAULT_REMOTE_VM_SETTINGS = {
concurrency: 2,
copyRetention: 0,
deleteFirst: false,
exportRetention: 0,
fullInterval: 0,
healthCheckSr: undefined,
healthCheckVmsWithTags: [],
maxExportRate: 0,
maxMergedDeltasPerRun: Infinity,
timeout: 0,
validateVhdStreams: false,
vmTimeout: 0,
}
exports.RemoteVmBackupJob = class RemoteVmBackupJob extends AbstractBackupJob {
_computeBaseSettings(config, job) {
const baseSettings = { ...DEFAULT_SETTINGS }
Object.assign(baseSettings, DEFAULT_REMOTE_VM_SETTINGS, config.defaultSettings, config.vm?.defaultSettings)
Object.assign(baseSettings, job.settings[''])
return baseSettings
}
async run() {
const job = this._job
const schedule = this._schedule
const settings = this._settings
const throttleStream = createStreamThrottle(settings.maxExportRate)
const config = this._config
await Disposable.use(
() => this._getAdapter(job.sourceRemote),
() => (settings.healthCheckSr !== undefined ? this._getRecord('SR', settings.healthCheckSr) : undefined),
Disposable.all(
extractIdsFromSimplePattern(job.remotes).map(id => id !== job.sourceRemote && this._getAdapter(id))
),
async ({ adapter: sourceRemoteAdapter }, healthCheckSr, remoteAdapters) => {
// remove adapters that failed (already handled)
remoteAdapters = remoteAdapters.filter(_ => !!_)
if (remoteAdapters.length === 0) {
return
}
const vmsUuids = []
await asyncMap(await sourceRemoteAdapter._handler.list('xo-vm-backups'), async entry => {
// ignore hidden and lock files
if (entry[0] !== '.' && !entry.endsWith('.lock')) {
vmsUuids.push(entry)
}
})
Task.info('vms', { vms: vmsUuids })
remoteAdapters = getAdaptersByRemote(remoteAdapters)
const allSettings = this._job.settings
const baseSettings = this._baseSettings
const handleVm = vmUuid => {
const taskStart = { name: 'backup VM', data: { type: 'VM', id: vmUuid } }
const opts = {
baseSettings,
config,
job,
healthCheckSr,
remoteAdapters,
schedule,
settings: { ...settings, ...allSettings[vmUuid] },
sourceRemoteAdapter,
throttleStream,
vmUuid,
}
let vmBackup
if (job.mode === 'delta') {
vmBackup = new IncrementalRemoteVmBackup(opts)
} else {
if (job.mode === 'full') {
vmBackup = new FullRemoteVmBackup(opts)
} else {
throw new Error(`Job mode ${job.mode} not implemented`)
}
}
return runTask(taskStart, () => vmBackup.run())
}
const { concurrency } = settings
await asyncMapSettled(vmsUuids, !concurrency ? handleVm : limitConcurrency(concurrency)(handleVm))
}
)
}
}

View File

@@ -0,0 +1,100 @@
'use strict'
const { AbstractVmBackup } = require('./AbstractVmBackup')
const { getVmBackupDir } = require('../../_getVmBackupDir')
const { decorateMethodsWith } = require('@vates/decorate-with')
const { defer } = require('golike-defer')
class AbstractRemoteVmBackup extends AbstractVmBackup {
constructor({
config,
job,
healthCheckSr,
remoteAdapters,
schedule,
settings,
sourceRemoteAdapter,
throttleStream,
vmUuid,
}) {
super()
this.config = config
this.job = job
this.remoteAdapters = remoteAdapters
this.scheduleId = schedule.id
this.timestamp = undefined
// the vm object is used in writers
// remoteWriter only need vm.uuid
// @todo : how to do better ?
// missing tags for healthcheck
this.vm = { uuid: vmUuid }
this._healthCheckSr = healthCheckSr
this._sourceRemoteAdapter = sourceRemoteAdapter
this._throttleStream = throttleStream
const allSettings = job.settings
const writers = new Set()
this._writers = writers
const RemoteWriter = this._getRemoteWriter()
Object.keys(remoteAdapters).forEach(remoteId => {
const targetSettings = {
...settings,
...allSettings[remoteId],
}
if (targetSettings.exportRetention !== 0) {
writers.add(new RemoteWriter({ backup: this, remoteId, settings: targetSettings }))
}
})
}
async _computeTransferList(predicate) {
const vmBackups = await this._sourceRemoteAdapter.listVmBackups(this.vm.uuid, predicate)
const localMetada = {}
Object.values(vmBackups).forEach(metadata => {
const timestamp = metadata.timestamp
localMetada[timestamp] = metadata
})
const nbRemotes = Object.keys(this.remoteAdapters).length
const remoteMetadatas = {}
await Promise.all(
Object.values(this.remoteAdapters).map(async remoteAdapter => {
const remoteMetadata = await remoteAdapter.listVmBackups(this.vm.uuid, predicate)
remoteMetadata.forEach(metadata => {
const timestamp = metadata.timestamp
remoteMetadatas[timestamp] = (remoteMetadatas[timestamp] ?? 0) + 1
})
})
)
let chain = []
for (const timestamp in localMetada) {
if (remoteMetadatas[timestamp] !== nbRemotes) {
// this backup is not present in all the remote
// should be retransfered if not found later
chain.push(localMetada[timestamp])
} else {
// backup is present in local and remote : the chain has already been transferred
chain = []
}
}
return chain
}
async run($defer) {
const handler = this._sourceRemoteAdapter._handler
const sourceLock = await handler.lock(getVmBackupDir(this.vm.uuid))
$defer(async () => {
sourceLock.dispose()
})
await this._run()
}
}
exports.AbstractRemoteVmBackup = AbstractRemoteVmBackup
decorateMethodsWith(AbstractRemoteVmBackup, {
run: defer,
})

View File

@@ -0,0 +1,90 @@
'use strict'
const { createLogger } = require('@xen-orchestra/log')
const { Task } = require('../../Task')
const { asyncMap } = require('@xen-orchestra/async-map')
const { debug, warn } = createLogger('xo:backups:AbstractVmBackup')
class AggregateError extends Error {
constructor(errors, message) {
super(message)
this.errors = errors
}
}
const asyncEach = async (iterable, fn, thisArg = iterable) => {
for (const item of iterable) {
await fn.call(thisArg, item)
}
}
class AbstractVmBackup {
// calls fn for each function, warns of any errors, and throws only if there are no writers left
async _callWriters(fn, step, parallel = true) {
const writers = this._writers
const n = writers.size
if (n === 0) {
return
}
async function callWriter(writer) {
const { name } = writer.constructor
try {
debug('writer step starting', { step, writer: name })
await fn(writer)
debug('writer step succeeded', { duration: step, writer: name })
} catch (error) {
writers.delete(writer)
warn('writer step failed', { error, step, writer: name })
// these two steps are the only one that are not already in their own sub tasks
if (step === 'writer.checkBaseVdis()' || step === 'writer.beforeBackup()') {
Task.warning(
`the writer ${name} has failed the step ${step} with error ${error.message}. It won't be used anymore in this job execution.`
)
}
throw error
}
}
if (n === 1) {
const [writer] = writers
return callWriter(writer)
}
const errors = []
await (parallel ? asyncMap : asyncEach)(writers, async function (writer) {
try {
await callWriter(writer)
} catch (error) {
errors.push(error)
}
})
if (writers.size === 0) {
throw new AggregateError(errors, 'all targets have failed, step: ' + step)
}
}
async _healthCheck() {
const settings = this._settings
if (this._healthCheckSr === undefined) {
return
}
// check if current VM has tags
const { tags } = this.vm
const intersect = settings.healthCheckVmsWithTags.some(t => tags.includes(t))
if (settings.healthCheckVmsWithTags.length !== 0 && !intersect) {
return
}
await this._callWriters(writer => writer.healthCheck(this._healthCheckSr), 'writer.healthCheck()')
}
async run() {
throw new Error('not implemented')
}
}
exports.AbstractVmBackup = AbstractVmBackup

View File

@@ -0,0 +1,258 @@
'use strict'
const assert = require('assert')
const ignoreErrors = require('promise-toolbox/ignoreErrors')
const groupBy = require('lodash/groupBy.js')
const { asyncMap } = require('@xen-orchestra/async-map')
const { decorateMethodsWith } = require('@vates/decorate-with')
const { defer } = require('golike-defer')
const { formatDateTime } = require('@xen-orchestra/xapi')
const { getOldEntries } = require('./writers/_getOldEntries.js')
const { Task } = require('../../Task.js')
const { AbstractVmBackup } = require('./AbstractVmBackup.js')
class AbstractXapiVmBackup extends AbstractVmBackup {
constructor({
config,
getSnapshotNameLabel,
healthCheckSr,
job,
remoteAdapters,
schedule,
settings,
srs,
throttleStream,
vm,
}) {
super()
if (vm.other_config['xo:backup:job'] === job.id && 'start' in vm.blocked_operations) {
// don't match replicated VMs created by this very job otherwise they
// will be replicated again and again
throw new Error('cannot backup a VM created by this very job')
}
this.config = config
this.job = job
this.remoteAdapters = remoteAdapters
this.scheduleId = schedule.id
this.timestamp = undefined
// VM currently backed up
this.vm = vm
const { tags } = this.vm
// VM (snapshot) that is really exported
this.exportedVm = undefined
this._fullVdisRequired = undefined
this._getSnapshotNameLabel = getSnapshotNameLabel
this._healthCheckSr = healthCheckSr
this._jobId = job.id
this._jobSnapshots = undefined
this._throttleStream = throttleStream
this._xapi = vm.$xapi
// Reference VM for the incremental export
// if possible we willonly export the difference between thie vm and now
this._vmComparisonBasis = undefined
// Settings for this specific run (job, schedule, VM)
if (tags.includes('xo-memory-backup')) {
settings.checkpointSnapshot = true
}
if (tags.includes('xo-offline-backup')) {
settings.offlineSnapshot = true
}
this._settings = settings
// Create writers
{
const writers = new Set()
this._writers = writers
const [RemoteWriter, XapiWriter] = this._getWriters()
const allSettings = job.settings
Object.keys(remoteAdapters).forEach(remoteId => {
const targetSettings = {
...settings,
...allSettings[remoteId],
}
if (targetSettings.exportRetention !== 0) {
writers.add(new RemoteWriter({ backup: this, remoteId, settings: targetSettings }))
}
})
srs.forEach(sr => {
const targetSettings = {
...settings,
...allSettings[sr.uuid],
}
if (targetSettings.copyRetention !== 0) {
writers.add(new XapiWriter({ backup: this, sr, settings: targetSettings }))
}
})
}
}
// ensure the VM itself does not have any backup metadata which would be
// copied on manual snapshots and interfere with the backup jobs
async _cleanMetadata() {
const { vm } = this
if ('xo:backup:job' in vm.other_config) {
await vm.update_other_config({
'xo:backup:datetime': null,
'xo:backup:deltaChainLength': null,
'xo:backup:exported': null,
'xo:backup:job': null,
'xo:backup:schedule': null,
'xo:backup:vm': null,
})
}
}
async _snapshot() {
const { vm } = this
const xapi = this._xapi
const settings = this._settings
if (this._mustDoSnapshot()) {
await Task.run({ name: 'snapshot' }, async () => {
if (!settings.bypassVdiChainsCheck) {
await vm.$assertHealthyVdiChains()
}
const snapshotRef = await vm[settings.checkpointSnapshot ? '$checkpoint' : '$snapshot']({
ignoreNobakVdis: true,
name_label: this._getSnapshotNameLabel(vm),
unplugVusbs: true,
})
this.timestamp = Date.now()
await xapi.setFieldEntries('VM', snapshotRef, 'other_config', {
'xo:backup:datetime': formatDateTime(this.timestamp),
'xo:backup:job': this._jobId,
'xo:backup:schedule': this.scheduleId,
'xo:backup:vm': vm.uuid,
})
this.exportedVm = await xapi.getRecord('VM', snapshotRef)
return this.exportedVm.uuid
})
} else {
this.exportedVm = vm
this.timestamp = Date.now()
}
}
async _fetchJobSnapshots() {
const jobId = this._jobId
const vmRef = this.vm.$ref
const xapi = this._xapi
const snapshotsRef = await xapi.getField('VM', vmRef, 'snapshots')
const snapshotsOtherConfig = await asyncMap(snapshotsRef, ref => xapi.getField('VM', ref, 'other_config'))
const snapshots = []
snapshotsOtherConfig.forEach((other_config, i) => {
if (other_config['xo:backup:job'] === jobId) {
snapshots.push({ other_config, $ref: snapshotsRef[i] })
}
})
snapshots.sort((a, b) => (a.other_config['xo:backup:datetime'] < b.other_config['xo:backup:datetime'] ? -1 : 1))
this._jobSnapshots = snapshots
}
async _removeUnusedSnapshots() {
const allSettings = this.job.settings
const baseSettings = this._baseSettings
const vmComparisonBasisRef = this._vmComparisonBasis?.$ref
const snapshotsPerSchedule = groupBy(this._jobSnapshots, _ => _.other_config['xo:backup:schedule'])
const xapi = this._xapi
await asyncMap(Object.entries(snapshotsPerSchedule), ([scheduleId, snapshots]) => {
const settings = {
...baseSettings,
...allSettings[scheduleId],
...allSettings[this.vm.uuid],
}
return asyncMap(getOldEntries(settings.snapshotRetention, snapshots), ({ $ref }) => {
if ($ref !== vmComparisonBasisRef) {
return xapi.VM_destroy($ref)
}
})
})
}
async copy() {
throw new Error('Not implemented')
}
_getWriters() {
throw new Error('Not implemented')
}
_mustDoSnapshot() {
throw new Error('Not implemented')
}
async _selectBaseVm() {
throw new Error('Not implemented')
}
async run($defer) {
const settings = this._settings
assert(
!settings.offlineBackup || settings.snapshotRetention === 0,
'offlineBackup is not compatible with snapshotRetention'
)
await this._callWriters(async writer => {
await writer.beforeBackup()
$defer(async () => {
await writer.afterBackup()
})
}, 'writer.beforeBackup()')
await this._fetchJobSnapshots()
// will only do something during incremental Backup
await this._selectBaseVm()
await this._cleanMetadata()
await this._removeUnusedSnapshots()
const { vm } = this
const isRunning = vm.power_state === 'Running'
const startAfter = isRunning && (settings.offlineBackup ? 'backup' : settings.offlineSnapshot && 'snapshot')
if (startAfter) {
await vm.$callAsync('clean_shutdown')
}
try {
await this._snapshot()
if (startAfter === 'snapshot') {
ignoreErrors.call(vm.$callAsync('start', false, false))
}
if (this._writers.size !== 0) {
await this._copy()
}
} finally {
if (startAfter) {
ignoreErrors.call(vm.$callAsync('start', false, false))
}
await this._fetchJobSnapshots()
await this._removeUnusedSnapshots()
}
await this._healthCheck()
}
}
exports.AbstractXapiVmBackup = AbstractXapiVmBackup
decorateMethodsWith(AbstractXapiVmBackup, {
run: defer,
})

View File

@@ -0,0 +1,43 @@
'use strict'
const { decorateMethodsWith } = require('@vates/decorate-with')
const { defer } = require('golike-defer')
const { AbstractRemoteVmBackup } = require('./AbstractRemoteVmBackup')
const { FullRemoteWriter } = require('./writers/FullRemoteWriter')
const { forkStreamUnpipe } = require('../forkStreamUnpipe')
const FullRemoteVmBackup = class FullRemoteVmBackup extends AbstractRemoteVmBackup {
_getRemoteWriter() {
return FullRemoteWriter
}
async _run($defer) {
const transferList = await this._computeTransferList(({ mode }) => mode === 'full')
await this._callWriters(async writer => {
await writer.beforeBackup()
$defer(async () => {
await writer.afterBackup()
})
}, 'writer.beforeBackup()')
for (const metadata of transferList) {
const stream = await this._sourceRemoteAdapter.readFullVmBackup(metadata)
// @todo should skip if backup is already there (success on only one remote)
await this._callWriters(
writer =>
writer.run({
stream: forkStreamUnpipe(stream),
timestamp: metadata.timestamp,
vm: metadata.vm,
vmSnapshot: metadata.vmSnapshot,
}),
'writer.run()'
)
}
}
}
exports.FullRemoteVmBackup = FullRemoteVmBackup
decorateMethodsWith(FullRemoteVmBackup, {
_run: defer,
})

View File

@@ -0,0 +1,63 @@
'use strict'
const { createLogger } = require('@xen-orchestra/log')
const { forkStreamUnpipe } = require('../forkStreamUnpipe.js')
const { watchStreamSize } = require('../../_watchStreamSize.js')
const { FullRemoteWriter } = require('./writers/FullRemoteWriter.js')
const { FullXapiWriter } = require('./writers/FullXapiWriter.js')
const { AbstractXapiVmBackup } = require('./AbstractXapiVMBackup.js')
const { debug } = createLogger('xo:backups:FullVmBackup')
class FullXapiVmBackup extends AbstractXapiVmBackup {
_getWriters() {
return [FullRemoteWriter, FullXapiWriter]
}
_mustDoSnapshot() {
const { vm } = this
const settings = this._settings
return (
settings.unconditionalSnapshot ||
(!settings.offlineBackup && vm.power_state === 'Running') ||
settings.snapshotRetention !== 0
)
}
_selectBaseVm() {}
async _copy() {
const { compression } = this.job
const stream = this._throttleStream(
await this._xapi.VM_export(this.exportedVm.$ref, {
compress: Boolean(compression) && (compression === 'native' ? 'gzip' : 'zstd'),
useSnapshot: false,
})
)
const sizeContainer = watchStreamSize(stream)
const timestamp = Date.now()
await this._callWriters(
writer =>
writer.run({
sizeContainer,
stream: forkStreamUnpipe(stream),
timestamp,
}),
'writer.run()'
)
const { size } = sizeContainer
const end = Date.now()
const duration = end - timestamp
debug('transfer complete', {
duration,
speed: duration !== 0 ? (size * 1e3) / 1024 / 1024 / duration : 0,
size,
})
this._healthCheck()
}
}
exports.FullXapiVmBackup = FullXapiVmBackup

View File

@@ -0,0 +1,64 @@
'use strict'
const assert = require('node:assert')
const { decorateMethodsWith } = require('@vates/decorate-with')
const { defer } = require('golike-defer')
const { AbstractRemoteVmBackup } = require('./AbstractRemoteVmBackup')
const { mapValues } = require('lodash')
const { IncrementalRemoteWriter } = require('./writers/IncrementalRemoteWriter')
const { forkStreamUnpipe } = require('../forkStreamUnpipe')
const { Task } = require('../../Task')
const forkDeltaExport = deltaExport =>
Object.create(deltaExport, {
streams: {
value: mapValues(deltaExport.streams, forkStreamUnpipe),
},
})
class IncrementalRemoteVmBackup extends AbstractRemoteVmBackup {
_getRemoteWriter() {
return IncrementalRemoteWriter
}
async _run($defer) {
const transferList = await this._computeTransferList(({ mode }) => mode === 'delta')
await this._callWriters(async writer => {
await writer.beforeBackup()
$defer(async () => {
await writer.afterBackup()
})
}, 'writer.beforeBackup()')
if (transferList.length > 0) {
for (const metadata of transferList) {
assert.strictEqual(metadata.mode, 'delta')
await this._callWriters(writer => writer.prepare({ isBase: metadata.isBase }), 'writer.prepare()')
const incrementalExport = await this._sourceRemoteAdapter.readIncrementalVmBackup(metadata, undefined, {
useSynthetic: false,
})
incrementalExport.streams = mapValues(incrementalExport.streams, this._throttleStream)
await this._callWriters(
writer =>
writer.transfer({
deltaExport: forkDeltaExport(incrementalExport),
timestamp: metadata.timestamp,
vm: metadata.vm,
vmSnapshot: metadata.vmSnapshot,
}),
'writer.transfer()'
)
await this._callWriters(writer => writer.cleanup(), 'writer.cleanup()')
}
this._healthCheck()
} else {
Task.info('No new data to upload for this VM')
}
}
}
exports.IncrementalRemoteVmBackup = IncrementalRemoteVmBackup
decorateMethodsWith(IncrementalRemoteVmBackup, {
_run: defer,
})

View File

@@ -0,0 +1,173 @@
'use strict'
const findLast = require('lodash/findLast.js')
const keyBy = require('lodash/keyBy.js')
const mapValues = require('lodash/mapValues.js')
const vhdStreamValidator = require('vhd-lib/vhdStreamValidator.js')
const { asyncMap } = require('@xen-orchestra/async-map')
const { createLogger } = require('@xen-orchestra/log')
const { pipeline } = require('node:stream')
const { exportIncrementalVm } = require('../../_incrementalVm.js')
const { forkStreamUnpipe } = require('../forkStreamUnpipe.js')
const { Task } = require('../../Task.js')
const { watchStreamSize } = require('../../_watchStreamSize.js')
const { IncrementalRemoteWriter } = require('./writers/IncrementalRemoteWriter.js')
const { IncrementalXapiWriter } = require('./writers/IncrementalXapiWriter.js')
const { AbstractXapiVmBackup } = require('./AbstractXapiVMBackup.js')
const { debug } = createLogger('xo:backups:VmBackup')
const forkDeltaExport = deltaExport =>
Object.create(deltaExport, {
streams: {
value: mapValues(deltaExport.streams, forkStreamUnpipe),
},
})
const noop = Function.prototype
class IncrementalXapiVmBackup extends AbstractXapiVmBackup {
_getWriters() {
return [IncrementalRemoteWriter, IncrementalXapiWriter]
}
_mustDoSnapshot() {
return true
}
async _copy() {
const { exportedVm } = this
const vmComparisonBasis = this._vmComparisonBasis
const fullVdisRequired = this._fullVdisRequired
const isBase = fullVdisRequired === undefined || fullVdisRequired.size !== 0
await this._callWriters(writer => writer.prepare({ isBase }), 'writer.prepare()')
const incrementalExport = await exportIncrementalVm(exportedVm, vmComparisonBasis, {
fullVdisRequired,
})
// since NBD is network based, if one disk use nbd , all the disk use them
// except the suspended VDI
if (Object.values(incrementalExport.streams).some(({ _nbd }) => _nbd)) {
Task.info('Transfer data using NBD')
}
const sizeContainers = mapValues(incrementalExport.streams, stream => watchStreamSize(stream))
if (this._settings.validateVhdStreams) {
incrementalExport.streams = mapValues(incrementalExport.streams, stream =>
pipeline(stream, vhdStreamValidator, noop)
)
}
incrementalExport.streams = mapValues(incrementalExport.streams, this._throttleStream)
const timestamp = Date.now()
await this._callWriters(
writer =>
writer.transfer({
deltaExport: forkDeltaExport(incrementalExport),
sizeContainers,
timestamp,
}),
'writer.transfer()'
)
this._vmComparisonBasis = exportedVm
if (vmComparisonBasis !== undefined) {
await exportedVm.update_other_config(
'xo:backup:deltaChainLength',
String(+(vmComparisonBasis.other_config['xo:backup:deltaChainLength'] ?? 0) + 1)
)
}
// not the case if offlineBackup
if (exportedVm.is_a_snapshot) {
await exportedVm.update_other_config('xo:backup:exported', 'true')
}
const size = Object.values(sizeContainers).reduce((sum, { size }) => sum + size, 0)
const end = Date.now()
const duration = end - timestamp
debug('transfer complete', {
duration,
speed: duration !== 0 ? (size * 1e3) / 1024 / 1024 / duration : 0,
size,
})
await this._callWriters(writer => writer.cleanup(), 'writer.cleanup()')
}
async _selectBaseVm() {
const xapi = this._xapi
let baseVm = findLast(this._jobSnapshots, _ => 'xo:backup:exported' in _.other_config)
if (baseVm === undefined) {
debug('no base VM found')
return
}
const fullInterval = this._settings.fullInterval
const deltaChainLength = +(baseVm.other_config['xo:backup:deltaChainLength'] ?? 0) + 1
if (!(fullInterval === 0 || fullInterval > deltaChainLength)) {
debug('not using base VM because fullInterval reached')
return
}
const srcVdis = keyBy(await xapi.getRecords('VDI', await this.vm.$getDisks()), '$ref')
// resolve full record
baseVm = await xapi.getRecord('VM', baseVm.$ref)
const baseUuidToSrcVdi = new Map()
await asyncMap(await baseVm.$getDisks(), async baseRef => {
const [baseUuid, snapshotOf] = await Promise.all([
xapi.getField('VDI', baseRef, 'uuid'),
xapi.getField('VDI', baseRef, 'snapshot_of'),
])
const srcVdi = srcVdis[snapshotOf]
if (srcVdi !== undefined) {
baseUuidToSrcVdi.set(baseUuid, srcVdi)
} else {
debug('ignore snapshot VDI because no longer present on VM', {
vdi: baseUuid,
})
}
})
const presentBaseVdis = new Map(baseUuidToSrcVdi)
await this._callWriters(
writer => presentBaseVdis.size !== 0 && writer.checkBaseVdis(presentBaseVdis, baseVm),
'writer.checkBaseVdis()',
false
)
if (presentBaseVdis.size === 0) {
debug('no base VM found')
return
}
const fullVdisRequired = new Set()
baseUuidToSrcVdi.forEach((srcVdi, baseUuid) => {
if (presentBaseVdis.has(baseUuid)) {
debug('found base VDI', {
base: baseUuid,
vdi: srcVdi.uuid,
})
} else {
debug('missing base VDI', {
base: baseUuid,
vdi: srcVdi.uuid,
})
fullVdisRequired.add(srcVdi.uuid)
}
})
this._vmComparisonBasis = baseVm
this._fullVdisRequired = fullVdisRequired
}
}
exports.IncrementalXapiVmBackup = IncrementalXapiVmBackup

View File

@@ -1,13 +1,13 @@
'use strict' 'use strict'
const { formatFilenameDate } = require('../_filenameDate.js') const { formatFilenameDate } = require('../../../_filenameDate.js')
const { getOldEntries } = require('../_getOldEntries.js') const { getOldEntries } = require('./_getOldEntries.js')
const { Task } = require('../Task.js') const { Task } = require('../../../Task.js')
const { MixinBackupWriter } = require('./_MixinBackupWriter.js') const { MixinRemoteWriter } = require('./_MixinRemoteWriter.js')
const { AbstractFullWriter } = require('./_AbstractFullWriter.js') const { AbstractFullWriter } = require('./_AbstractFullWriter.js')
exports.FullBackupWriter = class FullBackupWriter extends MixinBackupWriter(AbstractFullWriter) { exports.FullRemoteWriter = class FullRemoteWriter extends MixinRemoteWriter(AbstractFullWriter) {
constructor(props) { constructor(props) {
super(props) super(props)
@@ -26,11 +26,11 @@ exports.FullBackupWriter = class FullBackupWriter extends MixinBackupWriter(Abst
) )
} }
async _run({ timestamp, sizeContainer, stream }) { async _run({ timestamp, sizeContainer, stream, vm = this._backup.vm, vmSnapshot = this._backup.exportedVm }) {
const backup = this._backup const backup = this._backup
const settings = this._settings const settings = this._settings
const { job, scheduleId, vm } = backup const { job, scheduleId } = backup
const adapter = this._adapter const adapter = this._adapter
@@ -54,7 +54,7 @@ exports.FullBackupWriter = class FullBackupWriter extends MixinBackupWriter(Abst
timestamp, timestamp,
version: '2.0.0', version: '2.0.0',
vm, vm,
vmSnapshot: this._backup.exportedVm, vmSnapshot,
xva: './' + dataBasename, xva: './' + dataBasename,
} }
@@ -67,9 +67,9 @@ exports.FullBackupWriter = class FullBackupWriter extends MixinBackupWriter(Abst
await adapter.outputStream(dataFilename, stream, { await adapter.outputStream(dataFilename, stream, {
validator: tmpPath => adapter.isValidXva(tmpPath), validator: tmpPath => adapter.isValidXva(tmpPath),
}) })
return { size: sizeContainer.size } return { size: sizeContainer?.size }
}) })
metadata.size = sizeContainer.size metadata.size = sizeContainer?.size ?? 0
this._metadataFileName = await adapter.writeVmBackupMetadata(vm.uuid, metadata) this._metadataFileName = await adapter.writeVmBackupMetadata(vm.uuid, metadata)
if (!deleteFirst) { if (!deleteFirst) {

View File

@@ -4,15 +4,15 @@ const ignoreErrors = require('promise-toolbox/ignoreErrors')
const { asyncMap, asyncMapSettled } = require('@xen-orchestra/async-map') const { asyncMap, asyncMapSettled } = require('@xen-orchestra/async-map')
const { formatDateTime } = require('@xen-orchestra/xapi') const { formatDateTime } = require('@xen-orchestra/xapi')
const { formatFilenameDate } = require('../_filenameDate.js') const { formatFilenameDate } = require('../../../_filenameDate.js')
const { getOldEntries } = require('../_getOldEntries.js') const { getOldEntries } = require('./_getOldEntries.js')
const { Task } = require('../Task.js') const { Task } = require('../../../Task.js')
const { AbstractFullWriter } = require('./_AbstractFullWriter.js') const { AbstractFullWriter } = require('./_AbstractFullWriter.js')
const { MixinReplicationWriter } = require('./_MixinReplicationWriter.js') const { MixinXapiWriter } = require('./_MixinXapiWriter.js')
const { listReplicatedVms } = require('./_listReplicatedVms.js') const { listReplicatedVms } = require('./_listReplicatedVms.js')
exports.FullReplicationWriter = class FullReplicationWriter extends MixinReplicationWriter(AbstractFullWriter) { exports.FullXapiWriter = class FullXapiWriter extends MixinXapiWriter(AbstractFullWriter) {
constructor(props) { constructor(props) {
super(props) super(props)

View File

@@ -11,19 +11,19 @@ const { decorateClass } = require('@vates/decorate-with')
const { defer } = require('golike-defer') const { defer } = require('golike-defer')
const { dirname } = require('path') const { dirname } = require('path')
const { formatFilenameDate } = require('../_filenameDate.js') const { formatFilenameDate } = require('../../../_filenameDate.js')
const { getOldEntries } = require('../_getOldEntries.js') const { getOldEntries } = require('./_getOldEntries.js')
const { Task } = require('../Task.js') const { Task } = require('../../../Task.js')
const { MixinBackupWriter } = require('./_MixinBackupWriter.js') const { MixinRemoteWriter } = require('./_MixinRemoteWriter.js')
const { AbstractDeltaWriter } = require('./_AbstractDeltaWriter.js') const { AbstractIncrementalWriter } = require('./_AbstractIncrementalWriter.js')
const { checkVhd } = require('./_checkVhd.js') const { checkVhd } = require('./_checkVhd.js')
const { packUuid } = require('./_packUuid.js') const { packUuid } = require('./_packUuid.js')
const { Disposable } = require('promise-toolbox') const { Disposable } = require('promise-toolbox')
const { warn } = createLogger('xo:backups:DeltaBackupWriter') const { warn } = createLogger('xo:backups:IncrementalRemoteWriter')
class DeltaBackupWriter extends MixinBackupWriter(AbstractDeltaWriter) { class IncrementalRemoteWriter extends MixinRemoteWriter(AbstractIncrementalWriter) {
async checkBaseVdis(baseUuidToSrcVdi) { async checkBaseVdis(baseUuidToSrcVdi) {
const { handler } = this._adapter const { handler } = this._adapter
const backup = this._backup const backup = this._backup
@@ -70,13 +70,13 @@ class DeltaBackupWriter extends MixinBackupWriter(AbstractDeltaWriter) {
return this._cleanVm({ merge: true }) return this._cleanVm({ merge: true })
} }
prepare({ isFull }) { prepare({ isBase }) {
// create the task related to this export and ensure all methods are called in this context // create the task related to this export and ensure all methods are called in this context
const task = new Task({ const task = new Task({
name: 'export', name: 'export',
data: { data: {
id: this._remoteId, id: this._remoteId,
isFull, isBase,
type: 'remote', type: 'remote',
}, },
}) })
@@ -130,15 +130,15 @@ class DeltaBackupWriter extends MixinBackupWriter(AbstractDeltaWriter) {
// delete sequentially from newest to oldest to avoid unnecessary merges // delete sequentially from newest to oldest to avoid unnecessary merges
for (let i = oldEntries.length; i-- > 0; ) { for (let i = oldEntries.length; i-- > 0; ) {
await adapter.deleteDeltaVmBackups([oldEntries[i]]) await adapter.deleteIncrementalVmBackups([oldEntries[i]])
} }
} }
async _transfer($defer, { timestamp, deltaExport }) { async _transfer($defer, { timestamp, deltaExport, vm = this._backup.vm, vmSnapshot = this._backup.exportedVm }) {
const adapter = this._adapter const adapter = this._adapter
const backup = this._backup const backup = this._backup
const { job, scheduleId, vm } = backup const { job, scheduleId } = backup
const jobId = job.id const jobId = job.id
const handler = adapter.handler const handler = adapter.handler
@@ -169,7 +169,7 @@ class DeltaBackupWriter extends MixinBackupWriter(AbstractDeltaWriter) {
vifs: deltaExport.vifs, vifs: deltaExport.vifs,
vhds, vhds,
vm, vm,
vmSnapshot: this._backup.exportedVm, vmSnapshot,
} }
const { size } = await Task.run({ name: 'transfer' }, async () => { const { size } = await Task.run({ name: 'transfer' }, async () => {
@@ -208,7 +208,14 @@ class DeltaBackupWriter extends MixinBackupWriter(AbstractDeltaWriter) {
}) })
if (isDelta) { if (isDelta) {
await chainVhd(handler, parentPath, handler, path) try {
await chainVhd(handler, parentPath, handler, path)
} catch (err) {
// @todo : check why if chains with full disk
if (err.message !== 'cannot chain disk of type 3') {
throw err
}
}
} }
// set the correct UUID in the VHD // set the correct UUID in the VHD
@@ -223,10 +230,9 @@ class DeltaBackupWriter extends MixinBackupWriter(AbstractDeltaWriter) {
}) })
metadataContent.size = size metadataContent.size = size
this._metadataFileName = await adapter.writeVmBackupMetadata(vm.uuid, metadataContent) this._metadataFileName = await adapter.writeVmBackupMetadata(vm.uuid, metadataContent)
// TODO: run cleanup? // TODO: run cleanup?
} }
} }
exports.DeltaBackupWriter = decorateClass(DeltaBackupWriter, { exports.IncrementalRemoteWriter = decorateClass(IncrementalRemoteWriter, {
_transfer: defer, _transfer: defer,
}) })

View File

@@ -4,16 +4,16 @@ const { asyncMap, asyncMapSettled } = require('@xen-orchestra/async-map')
const ignoreErrors = require('promise-toolbox/ignoreErrors') const ignoreErrors = require('promise-toolbox/ignoreErrors')
const { formatDateTime } = require('@xen-orchestra/xapi') const { formatDateTime } = require('@xen-orchestra/xapi')
const { formatFilenameDate } = require('../_filenameDate.js') const { formatFilenameDate } = require('../../../_filenameDate.js')
const { getOldEntries } = require('../_getOldEntries.js') const { getOldEntries } = require('./_getOldEntries.js')
const { importDeltaVm, TAG_COPY_SRC } = require('../_deltaVm.js') const { importIncrementalVm, TAG_COPY_SRC } = require('../../../_incrementalVm.js')
const { Task } = require('../Task.js') const { Task } = require('../../../Task.js')
const { AbstractDeltaWriter } = require('./_AbstractDeltaWriter.js') const { AbstractIncrementalWriter } = require('./_AbstractIncrementalWriter.js')
const { MixinReplicationWriter } = require('./_MixinReplicationWriter.js') const { MixinXapiWriter } = require('./_MixinXapiWriter.js')
const { listReplicatedVms } = require('./_listReplicatedVms.js') const { listReplicatedVms } = require('./_listReplicatedVms.js')
exports.DeltaReplicationWriter = class DeltaReplicationWriter extends MixinReplicationWriter(AbstractDeltaWriter) { exports.IncrementalXapiWriter = class IncrementalXapiWriter extends MixinXapiWriter(AbstractIncrementalWriter) {
async checkBaseVdis(baseUuidToSrcVdi, baseVm) { async checkBaseVdis(baseUuidToSrcVdi, baseVm) {
const sr = this._sr const sr = this._sr
const replicatedVm = listReplicatedVms(sr.$xapi, this._backup.job.id, sr.uuid, this._backup.vm.uuid).find( const replicatedVm = listReplicatedVms(sr.$xapi, this._backup.job.id, sr.uuid, this._backup.vm.uuid).find(
@@ -38,13 +38,13 @@ exports.DeltaReplicationWriter = class DeltaReplicationWriter extends MixinRepli
} }
} }
prepare({ isFull }) { prepare({ isBase }) {
// create the task related to this export and ensure all methods are called in this context // create the task related to this export and ensure all methods are called in this context
const task = new Task({ const task = new Task({
name: 'export', name: 'export',
data: { data: {
id: this._sr.uuid, id: this._sr.uuid,
isFull, isBase,
name_label: this._sr.name_label, name_label: this._sr.name_label,
type: 'SR', type: 'SR',
}, },
@@ -90,7 +90,7 @@ exports.DeltaReplicationWriter = class DeltaReplicationWriter extends MixinRepli
let targetVmRef let targetVmRef
await Task.run({ name: 'transfer' }, async () => { await Task.run({ name: 'transfer' }, async () => {
targetVmRef = await importDeltaVm( targetVmRef = await importIncrementalVm(
{ {
__proto__: deltaExport, __proto__: deltaExport,
vm: { vm: {

View File

@@ -2,7 +2,7 @@
const { AbstractWriter } = require('./_AbstractWriter.js') const { AbstractWriter } = require('./_AbstractWriter.js')
exports.AbstractDeltaWriter = class AbstractDeltaWriter extends AbstractWriter { exports.AbstractIncrementalWriter = class AbstractIncrementalWriter extends AbstractWriter {
checkBaseVdis(baseUuidToSrcVdi, baseVm) { checkBaseVdis(baseUuidToSrcVdi, baseVm) {
throw new Error('Not implemented') throw new Error('Not implemented')
} }
@@ -11,7 +11,7 @@ exports.AbstractDeltaWriter = class AbstractDeltaWriter extends AbstractWriter {
throw new Error('Not implemented') throw new Error('Not implemented')
} }
prepare({ isFull }) { prepare({ isBase }) {
throw new Error('Not implemented') throw new Error('Not implemented')
} }

View File

@@ -4,17 +4,17 @@ const { createLogger } = require('@xen-orchestra/log')
const { join } = require('path') const { join } = require('path')
const assert = require('assert') const assert = require('assert')
const { formatFilenameDate } = require('../_filenameDate.js') const { formatFilenameDate } = require('../../../_filenameDate.js')
const { getVmBackupDir } = require('../_getVmBackupDir.js') const { getVmBackupDir } = require('../../../_getVmBackupDir.js')
const { HealthCheckVmBackup } = require('../HealthCheckVmBackup.js') const { HealthCheckVmBackup } = require('../../../HealthCheckVmBackup.js')
const { ImportVmBackup } = require('../ImportVmBackup.js') const { ImportVmBackup } = require('../../../ImportVmBackup.js')
const { Task } = require('../Task.js') const { Task } = require('../../../Task.js')
const MergeWorker = require('../merge-worker/index.js') const MergeWorker = require('../../../merge-worker/index.js')
const { info, warn } = createLogger('xo:backups:MixinBackupWriter') const { info, warn } = createLogger('xo:backups:MixinRemoteWriter')
exports.MixinBackupWriter = (BaseClass = Object) => exports.MixinRemoteWriter = (BaseClass = Object) =>
class MixinBackupWriter extends BaseClass { class MixinRemoteWriter extends BaseClass {
#lock #lock
constructor({ remoteId, ...rest }) { constructor({ remoteId, ...rest }) {

View File

@@ -1,8 +1,8 @@
'use strict' 'use strict'
const { Task } = require('../Task') const { Task } = require('../../../Task')
const assert = require('node:assert/strict') const assert = require('node:assert/strict')
const { HealthCheckVmBackup } = require('../HealthCheckVmBackup') const { HealthCheckVmBackup } = require('../../../HealthCheckVmBackup')
function extractOpaqueRef(str) { function extractOpaqueRef(str) {
const OPAQUE_REF_RE = /OpaqueRef:[0-9a-z-]+/ const OPAQUE_REF_RE = /OpaqueRef:[0-9a-z-]+/
@@ -12,8 +12,8 @@ function extractOpaqueRef(str) {
} }
return matches[0] return matches[0]
} }
exports.MixinReplicationWriter = (BaseClass = Object) => exports.MixinXapiWriter = (BaseClass = Object) =>
class MixinReplicationWriter extends BaseClass { class MixinXapiWriter extends BaseClass {
constructor({ sr, ...rest }) { constructor({ sr, ...rest }) {
super(rest) super(rest)

View File

@@ -0,0 +1,138 @@
'use strict'
const { asyncMapSettled } = require('@xen-orchestra/async-map')
const Disposable = require('promise-toolbox/Disposable')
const { limitConcurrency } = require('limit-concurrency-decorator')
const { extractIdsFromSimplePattern } = require('../extractIdsFromSimplePattern.js')
const { Task } = require('../Task.js')
const createStreamThrottle = require('./createStreamThrottle.js')
const { IncrementalXapiVmBackup } = require('./VmBackup/IncrementalXapiVmBackup.js')
const { FullXapiVmBackup } = require('./VmBackup/FullXapiVmBackup.js')
const { DEFAULT_SETTINGS, AbstractBackupJob } = require('./AbstractBackupJob.js')
const { runTask } = require('./runTask.js')
const { getAdaptersByRemote } = require('./getAdapterByRemote.js')
const DEFAULT_XAPI_VM_SETTINGS = {
bypassVdiChainsCheck: false,
checkpointSnapshot: false,
concurrency: 2,
copyRetention: 0,
deleteFirst: false,
exportRetention: 0,
fullInterval: 0,
healthCheckSr: undefined,
healthCheckVmsWithTags: [],
maxExportRate: 0,
maxMergedDeltasPerRun: Infinity,
offlineBackup: false,
offlineSnapshot: false,
snapshotRetention: 0,
timeout: 0,
useNbd: false,
unconditionalSnapshot: false,
validateVhdStreams: false,
vmTimeout: 0,
}
exports.XapiVmBackupJob = class XapiVmBackupJob extends AbstractBackupJob {
_computeBaseSettings(config, job) {
const baseSettings = { ...DEFAULT_SETTINGS }
Object.assign(baseSettings, DEFAULT_XAPI_VM_SETTINGS, config.defaultSettings, config.vm?.defaultSettings)
Object.assign(baseSettings, job.settings[''])
return baseSettings
}
async run() {
const job = this._job
// FIXME: proper SimpleIdPattern handling
const getSnapshotNameLabel = this._getSnapshotNameLabel
const schedule = this._schedule
const settings = this._settings
const throttleStream = createStreamThrottle(settings.maxExportRate)
const config = this._config
await Disposable.use(
Disposable.all(
extractIdsFromSimplePattern(job.srs).map(id =>
this._getRecord('SR', id).catch(error => {
runTask(
{
name: 'get SR record',
data: { type: 'SR', id },
},
() => Promise.reject(error)
)
})
)
),
Disposable.all(extractIdsFromSimplePattern(job.remotes).map(id => this._getAdapter(id))),
() => (settings.healthCheckSr !== undefined ? this._getRecord('SR', settings.healthCheckSr) : undefined),
async (srs, remoteAdapters, healthCheckSr) => {
// remove adapters that failed (already handled)
remoteAdapters = remoteAdapters.filter(_ => _ !== undefined)
// remove srs that failed (already handled)
srs = srs.filter(_ => _ !== undefined)
if (remoteAdapters.length === 0 && srs.length === 0 && settings.snapshotRetention === 0) {
return
}
const vmIds = extractIdsFromSimplePattern(job.vms)
Task.info('vms', { vms: vmIds })
remoteAdapters = getAdaptersByRemote(remoteAdapters)
const allSettings = this._job.settings
const baseSettings = this._baseSettings
const handleVm = vmUuid => {
const taskStart = { name: 'backup VM', data: { type: 'VM', id: vmUuid } }
return this._getRecord('VM', vmUuid).then(
disposableVm =>
Disposable.use(disposableVm, vm => {
taskStart.data.name_label = vm.name_label
return runTask(taskStart, () => {
const opts = {
baseSettings,
config,
getSnapshotNameLabel,
healthCheckSr,
job,
remoteAdapters,
schedule,
settings: { ...settings, ...allSettings[vm.uuid] },
srs,
throttleStream,
vm,
}
let vmBackup
if (job.mode === 'delta') {
vmBackup = new IncrementalXapiVmBackup(opts)
} else {
if (job.mode === 'full') {
vmBackup = new FullXapiVmBackup(opts)
} else {
throw new Error(`Job mode ${job.mode} not implemented`)
}
}
return vmBackup.run()
})
}),
error =>
runTask(taskStart, () => {
throw error
})
)
}
const { concurrency } = settings
await asyncMapSettled(vmIds, concurrency === 0 ? handleVm : limitConcurrency(concurrency)(handleVm))
}
)
}
}

View File

@@ -2,9 +2,9 @@
const { asyncMap } = require('@xen-orchestra/async-map') const { asyncMap } = require('@xen-orchestra/async-map')
const { DIR_XO_CONFIG_BACKUPS } = require('./RemoteAdapter.js') const { DIR_XO_CONFIG_BACKUPS } = require('../RemoteAdapter.js')
const { formatFilenameDate } = require('./_filenameDate.js') const { formatFilenameDate } = require('../_filenameDate.js')
const { Task } = require('./Task.js') const { Task } = require('../Task.js')
exports.XoMetadataBackup = class XoMetadataBackup { exports.XoMetadataBackup = class XoMetadataBackup {
constructor({ config, job, remoteAdapters, schedule, settings }) { constructor({ config, job, remoteAdapters, schedule, settings }) {

View File

@@ -0,0 +1,9 @@
'use strict'
exports.getAdaptersByRemote = adapters => {
const adaptersByRemote = {}
adapters.forEach(({ adapter, remoteId }) => {
adaptersByRemote[remoteId] = adapter
})
return adaptersByRemote
}

View File

@@ -0,0 +1,7 @@
'use strict'
const { Task } = require('../Task')
const noop = Function.prototype
exports.runTask = (...args) => Task.run(...args).catch(noop) // errors are handled by logs

View File

@@ -16,7 +16,7 @@ const { getHandler } = require('@xen-orchestra/fs')
const { parseDuration } = require('@vates/parse-duration') const { parseDuration } = require('@vates/parse-duration')
const { Xapi } = require('@xen-orchestra/xapi') const { Xapi } = require('@xen-orchestra/xapi')
const { Backup } = require('./Backup.js') const { instantiateBackupJob } = require('./backupJob.js')
const { RemoteAdapter } = require('./RemoteAdapter.js') const { RemoteAdapter } = require('./RemoteAdapter.js')
const { Task } = require('./Task.js') const { Task } = require('./Task.js')
@@ -48,7 +48,7 @@ class BackupWorker {
} }
run() { run() {
return new Backup({ return instantiateBackupJob({
config: this.#config, config: this.#config,
getAdapter: remoteId => this.getAdapter(this.#remotes[remoteId]), getAdapter: remoteId => this.getAdapter(this.#remotes[remoteId]),
getConnectedRecord: Disposable.factory(async function* getConnectedRecord(type, uuid) { getConnectedRecord: Disposable.factory(async function* getConnectedRecord(type, uuid) {

View File

@@ -33,9 +33,9 @@ const resolveUuid = async (xapi, cache, uuid, type) => {
return ref return ref
} }
exports.exportDeltaVm = async function exportDeltaVm( exports.exportIncrementalVm = async function exportIncrementalVm(
vm, vm,
baseVm, vmComparisonBasis,
{ {
cancelToken = CancelToken.none, cancelToken = CancelToken.none,
@@ -46,12 +46,12 @@ exports.exportDeltaVm = async function exportDeltaVm(
} = {} } = {}
) { ) {
// refs of VM's VDIs → base's VDIs. // refs of VM's VDIs → base's VDIs.
const baseVdis = {} const vdisCompaisonBasis = {}
baseVm && vmComparisonBasis &&
baseVm.$VBDs.forEach(vbd => { vmComparisonBasis.$VBDs.forEach(vbd => {
let vdi, snapshotOf let vdi, snapshotOf
if ((vdi = vbd.$VDI) && (snapshotOf = vdi.$snapshot_of) && !fullVdisRequired.has(snapshotOf.uuid)) { if ((vdi = vbd.$VDI) && (snapshotOf = vdi.$snapshot_of) && !fullVdisRequired.has(snapshotOf.uuid)) {
baseVdis[vdi.snapshot_of] = vdi vdisCompaisonBasis[vdi.snapshot_of] = vdi
} }
}) })
@@ -74,20 +74,20 @@ exports.exportDeltaVm = async function exportDeltaVm(
} }
// Look for a snapshot of this vdi in the base VM. // Look for a snapshot of this vdi in the base VM.
const baseVdi = baseVdis[vdi.snapshot_of] const vdiComparisonBasis = vdisCompaisonBasis[vdi.snapshot_of]
vdis[vdiRef] = { vdis[vdiRef] = {
...vdi, ...vdi,
other_config: { other_config: {
...vdi.other_config, ...vdi.other_config,
[TAG_BASE_DELTA]: baseVdi && !disableBaseTags ? baseVdi.uuid : undefined, [TAG_BASE_DELTA]: vdiComparisonBasis && !disableBaseTags ? vdiComparisonBasis.uuid : undefined,
}, },
$snapshot_of$uuid: vdi.$snapshot_of?.uuid, $snapshot_of$uuid: vdi.$snapshot_of?.uuid,
$SR$uuid: vdi.$SR.uuid, $SR$uuid: vdi.$SR.uuid,
} }
streams[`${vdiRef}.vhd`] = await vdi.$exportContent({ streams[`${vdiRef}.vhd`] = await vdi.$exportContent({
baseRef: baseVdi?.$ref, baseRef: vdiComparisonBasis?.$ref,
cancelToken, cancelToken,
format: 'vhd', format: 'vhd',
}) })
@@ -126,10 +126,10 @@ exports.exportDeltaVm = async function exportDeltaVm(
vm: { vm: {
...vm, ...vm,
other_config: other_config:
baseVm && !disableBaseTags vmComparisonBasis && !disableBaseTags
? { ? {
...vm.other_config, ...vm.other_config,
[TAG_BASE_DELTA]: baseVm.uuid, [TAG_BASE_DELTA]: vmComparisonBasis.uuid,
} }
: omit(vm.other_config, TAG_BASE_DELTA), : omit(vm.other_config, TAG_BASE_DELTA),
}, },
@@ -143,18 +143,18 @@ exports.exportDeltaVm = async function exportDeltaVm(
) )
} }
exports.importDeltaVm = defer(async function importDeltaVm( exports.importIncrementalVm = defer(async function importIncrementalVm(
$defer, $defer,
deltaVm, incrementalVm,
sr, sr,
{ cancelToken = CancelToken.none, detectBase = true, mapVdisSrs = {}, newMacAddresses = false } = {} { cancelToken = CancelToken.none, detectBase = true, mapVdisSrs = {}, newMacAddresses = false } = {}
) { ) {
const { version } = deltaVm const { version } = incrementalVm
if (compareVersions(version, '1.0.0') < 0) { if (compareVersions(version, '1.0.0') < 0) {
throw new Error(`Unsupported delta backup version: ${version}`) throw new Error(`Unsupported delta backup version: ${version}`)
} }
const vmRecord = deltaVm.vm const vmRecord = incrementalVm.vm
const xapi = sr.$xapi const xapi = sr.$xapi
let baseVm let baseVm
@@ -183,7 +183,7 @@ exports.importDeltaVm = defer(async function importDeltaVm(
baseVdis[vbd.VDI] = vbd.$VDI baseVdis[vbd.VDI] = vbd.$VDI
} }
}) })
const vdiRecords = deltaVm.vdis const vdiRecords = incrementalVm.vdis
// 0. Create suspend_VDI // 0. Create suspend_VDI
let suspendVdi let suspendVdi
@@ -240,7 +240,7 @@ exports.importDeltaVm = defer(async function importDeltaVm(
await asyncMap(await xapi.getField('VM', vmRef, 'VBDs'), ref => ignoreErrors.call(xapi.call('VBD.destroy', ref))) await asyncMap(await xapi.getField('VM', vmRef, 'VBDs'), ref => ignoreErrors.call(xapi.call('VBD.destroy', ref)))
// 3. Create VDIs & VBDs. // 3. Create VDIs & VBDs.
const vbdRecords = deltaVm.vbds const vbdRecords = incrementalVm.vbds
const vbds = groupBy(vbdRecords, 'VDI') const vbds = groupBy(vbdRecords, 'VDI')
const newVdis = {} const newVdis = {}
await asyncMap(Object.keys(vdiRecords), async vdiRef => { await asyncMap(Object.keys(vdiRecords), async vdiRef => {
@@ -309,7 +309,7 @@ exports.importDeltaVm = defer(async function importDeltaVm(
} }
}) })
const { streams } = deltaVm const { streams } = incrementalVm
await Promise.all([ await Promise.all([
// Import VDI contents. // Import VDI contents.
@@ -326,7 +326,7 @@ exports.importDeltaVm = defer(async function importDeltaVm(
}), }),
// Create VIFs. // Create VIFs.
asyncMap(Object.values(deltaVm.vifs), vif => { asyncMap(Object.values(incrementalVm.vifs), vif => {
let network = vif.$network$uuid && xapi.getObjectByUuid(vif.$network$uuid, undefined) let network = vif.$network$uuid && xapi.getObjectByUuid(vif.$network$uuid, undefined)
if (network === undefined) { if (network === undefined) {
@@ -358,8 +358,8 @@ exports.importDeltaVm = defer(async function importDeltaVm(
]) ])
await Promise.all([ await Promise.all([
deltaVm.vm.ha_always_run && xapi.setField('VM', vmRef, 'ha_always_run', true), incrementalVm.vm.ha_always_run && xapi.setField('VM', vmRef, 'ha_always_run', true),
xapi.setField('VM', vmRef, 'name_label', deltaVm.vm.name_label), xapi.setField('VM', vmRef, 'name_label', incrementalVm.vm.name_label),
]) ])
return vmRef return vmRef

View File

@@ -0,0 +1,24 @@
'use strict'
const { MetadatasBackupJob } = require('./_backupJob/MetadatasBackupJob.js')
const { RemoteVmBackupJob } = require('./_backupJob/RemoteVmBackupJob.js')
const { XapiVmBackupJob } = require('./_backupJob/XapiVmBackupJob.js')
exports.instantiateBackupJob = function instantiateBackupJob({
config,
getAdapter,
getConnectedRecord,
job,
schedule,
}) {
switch (job.type) {
case 'backup':
return new XapiVmBackupJob({ config, getAdapter, getConnectedRecord, job, schedule })
case 'remote-to-remote':
return new RemoteVmBackupJob({ config, getAdapter, getConnectedRecord, job, schedule })
case 'metadataBackup':
return new MetadatasBackupJob({ config, getAdapter, getConnectedRecord, job, schedule })
default:
throw new Error(`No runners for the backup type ${job.type}`)
}
}

View File

@@ -136,7 +136,7 @@ task.start(message: 'restore', data: { jobId: string, srId: string, time: number
## API ## API
### Run description object ### Run description object Metadata / Pool backup
This is a JavaScript object containing all the information necessary to run a backup job. This is a JavaScript object containing all the information necessary to run a backup job.
@@ -150,11 +150,65 @@ job:
# Human readable identifier # Human readable identifier
name: string name: string
# Whether this job is doing Full Backup / Disaster Recovery or # which pools to saved, can be undefined
# Delta Backup / Continuous Replication pools : IdPattern
# which remotes to use
remotes: IdPattern
settings:
# Used for the whole job
'': Settings
# Used for a specific schedule
[ScheduleId]: Settings
# Used for a specific VM
[VmId]: Settings
# if defined : backup the xo metadata
xoMetadata : string
type: 'metadataBackup'
# Information necessary to connect to each remote
remotes:
[RemoteId]:
url: string
# Indicates which schedule is used for this run
schedule:
id: ScheduleId
# Information necessary to connect to each XAPI
xapis:
[XapiId]:
allowUnauthorized: boolean
credentials:
password: string
username: string
url: string
```
### Run description object Vms Xapi to remote and/or Xapi to Xapi
This is a JavaScript object containing all the information necessary to run a backup job.
```coffee
# Information about the job itself
job:
# Unique identifier
id: string
# Human readable identifier
name: string
# Whether this job is doing Full Backup / Full Replication or
# Incremental Backup / Incremental Replication
mode: 'full' | 'delta' mode: 'full' | 'delta'
# For backup jobs, indicates which remotes to use # indicates which remotes used to writes. Can be empty.
remotes: IdPattern remotes: IdPattern
settings: settings:
@@ -168,10 +222,10 @@ job:
# Used for a specific VM # Used for a specific VM
[VmId]: Settings [VmId]: Settings
# For replication jobs, indicates which SRs to use # indicates which SRs to use for replication jobs. Can be empty.
srs: IdPattern srs: IdPattern
# Here for historical reasons # Here for historical reasons, xapi to remote or xapi to xapi
type: 'backup' type: 'backup'
# Indicates which VMs to backup/replicate # Indicates which VMs to backup/replicate
@@ -200,6 +254,62 @@ xapis:
url: string url: string
``` ```
### Run description object Vms remote to remote
This is a JavaScript object containing all the information necessary to run a backup job.
```coffee
# Information about the job itself
job:
# Unique identifier
id: string
# Human readable identifier
name: string
# Whether this job is doing Full Backup / Full Replication or
# Incremental Backup / Incremental Replication
mode: 'full' | 'delta'
# Indicates which remotes to write VMs
remotes: IdPattern
settings:
# Used for the whole job
'': Settings
# Used for a specific schedule
[ScheduleId]: Settings
# Used for a specific VM
[VmId]: Settings
# only transfer data saved by one of theses Job
# transfer all if empty
sourceJobIds: IdPattern
# Here for historical reasons, xapi to remote or xapi to xapi
type: 'remote-to-remote'
# Indicates which VMs to backup/replicate
vms: IdPattern
# Indicate the remote used to read Vms
sourceRemote:
[ObjectId]: XapiId
# Information necessary to connect to each remote (read or write)
remotes:
[RemoteId]:
url: string
# Indicates which schedule is used for this run
schedule:
id: ScheduleId
```
### `IdPattern` ### `IdPattern`
For a single object: For a single object:

View File

@@ -1,7 +1,6 @@
import Disposable from 'promise-toolbox/Disposable' import Disposable from 'promise-toolbox/Disposable'
import fromCallback from 'promise-toolbox/fromCallback' import fromCallback from 'promise-toolbox/fromCallback'
import { asyncMap } from '@xen-orchestra/async-map' import { asyncMap } from '@xen-orchestra/async-map'
import { Backup } from '@xen-orchestra/backups/Backup.js'
import { compose } from '@vates/compose' import { compose } from '@vates/compose'
import { createLogger } from '@xen-orchestra/log' import { createLogger } from '@xen-orchestra/log'
import { decorateMethodsWith } from '@vates/decorate-with' import { decorateMethodsWith } from '@vates/decorate-with'
@@ -18,6 +17,7 @@ import { RestoreMetadataBackup } from '@xen-orchestra/backups/RestoreMetadataBac
import { runBackupWorker } from '@xen-orchestra/backups/runBackupWorker.js' import { runBackupWorker } from '@xen-orchestra/backups/runBackupWorker.js'
import { Task } from '@xen-orchestra/backups/Task.js' import { Task } from '@xen-orchestra/backups/Task.js'
import { Xapi } from '@xen-orchestra/xapi' import { Xapi } from '@xen-orchestra/xapi'
import { instantiateBackupJob } from '@xen-orchestra/backups/backupJob.js'
const noop = Function.prototype const noop = Function.prototype
@@ -52,7 +52,7 @@ export default class Backups {
const config = app.config.get('backups') const config = app.config.get('backups')
if (config.disableWorkers) { if (config.disableWorkers) {
const { recordToXapi, remotes, xapis, ...rest } = params const { recordToXapi, remotes, xapis, ...rest } = params
return new Backup({ return instantiateBackupJob({
...rest, ...rest,
config, config,

View File

@@ -134,7 +134,7 @@ exports[`backupNg execute three times a delta backup with 2 remotes, 2 as retent
Object { Object {
"data": Object { "data": Object {
"id": Any<String>, "id": Any<String>,
"isFull": true, "isBase": true,
"type": "remote", "type": "remote",
}, },
"end": Any<Number>, "end": Any<Number>,
@@ -175,7 +175,7 @@ exports[`backupNg execute three times a delta backup with 2 remotes, 2 as retent
Object { Object {
"data": Object { "data": Object {
"id": Any<String>, "id": Any<String>,
"isFull": true, "isBase": true,
"type": "remote", "type": "remote",
}, },
"end": Any<Number>, "end": Any<Number>,
@@ -241,7 +241,7 @@ exports[`backupNg execute three times a delta backup with 2 remotes, 2 as retent
Object { Object {
"data": Object { "data": Object {
"id": Any<String>, "id": Any<String>,
"isFull": false, "isBase": false,
"type": "remote", "type": "remote",
}, },
"end": Any<Number>, "end": Any<Number>,
@@ -282,7 +282,7 @@ exports[`backupNg execute three times a delta backup with 2 remotes, 2 as retent
Object { Object {
"data": Object { "data": Object {
"id": Any<String>, "id": Any<String>,
"isFull": false, "isBase": false,
"type": "remote", "type": "remote",
}, },
"end": Any<Number>, "end": Any<Number>,
@@ -348,7 +348,7 @@ exports[`backupNg execute three times a delta backup with 2 remotes, 2 as retent
Object { Object {
"data": Object { "data": Object {
"id": Any<String>, "id": Any<String>,
"isFull": true, "isBase": true,
"type": "remote", "type": "remote",
}, },
"end": Any<Number>, "end": Any<Number>,
@@ -389,7 +389,7 @@ exports[`backupNg execute three times a delta backup with 2 remotes, 2 as retent
Object { Object {
"data": Object { "data": Object {
"id": Any<String>, "id": Any<String>,
"isFull": true, "isBase": true,
"type": "remote", "type": "remote",
}, },
"end": Any<Number>, "end": Any<Number>,

View File

@@ -3,7 +3,6 @@ import Disposable from 'promise-toolbox/Disposable'
import forOwn from 'lodash/forOwn.js' import forOwn from 'lodash/forOwn.js'
import groupBy from 'lodash/groupBy.js' import groupBy from 'lodash/groupBy.js'
import merge from 'lodash/merge.js' import merge from 'lodash/merge.js'
import { Backup } from '@xen-orchestra/backups/Backup.js'
import { createLogger } from '@xen-orchestra/log' import { createLogger } from '@xen-orchestra/log'
import { createPredicate } from 'value-matcher' import { createPredicate } from 'value-matcher'
import { decorateWith } from '@vates/decorate-with' import { decorateWith } from '@vates/decorate-with'
@@ -18,6 +17,7 @@ import { debounceWithKey, REMOVE_CACHE_ENTRY } from '../../_pDebounceWithKey.mjs
import { handleBackupLog } from '../../_handleBackupLog.mjs' import { handleBackupLog } from '../../_handleBackupLog.mjs'
import { serializeError, unboxIdsFromPattern } from '../../utils.mjs' import { serializeError, unboxIdsFromPattern } from '../../utils.mjs'
import { waitAll } from '../../_waitAll.mjs' import { waitAll } from '../../_waitAll.mjs'
import { instantiateBackupJob } from '@xen-orchestra/backups/backupJob.js'
const log = createLogger('xo:xo-mixins:backups-ng') const log = createLogger('xo:xo-mixins:backups-ng')
@@ -146,6 +146,9 @@ export default class BackupNg {
const proxyId = job.proxy const proxyId = job.proxy
const useXoProxy = proxyId !== undefined const useXoProxy = proxyId !== undefined
const remoteIds = unboxIdsFromPattern(job.remotes) const remoteIds = unboxIdsFromPattern(job.remotes)
if (job.sourceRemote) {
remoteIds.push(job.sourceRemote)
}
try { try {
if (!useXoProxy && backupsConfig.disableWorkers) { if (!useXoProxy && backupsConfig.disableWorkers) {
const localTaskIds = { __proto__: null } const localTaskIds = { __proto__: null }
@@ -164,7 +167,7 @@ export default class BackupNg {
}), }),
}, },
() => () =>
new Backup({ instantiateBackupJob({
config: backupsConfig, config: backupsConfig,
getAdapter: async remoteId => getAdapter: async remoteId =>
app.getBackupsRemoteAdapter(await app.getRemoteWithCredentials(remoteId)), app.getBackupsRemoteAdapter(await app.getRemoteWithCredentials(remoteId)),

View File

@@ -1,8 +1,8 @@
import asyncMapSettled from '@xen-orchestra/async-map/legacy.js' import asyncMapSettled from '@xen-orchestra/async-map/legacy.js'
import cloneDeep from 'lodash/cloneDeep.js' import cloneDeep from 'lodash/cloneDeep.js'
import Disposable from 'promise-toolbox/Disposable' import Disposable from 'promise-toolbox/Disposable'
import { Backup } from '@xen-orchestra/backups/Backup.js'
import { createLogger } from '@xen-orchestra/log' import { createLogger } from '@xen-orchestra/log'
import { instantiateBackupJob } from '@xen-orchestra/backups/backupJob.js'
import { parseMetadataBackupId } from '@xen-orchestra/backups/parseMetadataBackupId.js' import { parseMetadataBackupId } from '@xen-orchestra/backups/parseMetadataBackupId.js'
import { RestoreMetadataBackup } from '@xen-orchestra/backups/RestoreMetadataBackup.js' import { RestoreMetadataBackup } from '@xen-orchestra/backups/RestoreMetadataBackup.js'
import { Task } from '@xen-orchestra/backups/Task.js' import { Task } from '@xen-orchestra/backups/Task.js'
@@ -129,7 +129,7 @@ export default class metadataBackup {
}), }),
}, },
() => () =>
new Backup({ instantiateBackupJob({
config: this._app.config.get('backups'), config: this._app.config.get('backups'),
getAdapter: async remoteId => app.getBackupsRemoteAdapter(await app.getRemoteWithCredentials(remoteId)), getAdapter: async remoteId => app.getBackupsRemoteAdapter(await app.getRemoteWithCredentials(remoteId)),

View File

@@ -1,4 +1,3 @@
import { Backup } from '@xen-orchestra/backups/Backup.js'
import { decorateWith } from '@vates/decorate-with' import { decorateWith } from '@vates/decorate-with'
import { defer as deferrable } from 'golike-defer' import { defer as deferrable } from 'golike-defer'
import { fromEvent } from 'promise-toolbox' import { fromEvent } from 'promise-toolbox'
@@ -10,6 +9,7 @@ import Esxi from '@xen-orchestra/vmware-explorer/esxi.mjs'
import openDeltaVmdkasVhd from '@xen-orchestra/vmware-explorer/openDeltaVmdkAsVhd.mjs' import openDeltaVmdkasVhd from '@xen-orchestra/vmware-explorer/openDeltaVmdkAsVhd.mjs'
import OTHER_CONFIG_TEMPLATE from '../xapi/other-config-template.mjs' import OTHER_CONFIG_TEMPLATE from '../xapi/other-config-template.mjs'
import VhdEsxiRaw from '@xen-orchestra/vmware-explorer/VhdEsxiRaw.mjs' import VhdEsxiRaw from '@xen-orchestra/vmware-explorer/VhdEsxiRaw.mjs'
import { instantiateBackupJob } from '@xen-orchestra/backups/backupJob.js'
export default class MigrateVm { export default class MigrateVm {
constructor(app) { constructor(app) {
@@ -41,7 +41,7 @@ export default class MigrateVm {
const schedule = { id: 'one-time' } const schedule = { id: 'one-time' }
// for now we only support this from the main OA, no proxy // for now we only support this from the main OA, no proxy
return new Backup({ return instantiateBackupJob({
config, config,
job, job,
schedule, schedule,

View File

@@ -268,7 +268,7 @@ const New = decorate([
await createBackupNgJob({ await createBackupNgJob({
name: state.name, name: state.name,
mode: state.isDelta ? 'delta' : 'full', mode: state.isIncremental ? 'delta' : 'full',
compression: state.compression, compression: state.compression,
proxy: state.proxyId === null ? undefined : state.proxyId, proxy: state.proxyId === null ? undefined : state.proxyId,
schedules, schedules,
@@ -335,7 +335,7 @@ const New = decorate([
await editBackupNgJob({ await editBackupNgJob({
id: props.job.id, id: props.job.id,
name: state.name, name: state.name,
mode: state.isDelta ? 'delta' : 'full', mode: state.isIncremental ? 'delta' : 'full',
compression: state.compression, compression: state.compression,
proxy: state.proxyId, proxy: state.proxyId,
settings: normalizeSettings({ settings: normalizeSettings({
@@ -435,10 +435,10 @@ const New = decorate([
showScheduleModal: showScheduleModal:
({ saveSchedule }, storedSchedule = DEFAULT_SCHEDULE) => ({ saveSchedule }, storedSchedule = DEFAULT_SCHEDULE) =>
async ( async (
{ copyMode, exportMode, deltaMode, isDelta, propSettings, settings = propSettings, snapshotMode }, { copyMode, exportMode, deltaMode, isIncremental, propSettings, settings = propSettings, snapshotMode },
{ intl: { formatMessage } } { intl: { formatMessage } }
) => { ) => {
const modes = { copyMode, isDelta, exportMode, snapshotMode } const modes = { copyMode, isIncremental, exportMode, snapshotMode }
const schedule = await form({ const schedule = await form({
defaultValue: storedSchedule, defaultValue: storedSchedule,
render: props => ( render: props => (
@@ -650,7 +650,7 @@ const New = decorate([
state.missingSnapshotRetention, state.missingSnapshotRetention,
missingName: state => state.name.trim() === '', missingName: state => state.name.trim() === '',
missingVms: state => isEmpty(state.vms) && !state.smartMode, missingVms: state => isEmpty(state.vms) && !state.smartMode,
missingBackupMode: state => !state.isDelta && !state.isFull && !state.snapshotMode, missingBackupMode: state => !state.isIncremental && !state.isFull && !state.snapshotMode,
missingRemotes: state => (state.backupMode || state.deltaMode) && isEmpty(state.remotes), missingRemotes: state => (state.backupMode || state.deltaMode) && isEmpty(state.remotes),
missingSrs: state => (state.drMode || state.crMode) && isEmpty(state.srs), missingSrs: state => (state.drMode || state.crMode) && isEmpty(state.srs),
missingSchedules: (state, { job }) => job !== undefined && isEmpty(state.schedules), missingSchedules: (state, { job }) => job !== undefined && isEmpty(state.schedules),
@@ -663,16 +663,16 @@ const New = decorate([
exportRetentionExists: createDoesRetentionExist('exportRetention'), exportRetentionExists: createDoesRetentionExist('exportRetention'),
copyRetentionExists: createDoesRetentionExist('copyRetention'), copyRetentionExists: createDoesRetentionExist('copyRetention'),
snapshotRetentionExists: createDoesRetentionExist('snapshotRetention'), snapshotRetentionExists: createDoesRetentionExist('snapshotRetention'),
isDelta: state => state.deltaMode || state.crMode, isIncremental: state => state.deltaMode || state.crMode,
isFull: state => state.backupMode || state.drMode, isFull: state => state.backupMode || state.drMode,
vmsSmartPattern: ({ tags, vmsPattern }) => ({ vmsSmartPattern: ({ tags, vmsPattern }) => ({
...vmsPattern, ...vmsPattern,
tags: constructSmartPattern(tags, normalizeTagValues), tags: constructSmartPattern(tags, normalizeTagValues),
}), }),
vmPredicate: vmPredicate:
({ isDelta }, { hostsById, poolsById }) => ({ isIncremental }, { hostsById, poolsById }) =>
({ $container }) => ({ $container }) =>
!isDelta || !isIncremental ||
canDeltaBackup( canDeltaBackup(
get(() => hostsById[$container].version) || get(() => hostsById[poolsById[$container].master].version) get(() => hostsById[$container].version) || get(() => hostsById[poolsById[$container].master].version)
), ),
@@ -781,7 +781,7 @@ const New = decorate([
<ActionButton <ActionButton
active={state.backupMode} active={state.backupMode}
data-mode='backupMode' data-mode='backupMode'
disabled={state.isDelta} disabled={state.isIncremental}
handler={effects.toggleMode} handler={effects.toggleMode}
icon='backup' icon='backup'
> >
@@ -799,7 +799,7 @@ const New = decorate([
<ActionButton <ActionButton
active={state.drMode} active={state.drMode}
data-mode='drMode' data-mode='drMode'
disabled={state.isDelta || (!state.drMode && process.env.XOA_PLAN < 3)} disabled={state.isIncremental || (!state.drMode && process.env.XOA_PLAN < 3)}
handler={effects.toggleMode} handler={effects.toggleMode}
icon='disaster-recovery' icon='disaster-recovery'
> >
@@ -985,7 +985,7 @@ const New = decorate([
placeholder={formatMessage(messages.timeoutUnit)} placeholder={formatMessage(messages.timeoutUnit)}
/> />
</FormGroup> </FormGroup>
{state.isDelta && ( {state.isIncremental && (
<FormGroup> <FormGroup>
<label htmlFor={state.inputFullIntervalId}> <label htmlFor={state.inputFullIntervalId}>
<strong>{_('fullBackupInterval')}</strong> <strong>{_('fullBackupInterval')}</strong>
@@ -1078,7 +1078,7 @@ const New = decorate([
</ActionButton> </ActionButton>
</CardHeader> </CardHeader>
<CardBlock> <CardBlock>
{state.isDelta && ( {state.isIncremental && (
<span className='text-muted'> <span className='text-muted'>
<Icon icon='info' /> {_('deltaBackupOnOutdatedXenServerWarning')} <Icon icon='info' /> {_('deltaBackupOnOutdatedXenServerWarning')}
</span> </span>
@@ -1087,7 +1087,7 @@ const New = decorate([
{state.smartMode ? ( {state.smartMode ? (
<Upgrade place='newBackup' required={3}> <Upgrade place='newBackup' required={3}>
<SmartBackup <SmartBackup
deltaMode={state.isDelta} deltaMode={state.isIncremental}
onChange={effects.onVmsPatternChange} onChange={effects.onVmsPatternChange}
pattern={state.vmsPattern} pattern={state.vmsPattern}
/> />

View File

@@ -177,7 +177,7 @@ const New = decorate([
</div> </div>
)} )}
</FormGroup> </FormGroup>
{modes.isDelta && ( {modes.isIncremental && (
<FormGroup> <FormGroup>
<label> <label>
<strong>{_('forceFullBackup')}</strong>{' '} <strong>{_('forceFullBackup')}</strong>{' '}

View File

@@ -80,7 +80,7 @@ export default decorate([
}, },
] ]
if (state.isDelta) { if (state.isIncremental) {
columns.push({ columns.push({
itemRenderer: schedule => (schedule.fullInterval === 1 ? _('stateEnabled') : _('stateDisabled')), itemRenderer: schedule => (schedule.fullInterval === 1 ? _('stateEnabled') : _('stateDisabled')),
sortCriteria: 'fullInterval', sortCriteria: 'fullInterval',

View File

@@ -252,7 +252,8 @@ const VmTask = ({ children, className, restartVmJob, task }) => (
{_.keyValue(_('taskMergedDataSpeed'), formatSpeed(task.merge.size, task.merge.duration))} {_.keyValue(_('taskMergedDataSpeed'), formatSpeed(task.merge.size, task.merge.duration))}
</div> </div>
)} )}
{task.isFull !== undefined && _.keyValue(_('exportType'), task.isFull ? 'full' : 'delta')} {(task.isBase !== undefined || task.isFull !== undefined) &&
_.keyValue(_('exportType'), task.isBase || task.isFull /* legacy */ ? 'base' : 'delta')}
</li> </li>
) )
@@ -501,8 +502,10 @@ export default decorate([
: 'xo' : 'xo'
if (task.tasks !== undefined) { if (task.tasks !== undefined) {
const subTaskWithIsFull = task.tasks.find(({ data = {} }) => data.isFull !== undefined) const subTaskWithIsBase = task.tasks.find(
task.isFull = get(() => subTaskWithIsFull.data.isFull) ({ data = {} }) => data.isBase !== undefined || data.isFUll !== undefined
)
task.isBase = get(() => subTaskWithIsBase.data.isFull || subTaskWithIsBase.data.isBase)
} }
}) })