Compare commits
11 Commits
restApi-cr
...
backup_ref
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
49b28abd12 | ||
|
|
1878a8cd44 | ||
|
|
ea24d7cf51 | ||
|
|
a064d9ad33 | ||
|
|
4562929ace | ||
|
|
6e1c67e0fc | ||
|
|
375e47dc61 | ||
|
|
95d985b8a8 | ||
|
|
26354ac164 | ||
|
|
79cfec3205 | ||
|
|
f482d4a14c |
@@ -1,307 +0,0 @@
|
||||
'use strict'
|
||||
|
||||
const { asyncMap, asyncMapSettled } = require('@xen-orchestra/async-map')
|
||||
const Disposable = require('promise-toolbox/Disposable')
|
||||
const ignoreErrors = require('promise-toolbox/ignoreErrors')
|
||||
const pTimeout = require('promise-toolbox/timeout')
|
||||
const { compileTemplate } = require('@xen-orchestra/template')
|
||||
const { limitConcurrency } = require('limit-concurrency-decorator')
|
||||
|
||||
const { extractIdsFromSimplePattern } = require('./extractIdsFromSimplePattern.js')
|
||||
const { PoolMetadataBackup } = require('./_PoolMetadataBackup.js')
|
||||
const { Task } = require('./Task.js')
|
||||
const { VmBackup } = require('./_VmBackup.js')
|
||||
const { XoMetadataBackup } = require('./_XoMetadataBackup.js')
|
||||
const createStreamThrottle = require('./_createStreamThrottle.js')
|
||||
|
||||
const noop = Function.prototype
|
||||
|
||||
const getAdaptersByRemote = adapters => {
|
||||
const adaptersByRemote = {}
|
||||
adapters.forEach(({ adapter, remoteId }) => {
|
||||
adaptersByRemote[remoteId] = adapter
|
||||
})
|
||||
return adaptersByRemote
|
||||
}
|
||||
|
||||
const runTask = (...args) => Task.run(...args).catch(noop) // errors are handled by logs
|
||||
|
||||
const DEFAULT_SETTINGS = {
|
||||
getRemoteTimeout: 300e3,
|
||||
reportWhen: 'failure',
|
||||
}
|
||||
|
||||
const DEFAULT_VM_SETTINGS = {
|
||||
bypassVdiChainsCheck: false,
|
||||
checkpointSnapshot: false,
|
||||
concurrency: 2,
|
||||
copyRetention: 0,
|
||||
deleteFirst: false,
|
||||
exportRetention: 0,
|
||||
fullInterval: 0,
|
||||
healthCheckSr: undefined,
|
||||
healthCheckVmsWithTags: [],
|
||||
maxExportRate: 0,
|
||||
maxMergedDeltasPerRun: Infinity,
|
||||
offlineBackup: false,
|
||||
offlineSnapshot: false,
|
||||
snapshotRetention: 0,
|
||||
timeout: 0,
|
||||
useNbd: false,
|
||||
unconditionalSnapshot: false,
|
||||
validateVhdStreams: false,
|
||||
vmTimeout: 0,
|
||||
}
|
||||
|
||||
const DEFAULT_METADATA_SETTINGS = {
|
||||
retentionPoolMetadata: 0,
|
||||
retentionXoMetadata: 0,
|
||||
}
|
||||
|
||||
class RemoteTimeoutError extends Error {
|
||||
constructor(remoteId) {
|
||||
super('timeout while getting the remote ' + remoteId)
|
||||
this.remoteId = remoteId
|
||||
}
|
||||
}
|
||||
|
||||
exports.Backup = class Backup {
|
||||
constructor({ config, getAdapter, getConnectedRecord, job, schedule }) {
|
||||
this._config = config
|
||||
this._getRecord = getConnectedRecord
|
||||
this._job = job
|
||||
this._schedule = schedule
|
||||
|
||||
this._getSnapshotNameLabel = compileTemplate(config.snapshotNameLabelTpl, {
|
||||
'{job.name}': job.name,
|
||||
'{vm.name_label}': vm => vm.name_label,
|
||||
})
|
||||
|
||||
const { type } = job
|
||||
const baseSettings = { ...DEFAULT_SETTINGS }
|
||||
if (type === 'backup') {
|
||||
Object.assign(baseSettings, DEFAULT_VM_SETTINGS, config.defaultSettings, config.vm?.defaultSettings)
|
||||
this.run = this._runVmBackup
|
||||
} else if (type === 'metadataBackup') {
|
||||
Object.assign(baseSettings, DEFAULT_METADATA_SETTINGS, config.defaultSettings, config.metadata?.defaultSettings)
|
||||
this.run = this._runMetadataBackup
|
||||
} else {
|
||||
throw new Error(`No runner for the backup type ${type}`)
|
||||
}
|
||||
Object.assign(baseSettings, job.settings[''])
|
||||
|
||||
this._baseSettings = baseSettings
|
||||
this._settings = { ...baseSettings, ...job.settings[schedule.id] }
|
||||
|
||||
const { getRemoteTimeout } = this._settings
|
||||
this._getAdapter = async function (remoteId) {
|
||||
try {
|
||||
const disposable = await pTimeout.call(getAdapter(remoteId), getRemoteTimeout, new RemoteTimeoutError(remoteId))
|
||||
|
||||
return new Disposable(() => disposable.dispose(), {
|
||||
adapter: disposable.value,
|
||||
remoteId,
|
||||
})
|
||||
} catch (error) {
|
||||
// See https://github.com/vatesfr/xen-orchestra/commit/6aa6cfba8ec939c0288f0fa740f6dfad98c43cbb
|
||||
runTask(
|
||||
{
|
||||
name: 'get remote adapter',
|
||||
data: { type: 'remote', id: remoteId },
|
||||
},
|
||||
() => Promise.reject(error)
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async _runMetadataBackup() {
|
||||
const schedule = this._schedule
|
||||
const job = this._job
|
||||
const remoteIds = extractIdsFromSimplePattern(job.remotes)
|
||||
if (remoteIds.length === 0) {
|
||||
throw new Error('metadata backup job cannot run without remotes')
|
||||
}
|
||||
|
||||
const config = this._config
|
||||
const poolIds = extractIdsFromSimplePattern(job.pools)
|
||||
const isEmptyPools = poolIds.length === 0
|
||||
const isXoMetadata = job.xoMetadata !== undefined
|
||||
if (!isXoMetadata && isEmptyPools) {
|
||||
throw new Error('no metadata mode found')
|
||||
}
|
||||
|
||||
const settings = this._settings
|
||||
|
||||
const { retentionPoolMetadata, retentionXoMetadata } = settings
|
||||
|
||||
if (
|
||||
(retentionPoolMetadata === 0 && retentionXoMetadata === 0) ||
|
||||
(!isXoMetadata && retentionPoolMetadata === 0) ||
|
||||
(isEmptyPools && retentionXoMetadata === 0)
|
||||
) {
|
||||
throw new Error('no retentions corresponding to the metadata modes found')
|
||||
}
|
||||
|
||||
await Disposable.use(
|
||||
Disposable.all(
|
||||
poolIds.map(id =>
|
||||
this._getRecord('pool', id).catch(error => {
|
||||
// See https://github.com/vatesfr/xen-orchestra/commit/6aa6cfba8ec939c0288f0fa740f6dfad98c43cbb
|
||||
runTask(
|
||||
{
|
||||
name: 'get pool record',
|
||||
data: { type: 'pool', id },
|
||||
},
|
||||
() => Promise.reject(error)
|
||||
)
|
||||
})
|
||||
)
|
||||
),
|
||||
Disposable.all(remoteIds.map(id => this._getAdapter(id))),
|
||||
async (pools, remoteAdapters) => {
|
||||
// remove adapters that failed (already handled)
|
||||
remoteAdapters = remoteAdapters.filter(_ => _ !== undefined)
|
||||
if (remoteAdapters.length === 0) {
|
||||
return
|
||||
}
|
||||
remoteAdapters = getAdaptersByRemote(remoteAdapters)
|
||||
|
||||
// remove pools that failed (already handled)
|
||||
pools = pools.filter(_ => _ !== undefined)
|
||||
|
||||
const promises = []
|
||||
if (pools.length !== 0 && settings.retentionPoolMetadata !== 0) {
|
||||
promises.push(
|
||||
asyncMap(pools, async pool =>
|
||||
runTask(
|
||||
{
|
||||
name: `Starting metadata backup for the pool (${pool.$id}). (${job.id})`,
|
||||
data: {
|
||||
id: pool.$id,
|
||||
pool,
|
||||
poolMaster: await ignoreErrors.call(pool.$xapi.getRecord('host', pool.master)),
|
||||
type: 'pool',
|
||||
},
|
||||
},
|
||||
() =>
|
||||
new PoolMetadataBackup({
|
||||
config,
|
||||
job,
|
||||
pool,
|
||||
remoteAdapters,
|
||||
schedule,
|
||||
settings,
|
||||
}).run()
|
||||
)
|
||||
)
|
||||
)
|
||||
}
|
||||
|
||||
if (job.xoMetadata !== undefined && settings.retentionXoMetadata !== 0) {
|
||||
promises.push(
|
||||
runTask(
|
||||
{
|
||||
name: `Starting XO metadata backup. (${job.id})`,
|
||||
data: {
|
||||
type: 'xo',
|
||||
},
|
||||
},
|
||||
() =>
|
||||
new XoMetadataBackup({
|
||||
config,
|
||||
job,
|
||||
remoteAdapters,
|
||||
schedule,
|
||||
settings,
|
||||
}).run()
|
||||
)
|
||||
)
|
||||
}
|
||||
await Promise.all(promises)
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
async _runVmBackup() {
|
||||
const job = this._job
|
||||
|
||||
// FIXME: proper SimpleIdPattern handling
|
||||
const getSnapshotNameLabel = this._getSnapshotNameLabel
|
||||
const schedule = this._schedule
|
||||
const settings = this._settings
|
||||
|
||||
const throttleStream = createStreamThrottle(settings.maxExportRate)
|
||||
|
||||
const config = this._config
|
||||
await Disposable.use(
|
||||
Disposable.all(
|
||||
extractIdsFromSimplePattern(job.srs).map(id =>
|
||||
this._getRecord('SR', id).catch(error => {
|
||||
runTask(
|
||||
{
|
||||
name: 'get SR record',
|
||||
data: { type: 'SR', id },
|
||||
},
|
||||
() => Promise.reject(error)
|
||||
)
|
||||
})
|
||||
)
|
||||
),
|
||||
Disposable.all(extractIdsFromSimplePattern(job.remotes).map(id => this._getAdapter(id))),
|
||||
() => (settings.healthCheckSr !== undefined ? this._getRecord('SR', settings.healthCheckSr) : undefined),
|
||||
async (srs, remoteAdapters, healthCheckSr) => {
|
||||
// remove adapters that failed (already handled)
|
||||
remoteAdapters = remoteAdapters.filter(_ => _ !== undefined)
|
||||
|
||||
// remove srs that failed (already handled)
|
||||
srs = srs.filter(_ => _ !== undefined)
|
||||
|
||||
if (remoteAdapters.length === 0 && srs.length === 0 && settings.snapshotRetention === 0) {
|
||||
return
|
||||
}
|
||||
|
||||
const vmIds = extractIdsFromSimplePattern(job.vms)
|
||||
|
||||
Task.info('vms', { vms: vmIds })
|
||||
|
||||
remoteAdapters = getAdaptersByRemote(remoteAdapters)
|
||||
|
||||
const allSettings = this._job.settings
|
||||
const baseSettings = this._baseSettings
|
||||
|
||||
const handleVm = vmUuid => {
|
||||
const taskStart = { name: 'backup VM', data: { type: 'VM', id: vmUuid } }
|
||||
|
||||
return this._getRecord('VM', vmUuid).then(
|
||||
disposableVm =>
|
||||
Disposable.use(disposableVm, vm => {
|
||||
taskStart.data.name_label = vm.name_label
|
||||
return runTask(taskStart, () =>
|
||||
new VmBackup({
|
||||
baseSettings,
|
||||
config,
|
||||
getSnapshotNameLabel,
|
||||
healthCheckSr,
|
||||
job,
|
||||
remoteAdapters,
|
||||
schedule,
|
||||
settings: { ...settings, ...allSettings[vm.uuid] },
|
||||
srs,
|
||||
throttleStream,
|
||||
vm,
|
||||
}).run()
|
||||
)
|
||||
}),
|
||||
error =>
|
||||
runTask(taskStart, () => {
|
||||
throw error
|
||||
})
|
||||
)
|
||||
}
|
||||
const { concurrency } = settings
|
||||
await asyncMapSettled(vmIds, concurrency === 0 ? handleVm : limitConcurrency(concurrency)(handleVm))
|
||||
}
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -3,7 +3,7 @@
|
||||
const assert = require('assert')
|
||||
|
||||
const { formatFilenameDate } = require('./_filenameDate.js')
|
||||
const { importDeltaVm } = require('./_deltaVm.js')
|
||||
const { importIncrementalVm } = require('./_incrementalVm.js')
|
||||
const { Task } = require('./Task.js')
|
||||
const { watchStreamSize } = require('./_watchStreamSize.js')
|
||||
|
||||
@@ -49,7 +49,7 @@ exports.ImportVmBackup = class ImportVmBackup {
|
||||
|
||||
const vmRef = isFull
|
||||
? await xapi.VM_import(backup, srRef)
|
||||
: await importDeltaVm(backup, await xapi.getRecord('SR', srRef), {
|
||||
: await importIncrementalVm(backup, await xapi.getRecord('SR', srRef), {
|
||||
...this._importDeltaVmSettings,
|
||||
detectBase: false,
|
||||
})
|
||||
|
||||
@@ -28,7 +28,7 @@ const { isMetadataFile } = require('./_backupType.js')
|
||||
const { isValidXva } = require('./_isValidXva.js')
|
||||
const { listPartitions, LVM_PARTITION_TYPE } = require('./_listPartitions.js')
|
||||
const { lvs, pvs } = require('./_lvm.js')
|
||||
const { watchStreamSize } = require('./_watchStreamSize')
|
||||
const { watchStreamSize } = require('./_watchStreamSize.js')
|
||||
// @todo : this import is marked extraneous , sould be fixed when lib is published
|
||||
const { mount } = require('@vates/fuse-vhd')
|
||||
const { asyncEach } = require('@vates/async-each')
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
'use strict'
|
||||
|
||||
const { DIR_XO_POOL_METADATA_BACKUPS } = require('./RemoteAdapter.js')
|
||||
const { PATH_DB_DUMP } = require('./_PoolMetadataBackup.js')
|
||||
const { PATH_DB_DUMP } = require('./_backupJob/PoolMetadataBackup.js')
|
||||
|
||||
exports.RestoreMetadataBackup = class RestoreMetadataBackup {
|
||||
constructor({ backupId, handler, xapi }) {
|
||||
|
||||
@@ -1,515 +0,0 @@
|
||||
'use strict'
|
||||
|
||||
const assert = require('assert')
|
||||
const findLast = require('lodash/findLast.js')
|
||||
const groupBy = require('lodash/groupBy.js')
|
||||
const ignoreErrors = require('promise-toolbox/ignoreErrors')
|
||||
const keyBy = require('lodash/keyBy.js')
|
||||
const mapValues = require('lodash/mapValues.js')
|
||||
const vhdStreamValidator = require('vhd-lib/vhdStreamValidator.js')
|
||||
const { asyncMap } = require('@xen-orchestra/async-map')
|
||||
const { createLogger } = require('@xen-orchestra/log')
|
||||
const { decorateMethodsWith } = require('@vates/decorate-with')
|
||||
const { defer } = require('golike-defer')
|
||||
const { formatDateTime } = require('@xen-orchestra/xapi')
|
||||
const { pipeline } = require('node:stream')
|
||||
|
||||
const { DeltaBackupWriter } = require('./writers/DeltaBackupWriter.js')
|
||||
const { DeltaReplicationWriter } = require('./writers/DeltaReplicationWriter.js')
|
||||
const { exportDeltaVm } = require('./_deltaVm.js')
|
||||
const { forkStreamUnpipe } = require('./_forkStreamUnpipe.js')
|
||||
const { FullBackupWriter } = require('./writers/FullBackupWriter.js')
|
||||
const { FullReplicationWriter } = require('./writers/FullReplicationWriter.js')
|
||||
const { getOldEntries } = require('./_getOldEntries.js')
|
||||
const { Task } = require('./Task.js')
|
||||
const { watchStreamSize } = require('./_watchStreamSize.js')
|
||||
|
||||
const { debug, warn } = createLogger('xo:backups:VmBackup')
|
||||
|
||||
class AggregateError extends Error {
|
||||
constructor(errors, message) {
|
||||
super(message)
|
||||
this.errors = errors
|
||||
}
|
||||
}
|
||||
|
||||
const asyncEach = async (iterable, fn, thisArg = iterable) => {
|
||||
for (const item of iterable) {
|
||||
await fn.call(thisArg, item)
|
||||
}
|
||||
}
|
||||
|
||||
const forkDeltaExport = deltaExport =>
|
||||
Object.create(deltaExport, {
|
||||
streams: {
|
||||
value: mapValues(deltaExport.streams, forkStreamUnpipe),
|
||||
},
|
||||
})
|
||||
|
||||
const noop = Function.prototype
|
||||
|
||||
class VmBackup {
|
||||
constructor({
|
||||
config,
|
||||
getSnapshotNameLabel,
|
||||
healthCheckSr,
|
||||
job,
|
||||
remoteAdapters,
|
||||
remotes,
|
||||
schedule,
|
||||
settings,
|
||||
srs,
|
||||
throttleStream,
|
||||
vm,
|
||||
}) {
|
||||
if (vm.other_config['xo:backup:job'] === job.id && 'start' in vm.blocked_operations) {
|
||||
// don't match replicated VMs created by this very job otherwise they
|
||||
// will be replicated again and again
|
||||
throw new Error('cannot backup a VM created by this very job')
|
||||
}
|
||||
|
||||
this.config = config
|
||||
this.job = job
|
||||
this.remoteAdapters = remoteAdapters
|
||||
this.scheduleId = schedule.id
|
||||
this.timestamp = undefined
|
||||
|
||||
// VM currently backed up
|
||||
this.vm = vm
|
||||
const { tags } = this.vm
|
||||
|
||||
// VM (snapshot) that is really exported
|
||||
this.exportedVm = undefined
|
||||
|
||||
this._fullVdisRequired = undefined
|
||||
this._getSnapshotNameLabel = getSnapshotNameLabel
|
||||
this._isDelta = job.mode === 'delta'
|
||||
this._healthCheckSr = healthCheckSr
|
||||
this._jobId = job.id
|
||||
this._jobSnapshots = undefined
|
||||
this._throttleStream = throttleStream
|
||||
this._xapi = vm.$xapi
|
||||
|
||||
// Base VM for the export
|
||||
this._baseVm = undefined
|
||||
|
||||
// Settings for this specific run (job, schedule, VM)
|
||||
if (tags.includes('xo-memory-backup')) {
|
||||
settings.checkpointSnapshot = true
|
||||
}
|
||||
if (tags.includes('xo-offline-backup')) {
|
||||
settings.offlineSnapshot = true
|
||||
}
|
||||
this._settings = settings
|
||||
|
||||
// Create writers
|
||||
{
|
||||
const writers = new Set()
|
||||
this._writers = writers
|
||||
|
||||
const [BackupWriter, ReplicationWriter] = this._isDelta
|
||||
? [DeltaBackupWriter, DeltaReplicationWriter]
|
||||
: [FullBackupWriter, FullReplicationWriter]
|
||||
|
||||
const allSettings = job.settings
|
||||
Object.keys(remoteAdapters).forEach(remoteId => {
|
||||
const targetSettings = {
|
||||
...settings,
|
||||
...allSettings[remoteId],
|
||||
}
|
||||
if (targetSettings.exportRetention !== 0) {
|
||||
writers.add(new BackupWriter({ backup: this, remoteId, settings: targetSettings }))
|
||||
}
|
||||
})
|
||||
srs.forEach(sr => {
|
||||
const targetSettings = {
|
||||
...settings,
|
||||
...allSettings[sr.uuid],
|
||||
}
|
||||
if (targetSettings.copyRetention !== 0) {
|
||||
writers.add(new ReplicationWriter({ backup: this, sr, settings: targetSettings }))
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// calls fn for each function, warns of any errors, and throws only if there are no writers left
|
||||
async _callWriters(fn, step, parallel = true) {
|
||||
const writers = this._writers
|
||||
const n = writers.size
|
||||
if (n === 0) {
|
||||
return
|
||||
}
|
||||
|
||||
async function callWriter(writer) {
|
||||
const { name } = writer.constructor
|
||||
try {
|
||||
debug('writer step starting', { step, writer: name })
|
||||
await fn(writer)
|
||||
debug('writer step succeeded', { duration: step, writer: name })
|
||||
} catch (error) {
|
||||
writers.delete(writer)
|
||||
|
||||
warn('writer step failed', { error, step, writer: name })
|
||||
|
||||
// these two steps are the only one that are not already in their own sub tasks
|
||||
if (step === 'writer.checkBaseVdis()' || step === 'writer.beforeBackup()') {
|
||||
Task.warning(
|
||||
`the writer ${name} has failed the step ${step} with error ${error.message}. It won't be used anymore in this job execution.`
|
||||
)
|
||||
}
|
||||
|
||||
throw error
|
||||
}
|
||||
}
|
||||
if (n === 1) {
|
||||
const [writer] = writers
|
||||
return callWriter(writer)
|
||||
}
|
||||
|
||||
const errors = []
|
||||
await (parallel ? asyncMap : asyncEach)(writers, async function (writer) {
|
||||
try {
|
||||
await callWriter(writer)
|
||||
} catch (error) {
|
||||
errors.push(error)
|
||||
}
|
||||
})
|
||||
if (writers.size === 0) {
|
||||
throw new AggregateError(errors, 'all targets have failed, step: ' + step)
|
||||
}
|
||||
}
|
||||
|
||||
// ensure the VM itself does not have any backup metadata which would be
|
||||
// copied on manual snapshots and interfere with the backup jobs
|
||||
async _cleanMetadata() {
|
||||
const { vm } = this
|
||||
if ('xo:backup:job' in vm.other_config) {
|
||||
await vm.update_other_config({
|
||||
'xo:backup:datetime': null,
|
||||
'xo:backup:deltaChainLength': null,
|
||||
'xo:backup:exported': null,
|
||||
'xo:backup:job': null,
|
||||
'xo:backup:schedule': null,
|
||||
'xo:backup:vm': null,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
async _snapshot() {
|
||||
const { vm } = this
|
||||
const xapi = this._xapi
|
||||
|
||||
const settings = this._settings
|
||||
|
||||
const doSnapshot =
|
||||
settings.unconditionalSnapshot ||
|
||||
this._isDelta ||
|
||||
(!settings.offlineBackup && vm.power_state === 'Running') ||
|
||||
settings.snapshotRetention !== 0
|
||||
if (doSnapshot) {
|
||||
await Task.run({ name: 'snapshot' }, async () => {
|
||||
if (!settings.bypassVdiChainsCheck) {
|
||||
await vm.$assertHealthyVdiChains()
|
||||
}
|
||||
|
||||
const snapshotRef = await vm[settings.checkpointSnapshot ? '$checkpoint' : '$snapshot']({
|
||||
ignoreNobakVdis: true,
|
||||
name_label: this._getSnapshotNameLabel(vm),
|
||||
unplugVusbs: true,
|
||||
})
|
||||
this.timestamp = Date.now()
|
||||
|
||||
await xapi.setFieldEntries('VM', snapshotRef, 'other_config', {
|
||||
'xo:backup:datetime': formatDateTime(this.timestamp),
|
||||
'xo:backup:job': this._jobId,
|
||||
'xo:backup:schedule': this.scheduleId,
|
||||
'xo:backup:vm': vm.uuid,
|
||||
})
|
||||
|
||||
this.exportedVm = await xapi.getRecord('VM', snapshotRef)
|
||||
|
||||
return this.exportedVm.uuid
|
||||
})
|
||||
} else {
|
||||
this.exportedVm = vm
|
||||
this.timestamp = Date.now()
|
||||
}
|
||||
}
|
||||
|
||||
async _copyDelta() {
|
||||
const { exportedVm } = this
|
||||
const baseVm = this._baseVm
|
||||
const fullVdisRequired = this._fullVdisRequired
|
||||
|
||||
const isFull = fullVdisRequired === undefined || fullVdisRequired.size !== 0
|
||||
|
||||
await this._callWriters(writer => writer.prepare({ isFull }), 'writer.prepare()')
|
||||
|
||||
const deltaExport = await exportDeltaVm(exportedVm, baseVm, {
|
||||
fullVdisRequired,
|
||||
})
|
||||
// since NBD is network based, if one disk use nbd , all the disk use them
|
||||
// except the suspended VDI
|
||||
if (Object.values(deltaExport.streams).some(({ _nbd }) => _nbd)) {
|
||||
Task.info('Transfer data using NBD')
|
||||
}
|
||||
const sizeContainers = mapValues(deltaExport.streams, stream => watchStreamSize(stream))
|
||||
|
||||
if (this._settings.validateVhdStreams) {
|
||||
deltaExport.streams = mapValues(deltaExport.streams, stream => pipeline(stream, vhdStreamValidator, noop))
|
||||
}
|
||||
|
||||
deltaExport.streams = mapValues(deltaExport.streams, this._throttleStream)
|
||||
|
||||
const timestamp = Date.now()
|
||||
|
||||
await this._callWriters(
|
||||
writer =>
|
||||
writer.transfer({
|
||||
deltaExport: forkDeltaExport(deltaExport),
|
||||
sizeContainers,
|
||||
timestamp,
|
||||
}),
|
||||
'writer.transfer()'
|
||||
)
|
||||
|
||||
this._baseVm = exportedVm
|
||||
|
||||
if (baseVm !== undefined) {
|
||||
await exportedVm.update_other_config(
|
||||
'xo:backup:deltaChainLength',
|
||||
String(+(baseVm.other_config['xo:backup:deltaChainLength'] ?? 0) + 1)
|
||||
)
|
||||
}
|
||||
|
||||
// not the case if offlineBackup
|
||||
if (exportedVm.is_a_snapshot) {
|
||||
await exportedVm.update_other_config('xo:backup:exported', 'true')
|
||||
}
|
||||
|
||||
const size = Object.values(sizeContainers).reduce((sum, { size }) => sum + size, 0)
|
||||
const end = Date.now()
|
||||
const duration = end - timestamp
|
||||
debug('transfer complete', {
|
||||
duration,
|
||||
speed: duration !== 0 ? (size * 1e3) / 1024 / 1024 / duration : 0,
|
||||
size,
|
||||
})
|
||||
|
||||
await this._callWriters(writer => writer.cleanup(), 'writer.cleanup()')
|
||||
}
|
||||
|
||||
async _copyFull() {
|
||||
const { compression } = this.job
|
||||
const stream = this._throttleStream(
|
||||
await this._xapi.VM_export(this.exportedVm.$ref, {
|
||||
compress: Boolean(compression) && (compression === 'native' ? 'gzip' : 'zstd'),
|
||||
useSnapshot: false,
|
||||
})
|
||||
)
|
||||
const sizeContainer = watchStreamSize(stream)
|
||||
|
||||
const timestamp = Date.now()
|
||||
|
||||
await this._callWriters(
|
||||
writer =>
|
||||
writer.run({
|
||||
sizeContainer,
|
||||
stream: forkStreamUnpipe(stream),
|
||||
timestamp,
|
||||
}),
|
||||
'writer.run()'
|
||||
)
|
||||
|
||||
const { size } = sizeContainer
|
||||
const end = Date.now()
|
||||
const duration = end - timestamp
|
||||
debug('transfer complete', {
|
||||
duration,
|
||||
speed: duration !== 0 ? (size * 1e3) / 1024 / 1024 / duration : 0,
|
||||
size,
|
||||
})
|
||||
}
|
||||
|
||||
async _fetchJobSnapshots() {
|
||||
const jobId = this._jobId
|
||||
const vmRef = this.vm.$ref
|
||||
const xapi = this._xapi
|
||||
|
||||
const snapshotsRef = await xapi.getField('VM', vmRef, 'snapshots')
|
||||
const snapshotsOtherConfig = await asyncMap(snapshotsRef, ref => xapi.getField('VM', ref, 'other_config'))
|
||||
|
||||
const snapshots = []
|
||||
snapshotsOtherConfig.forEach((other_config, i) => {
|
||||
if (other_config['xo:backup:job'] === jobId) {
|
||||
snapshots.push({ other_config, $ref: snapshotsRef[i] })
|
||||
}
|
||||
})
|
||||
snapshots.sort((a, b) => (a.other_config['xo:backup:datetime'] < b.other_config['xo:backup:datetime'] ? -1 : 1))
|
||||
this._jobSnapshots = snapshots
|
||||
}
|
||||
|
||||
async _removeUnusedSnapshots() {
|
||||
const allSettings = this.job.settings
|
||||
const baseSettings = this._baseSettings
|
||||
const baseVmRef = this._baseVm?.$ref
|
||||
|
||||
const snapshotsPerSchedule = groupBy(this._jobSnapshots, _ => _.other_config['xo:backup:schedule'])
|
||||
const xapi = this._xapi
|
||||
await asyncMap(Object.entries(snapshotsPerSchedule), ([scheduleId, snapshots]) => {
|
||||
const settings = {
|
||||
...baseSettings,
|
||||
...allSettings[scheduleId],
|
||||
...allSettings[this.vm.uuid],
|
||||
}
|
||||
return asyncMap(getOldEntries(settings.snapshotRetention, snapshots), ({ $ref }) => {
|
||||
if ($ref !== baseVmRef) {
|
||||
return xapi.VM_destroy($ref)
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
async _selectBaseVm() {
|
||||
const xapi = this._xapi
|
||||
|
||||
let baseVm = findLast(this._jobSnapshots, _ => 'xo:backup:exported' in _.other_config)
|
||||
if (baseVm === undefined) {
|
||||
debug('no base VM found')
|
||||
return
|
||||
}
|
||||
|
||||
const fullInterval = this._settings.fullInterval
|
||||
const deltaChainLength = +(baseVm.other_config['xo:backup:deltaChainLength'] ?? 0) + 1
|
||||
if (!(fullInterval === 0 || fullInterval > deltaChainLength)) {
|
||||
debug('not using base VM becaust fullInterval reached')
|
||||
return
|
||||
}
|
||||
|
||||
const srcVdis = keyBy(await xapi.getRecords('VDI', await this.vm.$getDisks()), '$ref')
|
||||
|
||||
// resolve full record
|
||||
baseVm = await xapi.getRecord('VM', baseVm.$ref)
|
||||
|
||||
const baseUuidToSrcVdi = new Map()
|
||||
await asyncMap(await baseVm.$getDisks(), async baseRef => {
|
||||
const [baseUuid, snapshotOf] = await Promise.all([
|
||||
xapi.getField('VDI', baseRef, 'uuid'),
|
||||
xapi.getField('VDI', baseRef, 'snapshot_of'),
|
||||
])
|
||||
const srcVdi = srcVdis[snapshotOf]
|
||||
if (srcVdi !== undefined) {
|
||||
baseUuidToSrcVdi.set(baseUuid, srcVdi)
|
||||
} else {
|
||||
debug('ignore snapshot VDI because no longer present on VM', {
|
||||
vdi: baseUuid,
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
const presentBaseVdis = new Map(baseUuidToSrcVdi)
|
||||
await this._callWriters(
|
||||
writer => presentBaseVdis.size !== 0 && writer.checkBaseVdis(presentBaseVdis, baseVm),
|
||||
'writer.checkBaseVdis()',
|
||||
false
|
||||
)
|
||||
|
||||
if (presentBaseVdis.size === 0) {
|
||||
debug('no base VM found')
|
||||
return
|
||||
}
|
||||
|
||||
const fullVdisRequired = new Set()
|
||||
baseUuidToSrcVdi.forEach((srcVdi, baseUuid) => {
|
||||
if (presentBaseVdis.has(baseUuid)) {
|
||||
debug('found base VDI', {
|
||||
base: baseUuid,
|
||||
vdi: srcVdi.uuid,
|
||||
})
|
||||
} else {
|
||||
debug('missing base VDI', {
|
||||
base: baseUuid,
|
||||
vdi: srcVdi.uuid,
|
||||
})
|
||||
fullVdisRequired.add(srcVdi.uuid)
|
||||
}
|
||||
})
|
||||
|
||||
this._baseVm = baseVm
|
||||
this._fullVdisRequired = fullVdisRequired
|
||||
}
|
||||
|
||||
async _healthCheck() {
|
||||
const settings = this._settings
|
||||
|
||||
if (this._healthCheckSr === undefined) {
|
||||
return
|
||||
}
|
||||
|
||||
// check if current VM has tags
|
||||
const { tags } = this.vm
|
||||
const intersect = settings.healthCheckVmsWithTags.some(t => tags.includes(t))
|
||||
|
||||
if (settings.healthCheckVmsWithTags.length !== 0 && !intersect) {
|
||||
return
|
||||
}
|
||||
|
||||
await this._callWriters(writer => writer.healthCheck(this._healthCheckSr), 'writer.healthCheck()')
|
||||
}
|
||||
|
||||
async run($defer) {
|
||||
const settings = this._settings
|
||||
assert(
|
||||
!settings.offlineBackup || settings.snapshotRetention === 0,
|
||||
'offlineBackup is not compatible with snapshotRetention'
|
||||
)
|
||||
|
||||
await this._callWriters(async writer => {
|
||||
await writer.beforeBackup()
|
||||
$defer(async () => {
|
||||
await writer.afterBackup()
|
||||
})
|
||||
}, 'writer.beforeBackup()')
|
||||
|
||||
await this._fetchJobSnapshots()
|
||||
|
||||
if (this._isDelta) {
|
||||
await this._selectBaseVm()
|
||||
}
|
||||
|
||||
await this._cleanMetadata()
|
||||
await this._removeUnusedSnapshots()
|
||||
|
||||
const { vm } = this
|
||||
const isRunning = vm.power_state === 'Running'
|
||||
const startAfter = isRunning && (settings.offlineBackup ? 'backup' : settings.offlineSnapshot && 'snapshot')
|
||||
if (startAfter) {
|
||||
await vm.$callAsync('clean_shutdown')
|
||||
}
|
||||
|
||||
try {
|
||||
await this._snapshot()
|
||||
if (startAfter === 'snapshot') {
|
||||
ignoreErrors.call(vm.$callAsync('start', false, false))
|
||||
}
|
||||
|
||||
if (this._writers.size !== 0) {
|
||||
await (this._isDelta ? this._copyDelta() : this._copyFull())
|
||||
}
|
||||
} finally {
|
||||
if (startAfter) {
|
||||
ignoreErrors.call(vm.$callAsync('start', false, false))
|
||||
}
|
||||
|
||||
await this._fetchJobSnapshots()
|
||||
await this._removeUnusedSnapshots()
|
||||
}
|
||||
await this._healthCheck()
|
||||
}
|
||||
}
|
||||
exports.VmBackup = VmBackup
|
||||
|
||||
decorateMethodsWith(VmBackup, {
|
||||
run: defer,
|
||||
})
|
||||
51
@xen-orchestra/backups/_backupJob/AbstractBackupJob.js
Normal file
51
@xen-orchestra/backups/_backupJob/AbstractBackupJob.js
Normal file
@@ -0,0 +1,51 @@
|
||||
'use strict'
|
||||
|
||||
const Disposable = require('promise-toolbox/Disposable')
|
||||
const pTimeout = require('promise-toolbox/timeout')
|
||||
const { compileTemplate } = require('@xen-orchestra/template')
|
||||
const { runTask } = require('./runTask.js')
|
||||
const { RemoteTimeoutError } = require('./RemoteTimeoutError.js')
|
||||
|
||||
exports.DEFAULT_SETTINGS = {
|
||||
getRemoteTimeout: 300e3,
|
||||
reportWhen: 'failure',
|
||||
}
|
||||
|
||||
exports.AbstractBackupJob = class AbstractBackupJob {
|
||||
constructor({ config, getAdapter, getConnectedRecord, job, schedule }) {
|
||||
this._config = config
|
||||
this._getRecord = getConnectedRecord
|
||||
this._job = job
|
||||
this._schedule = schedule
|
||||
|
||||
this._getSnapshotNameLabel = compileTemplate(config.snapshotNameLabelTpl, {
|
||||
'{job.name}': job.name,
|
||||
'{vm.name_label}': vm => vm.name_label,
|
||||
})
|
||||
|
||||
const baseSettings = this._computeBaseSettings(config, job)
|
||||
this._baseSettings = baseSettings
|
||||
this._settings = { ...baseSettings, ...job.settings[schedule.id] }
|
||||
|
||||
const { getRemoteTimeout } = this._settings
|
||||
this._getAdapter = async function (remoteId) {
|
||||
try {
|
||||
const disposable = await pTimeout.call(getAdapter(remoteId), getRemoteTimeout, new RemoteTimeoutError(remoteId))
|
||||
|
||||
return new Disposable(() => disposable.dispose(), {
|
||||
adapter: disposable.value,
|
||||
remoteId,
|
||||
})
|
||||
} catch (error) {
|
||||
// See https://github.com/vatesfr/xen-orchestra/commit/6aa6cfba8ec939c0288f0fa740f6dfad98c43cbb
|
||||
runTask(
|
||||
{
|
||||
name: 'get remote adapter',
|
||||
data: { type: 'remote', id: remoteId },
|
||||
},
|
||||
() => Promise.reject(error)
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
134
@xen-orchestra/backups/_backupJob/MetadatasBackupJob.js
Normal file
134
@xen-orchestra/backups/_backupJob/MetadatasBackupJob.js
Normal file
@@ -0,0 +1,134 @@
|
||||
'use strict'
|
||||
|
||||
const { asyncMap } = require('@xen-orchestra/async-map')
|
||||
const Disposable = require('promise-toolbox/Disposable')
|
||||
const ignoreErrors = require('promise-toolbox/ignoreErrors')
|
||||
|
||||
const { extractIdsFromSimplePattern } = require('../extractIdsFromSimplePattern.js')
|
||||
const { PoolMetadataBackup } = require('./PoolMetadataBackup.js')
|
||||
const { XoMetadataBackup } = require('./XoMetadataBackup.js')
|
||||
const { DEFAULT_SETTINGS, AbstractBackupJob } = require('./AbstractBackupJob.js')
|
||||
const { runTask } = require('./runTask.js')
|
||||
const { getAdaptersByRemote } = require('./getAdaptersByRemote.js')
|
||||
|
||||
const DEFAULT_METADATA_SETTINGS = {
|
||||
retentionPoolMetadata: 0,
|
||||
retentionXoMetadata: 0,
|
||||
}
|
||||
|
||||
exports.MetadatasBackupJob = class MetadatasBackupJob extends AbstractBackupJob {
|
||||
_computeBaseSettings(config, job) {
|
||||
const baseSettings = { ...DEFAULT_SETTINGS }
|
||||
Object.assign(baseSettings, DEFAULT_METADATA_SETTINGS, config.defaultSettings, config.metadata?.defaultSettings)
|
||||
Object.assign(baseSettings, job.settings[''])
|
||||
return baseSettings
|
||||
}
|
||||
|
||||
async run() {
|
||||
const schedule = this._schedule
|
||||
const job = this._job
|
||||
const remoteIds = extractIdsFromSimplePattern(job.remotes)
|
||||
if (remoteIds.length === 0) {
|
||||
throw new Error('metadata backup job cannot run without remotes')
|
||||
}
|
||||
|
||||
const config = this._config
|
||||
const poolIds = extractIdsFromSimplePattern(job.pools)
|
||||
const isEmptyPools = poolIds.length === 0
|
||||
const isXoMetadata = job.xoMetadata !== undefined
|
||||
if (!isXoMetadata && isEmptyPools) {
|
||||
throw new Error('no metadata mode found')
|
||||
}
|
||||
|
||||
const settings = this._settings
|
||||
|
||||
const { retentionPoolMetadata, retentionXoMetadata } = settings
|
||||
|
||||
if (
|
||||
(retentionPoolMetadata === 0 && retentionXoMetadata === 0) ||
|
||||
(!isXoMetadata && retentionPoolMetadata === 0) ||
|
||||
(isEmptyPools && retentionXoMetadata === 0)
|
||||
) {
|
||||
throw new Error('no retentions corresponding to the metadata modes found')
|
||||
}
|
||||
|
||||
await Disposable.use(
|
||||
Disposable.all(
|
||||
poolIds.map(id =>
|
||||
this._getRecord('pool', id).catch(error => {
|
||||
// See https://github.com/vatesfr/xen-orchestra/commit/6aa6cfba8ec939c0288f0fa740f6dfad98c43cbb
|
||||
runTask(
|
||||
{
|
||||
name: 'get pool record',
|
||||
data: { type: 'pool', id },
|
||||
},
|
||||
() => Promise.reject(error)
|
||||
)
|
||||
})
|
||||
)
|
||||
),
|
||||
Disposable.all(remoteIds.map(id => this._getAdapter(id))),
|
||||
async (pools, remoteAdapters) => {
|
||||
// remove adapters that failed (already handled)
|
||||
remoteAdapters = remoteAdapters.filter(_ => _ !== undefined)
|
||||
if (remoteAdapters.length === 0) {
|
||||
return
|
||||
}
|
||||
remoteAdapters = getAdaptersByRemote(remoteAdapters)
|
||||
|
||||
// remove pools that failed (already handled)
|
||||
pools = pools.filter(_ => _ !== undefined)
|
||||
|
||||
const promises = []
|
||||
if (pools.length !== 0 && settings.retentionPoolMetadata !== 0) {
|
||||
promises.push(
|
||||
asyncMap(pools, async pool =>
|
||||
runTask(
|
||||
{
|
||||
name: `Starting metadata backup for the pool (${pool.$id}). (${job.id})`,
|
||||
data: {
|
||||
id: pool.$id,
|
||||
pool,
|
||||
poolMaster: await ignoreErrors.call(pool.$xapi.getRecord('host', pool.master)),
|
||||
type: 'pool',
|
||||
},
|
||||
},
|
||||
() =>
|
||||
new PoolMetadataBackup({
|
||||
config,
|
||||
job,
|
||||
pool,
|
||||
remoteAdapters,
|
||||
schedule,
|
||||
settings,
|
||||
}).run()
|
||||
)
|
||||
)
|
||||
)
|
||||
}
|
||||
|
||||
if (job.xoMetadata !== undefined && settings.retentionXoMetadata !== 0) {
|
||||
promises.push(
|
||||
runTask(
|
||||
{
|
||||
name: `Starting XO metadata backup. (${job.id})`,
|
||||
data: {
|
||||
type: 'xo',
|
||||
},
|
||||
},
|
||||
() =>
|
||||
new XoMetadataBackup({
|
||||
config,
|
||||
job,
|
||||
remoteAdapters,
|
||||
schedule,
|
||||
settings,
|
||||
}).run()
|
||||
)
|
||||
)
|
||||
}
|
||||
await Promise.all(promises)
|
||||
}
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -2,10 +2,10 @@
|
||||
|
||||
const { asyncMap } = require('@xen-orchestra/async-map')
|
||||
|
||||
const { DIR_XO_POOL_METADATA_BACKUPS } = require('./RemoteAdapter.js')
|
||||
const { forkStreamUnpipe } = require('./_forkStreamUnpipe.js')
|
||||
const { formatFilenameDate } = require('./_filenameDate.js')
|
||||
const { Task } = require('./Task.js')
|
||||
const { DIR_XO_POOL_METADATA_BACKUPS } = require('../RemoteAdapter.js')
|
||||
const { forkStreamUnpipe } = require('./forkStreamUnpipe.js')
|
||||
const { formatFilenameDate } = require('../_filenameDate.js')
|
||||
const { Task } = require('../Task.js')
|
||||
|
||||
const PATH_DB_DUMP = '/pool/xmldbdump'
|
||||
exports.PATH_DB_DUMP = PATH_DB_DUMP
|
||||
8
@xen-orchestra/backups/_backupJob/RemoteTimeoutError.js
Normal file
8
@xen-orchestra/backups/_backupJob/RemoteTimeoutError.js
Normal file
@@ -0,0 +1,8 @@
|
||||
'use strict'
|
||||
class RemoteTimeoutError extends Error {
|
||||
constructor(remoteId) {
|
||||
super('timeout while getting the remote ' + remoteId)
|
||||
this.remoteId = remoteId
|
||||
}
|
||||
}
|
||||
exports.RemoteTimeoutError = RemoteTimeoutError
|
||||
@@ -0,0 +1,87 @@
|
||||
'use strict'
|
||||
|
||||
const { asyncMap } = require('@xen-orchestra/async-map')
|
||||
const { createLogger } = require('@xen-orchestra/log')
|
||||
const { Task } = require('../../Task.js')
|
||||
|
||||
const { debug, warn } = createLogger('xo:backups:AbstractVmBackup')
|
||||
|
||||
class AggregateError extends Error {
|
||||
constructor(errors, message) {
|
||||
super(message)
|
||||
this.errors = errors
|
||||
}
|
||||
}
|
||||
|
||||
const asyncEach = async (iterable, fn, thisArg = iterable) => {
|
||||
for (const item of iterable) {
|
||||
await fn.call(thisArg, item)
|
||||
}
|
||||
}
|
||||
|
||||
exports.AbstractVmBackup = class AbstractVmBackup {
|
||||
// calls fn for each function, warns of any errors, and throws only if there are no writers left
|
||||
async _callWriters(fn, step, parallel = true) {
|
||||
const writers = this._writers
|
||||
const n = writers.size
|
||||
if (n === 0) {
|
||||
return
|
||||
}
|
||||
|
||||
async function callWriter(writer) {
|
||||
const { name } = writer.constructor
|
||||
try {
|
||||
debug('writer step starting', { step, writer: name })
|
||||
await fn(writer)
|
||||
debug('writer step succeeded', { duration: step, writer: name })
|
||||
} catch (error) {
|
||||
writers.delete(writer)
|
||||
|
||||
warn('writer step failed', { error, step, writer: name })
|
||||
|
||||
// these two steps are the only one that are not already in their own sub tasks
|
||||
if (step === 'writer.checkBaseVdis()' || step === 'writer.beforeBackup()') {
|
||||
Task.warning(
|
||||
`the writer ${name} has failed the step ${step} with error ${error.message}. It won't be used anymore in this job execution.`
|
||||
)
|
||||
}
|
||||
|
||||
throw error
|
||||
}
|
||||
}
|
||||
if (n === 1) {
|
||||
const [writer] = writers
|
||||
return callWriter(writer)
|
||||
}
|
||||
|
||||
const errors = []
|
||||
await (parallel ? asyncMap : asyncEach)(writers, async function (writer) {
|
||||
try {
|
||||
await callWriter(writer)
|
||||
} catch (error) {
|
||||
errors.push(error)
|
||||
}
|
||||
})
|
||||
if (writers.size === 0) {
|
||||
throw new AggregateError(errors, 'all targets have failed, step: ' + step)
|
||||
}
|
||||
}
|
||||
|
||||
async _healthCheck() {
|
||||
const settings = this._settings
|
||||
|
||||
if (this._healthCheckSr === undefined) {
|
||||
return
|
||||
}
|
||||
|
||||
// check if current VM has tags
|
||||
const { tags } = this.vm
|
||||
const intersect = settings.healthCheckVmsWithTags.some(t => tags.includes(t))
|
||||
|
||||
if (settings.healthCheckVmsWithTags.length !== 0 && !intersect) {
|
||||
return
|
||||
}
|
||||
|
||||
await this._callWriters(writer => writer.healthCheck(this._healthCheckSr), 'writer.healthCheck()')
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,258 @@
|
||||
'use strict'
|
||||
|
||||
const assert = require('assert')
|
||||
const groupBy = require('lodash/groupBy.js')
|
||||
const ignoreErrors = require('promise-toolbox/ignoreErrors')
|
||||
const { asyncMap } = require('@xen-orchestra/async-map')
|
||||
const { decorateMethodsWith } = require('@vates/decorate-with')
|
||||
const { defer } = require('golike-defer')
|
||||
const { formatDateTime } = require('@xen-orchestra/xapi')
|
||||
|
||||
const { getOldEntries } = require('../../_getOldEntries.js')
|
||||
const { Task } = require('../../Task.js')
|
||||
const { AbstractVmBackup } = require('./AbstractVMBackup.js')
|
||||
|
||||
class AbstractXapiVmBackup extends AbstractVmBackup {
|
||||
constructor({
|
||||
config,
|
||||
getSnapshotNameLabel,
|
||||
healthCheckSr,
|
||||
job,
|
||||
remoteAdapters,
|
||||
remotes,
|
||||
schedule,
|
||||
settings,
|
||||
srs,
|
||||
throttleStream,
|
||||
vm,
|
||||
}) {
|
||||
super()
|
||||
if (vm.other_config['xo:backup:job'] === job.id && 'start' in vm.blocked_operations) {
|
||||
// don't match replicated VMs created by this very job otherwise they
|
||||
// will be replicated again and again
|
||||
throw new Error('cannot backup a VM created by this very job')
|
||||
}
|
||||
|
||||
this.config = config
|
||||
this.job = job
|
||||
this.remoteAdapters = remoteAdapters
|
||||
this.scheduleId = schedule.id
|
||||
this.timestamp = undefined
|
||||
|
||||
// VM currently backed up
|
||||
this.vm = vm
|
||||
const { tags } = this.vm
|
||||
|
||||
// VM (snapshot) that is really exported
|
||||
this.exportedVm = undefined
|
||||
|
||||
this._fullVdisRequired = undefined
|
||||
this._getSnapshotNameLabel = getSnapshotNameLabel
|
||||
this._isIncremental = job.mode === 'delta'
|
||||
this._healthCheckSr = healthCheckSr
|
||||
this._jobId = job.id
|
||||
this._jobSnapshots = undefined
|
||||
this._throttleStream = throttleStream
|
||||
this._xapi = vm.$xapi
|
||||
|
||||
// Base VM for the export
|
||||
this._baseVm = undefined
|
||||
|
||||
// Settings for this specific run (job, schedule, VM)
|
||||
if (tags.includes('xo-memory-backup')) {
|
||||
settings.checkpointSnapshot = true
|
||||
}
|
||||
if (tags.includes('xo-offline-backup')) {
|
||||
settings.offlineSnapshot = true
|
||||
}
|
||||
this._settings = settings
|
||||
|
||||
// Create writers
|
||||
{
|
||||
const writers = new Set()
|
||||
this._writers = writers
|
||||
|
||||
const [BackupWriter, ReplicationWriter] = this._getWriters()
|
||||
|
||||
const allSettings = job.settings
|
||||
Object.keys(remoteAdapters).forEach(remoteId => {
|
||||
const targetSettings = {
|
||||
...settings,
|
||||
...allSettings[remoteId],
|
||||
}
|
||||
if (targetSettings.exportRetention !== 0) {
|
||||
writers.add(new BackupWriter({ backup: this, remoteId, settings: targetSettings }))
|
||||
}
|
||||
})
|
||||
srs.forEach(sr => {
|
||||
const targetSettings = {
|
||||
...settings,
|
||||
...allSettings[sr.uuid],
|
||||
}
|
||||
if (targetSettings.copyRetention !== 0) {
|
||||
writers.add(new ReplicationWriter({ backup: this, sr, settings: targetSettings }))
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// ensure the VM itself does not have any backup metadata which would be
|
||||
// copied on manual snapshots and interfere with the backup jobs
|
||||
async _cleanMetadata() {
|
||||
const { vm } = this
|
||||
if ('xo:backup:job' in vm.other_config) {
|
||||
await vm.update_other_config({
|
||||
'xo:backup:datetime': null,
|
||||
'xo:backup:deltaChainLength': null,
|
||||
'xo:backup:exported': null,
|
||||
'xo:backup:job': null,
|
||||
'xo:backup:schedule': null,
|
||||
'xo:backup:vm': null,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
async _snapshot() {
|
||||
const { vm } = this
|
||||
const xapi = this._xapi
|
||||
|
||||
const settings = this._settings
|
||||
|
||||
if (this._mustDoSnapshot()) {
|
||||
await Task.run({ name: 'snapshot' }, async () => {
|
||||
if (!settings.bypassVdiChainsCheck) {
|
||||
await vm.$assertHealthyVdiChains()
|
||||
}
|
||||
|
||||
const snapshotRef = await vm[settings.checkpointSnapshot ? '$checkpoint' : '$snapshot']({
|
||||
ignoreNobakVdis: true,
|
||||
name_label: this._getSnapshotNameLabel(vm),
|
||||
unplugVusbs: true,
|
||||
})
|
||||
this.timestamp = Date.now()
|
||||
|
||||
await xapi.setFieldEntries('VM', snapshotRef, 'other_config', {
|
||||
'xo:backup:datetime': formatDateTime(this.timestamp),
|
||||
'xo:backup:job': this._jobId,
|
||||
'xo:backup:schedule': this.scheduleId,
|
||||
'xo:backup:vm': vm.uuid,
|
||||
})
|
||||
|
||||
this.exportedVm = await xapi.getRecord('VM', snapshotRef)
|
||||
|
||||
return this.exportedVm.uuid
|
||||
})
|
||||
} else {
|
||||
this.exportedVm = vm
|
||||
this.timestamp = Date.now()
|
||||
}
|
||||
}
|
||||
|
||||
async _fetchJobSnapshots() {
|
||||
const jobId = this._jobId
|
||||
const vmRef = this.vm.$ref
|
||||
const xapi = this._xapi
|
||||
|
||||
const snapshotsRef = await xapi.getField('VM', vmRef, 'snapshots')
|
||||
const snapshotsOtherConfig = await asyncMap(snapshotsRef, ref => xapi.getField('VM', ref, 'other_config'))
|
||||
|
||||
const snapshots = []
|
||||
snapshotsOtherConfig.forEach((other_config, i) => {
|
||||
if (other_config['xo:backup:job'] === jobId) {
|
||||
snapshots.push({ other_config, $ref: snapshotsRef[i] })
|
||||
}
|
||||
})
|
||||
snapshots.sort((a, b) => (a.other_config['xo:backup:datetime'] < b.other_config['xo:backup:datetime'] ? -1 : 1))
|
||||
this._jobSnapshots = snapshots
|
||||
}
|
||||
|
||||
async _removeUnusedSnapshots() {
|
||||
const allSettings = this.job.settings
|
||||
const baseSettings = this._baseSettings
|
||||
const baseVmRef = this._baseVm?.$ref
|
||||
|
||||
const snapshotsPerSchedule = groupBy(this._jobSnapshots, _ => _.other_config['xo:backup:schedule'])
|
||||
const xapi = this._xapi
|
||||
await asyncMap(Object.entries(snapshotsPerSchedule), ([scheduleId, snapshots]) => {
|
||||
const settings = {
|
||||
...baseSettings,
|
||||
...allSettings[scheduleId],
|
||||
...allSettings[this.vm.uuid],
|
||||
}
|
||||
return asyncMap(getOldEntries(settings.snapshotRetention, snapshots), ({ $ref }) => {
|
||||
if ($ref !== baseVmRef) {
|
||||
return xapi.VM_destroy($ref)
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
async copy() {
|
||||
throw new Error('Not implemented')
|
||||
}
|
||||
|
||||
_getWriters() {
|
||||
throw new Error('Not implemented')
|
||||
}
|
||||
|
||||
_mustDoSnapshot() {
|
||||
throw new Error('Not implemented')
|
||||
}
|
||||
|
||||
async _selectBaseVm() {
|
||||
throw new Error('Not implemented')
|
||||
}
|
||||
|
||||
async run($defer) {
|
||||
const settings = this._settings
|
||||
assert(
|
||||
!settings.offlineBackup || settings.snapshotRetention === 0,
|
||||
'offlineBackup is not compatible with snapshotRetention'
|
||||
)
|
||||
|
||||
await this._callWriters(async writer => {
|
||||
await writer.beforeBackup()
|
||||
$defer(async () => {
|
||||
await writer.afterBackup()
|
||||
})
|
||||
}, 'writer.beforeBackup()')
|
||||
|
||||
await this._fetchJobSnapshots()
|
||||
|
||||
await this._selectBaseVm()
|
||||
|
||||
await this._cleanMetadata()
|
||||
await this._removeUnusedSnapshots()
|
||||
|
||||
const { vm } = this
|
||||
const isRunning = vm.power_state === 'Running'
|
||||
const startAfter = isRunning && (settings.offlineBackup ? 'backup' : settings.offlineSnapshot && 'snapshot')
|
||||
if (startAfter) {
|
||||
await vm.$callAsync('clean_shutdown')
|
||||
}
|
||||
|
||||
try {
|
||||
await this._snapshot()
|
||||
if (startAfter === 'snapshot') {
|
||||
ignoreErrors.call(vm.$callAsync('start', false, false))
|
||||
}
|
||||
|
||||
if (this._writers.size !== 0) {
|
||||
await this._copy()
|
||||
}
|
||||
} finally {
|
||||
if (startAfter) {
|
||||
ignoreErrors.call(vm.$callAsync('start', false, false))
|
||||
}
|
||||
|
||||
await this._fetchJobSnapshots()
|
||||
await this._removeUnusedSnapshots()
|
||||
}
|
||||
await this._healthCheck()
|
||||
}
|
||||
}
|
||||
exports.AbstractXapiVmBackup = AbstractXapiVmBackup
|
||||
|
||||
decorateMethodsWith(AbstractXapiVmBackup, {
|
||||
run: defer,
|
||||
})
|
||||
@@ -0,0 +1,61 @@
|
||||
'use strict'
|
||||
|
||||
const { createLogger } = require('@xen-orchestra/log')
|
||||
|
||||
const { forkStreamUnpipe } = require('../forkStreamUnpipe.js')
|
||||
const { FullRemoteWriter } = require('./writers/FullRemoteWriter.js')
|
||||
const { FullXapiWriter } = require('./writers/FullXapiWriter.js')
|
||||
const { watchStreamSize } = require('../../_watchStreamSize.js')
|
||||
const { AbstractXapiVmBackup } = require('./AbstractXapiVmBackup.js')
|
||||
|
||||
const { debug } = createLogger('xo:backups:FullXapiVmBackup')
|
||||
|
||||
exports.FullXapiVmBackup = class FullXapiVmBackup extends AbstractXapiVmBackup {
|
||||
_getWriters() {
|
||||
return [FullRemoteWriter, FullXapiWriter]
|
||||
}
|
||||
|
||||
_mustDoSnapshot() {
|
||||
const { vm } = this
|
||||
|
||||
const settings = this._settings
|
||||
return (
|
||||
settings.unconditionalSnapshot ||
|
||||
(!settings.offlineBackup && vm.power_state === 'Running') ||
|
||||
settings.snapshotRetention !== 0
|
||||
)
|
||||
}
|
||||
_selectBaseVm() {}
|
||||
|
||||
async _copy() {
|
||||
const { compression } = this.job
|
||||
const stream = this._throttleStream(
|
||||
await this._xapi.VM_export(this.exportedVm.$ref, {
|
||||
compress: Boolean(compression) && (compression === 'native' ? 'gzip' : 'zstd'),
|
||||
useSnapshot: false,
|
||||
})
|
||||
)
|
||||
const sizeContainer = watchStreamSize(stream)
|
||||
|
||||
const timestamp = Date.now()
|
||||
|
||||
await this._callWriters(
|
||||
writer =>
|
||||
writer.run({
|
||||
sizeContainer,
|
||||
stream: forkStreamUnpipe(stream),
|
||||
timestamp,
|
||||
}),
|
||||
'writer.run()'
|
||||
)
|
||||
|
||||
const { size } = sizeContainer
|
||||
const end = Date.now()
|
||||
const duration = end - timestamp
|
||||
debug('transfer complete', {
|
||||
duration,
|
||||
speed: duration !== 0 ? (size * 1e3) / 1024 / 1024 / duration : 0,
|
||||
size,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,170 @@
|
||||
'use strict'
|
||||
|
||||
const findLast = require('lodash/findLast.js')
|
||||
const keyBy = require('lodash/keyBy.js')
|
||||
const mapValues = require('lodash/mapValues.js')
|
||||
const vhdStreamValidator = require('vhd-lib/vhdStreamValidator.js')
|
||||
const { asyncMap } = require('@xen-orchestra/async-map')
|
||||
const { createLogger } = require('@xen-orchestra/log')
|
||||
const { pipeline } = require('node:stream')
|
||||
|
||||
const { IncrementalRemoteWriter } = require('./writers/IncrementalRemoteWriter.js')
|
||||
const { IncrementalXapiWriter } = require('./writers/IncrementalXapiWriter.js')
|
||||
const { exportIncrementalVm } = require('../../_incrementalVm.js')
|
||||
const { forkStreamUnpipe } = require('../forkStreamUnpipe.js')
|
||||
const { Task } = require('../../Task.js')
|
||||
const { watchStreamSize } = require('../../_watchStreamSize.js')
|
||||
const { AbstractXapiVmBackup } = require('./AbstractXapiVmBackup.js')
|
||||
|
||||
const { debug } = createLogger('xo:backups:IncrementalXapiVmBackup')
|
||||
|
||||
const forkDeltaExport = deltaExport =>
|
||||
Object.create(deltaExport, {
|
||||
streams: {
|
||||
value: mapValues(deltaExport.streams, forkStreamUnpipe),
|
||||
},
|
||||
})
|
||||
|
||||
const noop = Function.prototype
|
||||
|
||||
exports.IncrementalXapiVmBackup = class IncrementalXapiVmBackup extends AbstractXapiVmBackup {
|
||||
_getWriters() {
|
||||
return [IncrementalRemoteWriter, IncrementalXapiWriter]
|
||||
}
|
||||
|
||||
_mustDoSnapshot() {
|
||||
return true
|
||||
}
|
||||
|
||||
async _copy() {
|
||||
const { exportedVm } = this
|
||||
const baseVm = this._baseVm
|
||||
const fullVdisRequired = this._fullVdisRequired
|
||||
|
||||
const isFull = fullVdisRequired === undefined || fullVdisRequired.size !== 0
|
||||
|
||||
await this._callWriters(writer => writer.prepare({ isFull }), 'writer.prepare()')
|
||||
|
||||
const deltaExport = await exportIncrementalVm(exportedVm, baseVm, {
|
||||
fullVdisRequired,
|
||||
})
|
||||
// since NBD is network based, if one disk use nbd , all the disk use them
|
||||
// except the suspended VDI
|
||||
if (Object.values(deltaExport.streams).some(({ _nbd }) => _nbd)) {
|
||||
Task.info('Transfer data using NBD')
|
||||
}
|
||||
const sizeContainers = mapValues(deltaExport.streams, stream => watchStreamSize(stream))
|
||||
|
||||
if (this._settings.validateVhdStreams) {
|
||||
deltaExport.streams = mapValues(deltaExport.streams, stream => pipeline(stream, vhdStreamValidator, noop))
|
||||
}
|
||||
|
||||
deltaExport.streams = mapValues(deltaExport.streams, this._throttleStream)
|
||||
|
||||
const timestamp = Date.now()
|
||||
|
||||
await this._callWriters(
|
||||
writer =>
|
||||
writer.transfer({
|
||||
deltaExport: forkDeltaExport(deltaExport),
|
||||
sizeContainers,
|
||||
timestamp,
|
||||
}),
|
||||
'writer.transfer()'
|
||||
)
|
||||
|
||||
this._baseVm = exportedVm
|
||||
|
||||
if (baseVm !== undefined) {
|
||||
await exportedVm.update_other_config(
|
||||
'xo:backup:deltaChainLength',
|
||||
String(+(baseVm.other_config['xo:backup:deltaChainLength'] ?? 0) + 1)
|
||||
)
|
||||
}
|
||||
|
||||
// not the case if offlineBackup
|
||||
if (exportedVm.is_a_snapshot) {
|
||||
await exportedVm.update_other_config('xo:backup:exported', 'true')
|
||||
}
|
||||
|
||||
const size = Object.values(sizeContainers).reduce((sum, { size }) => sum + size, 0)
|
||||
const end = Date.now()
|
||||
const duration = end - timestamp
|
||||
debug('transfer complete', {
|
||||
duration,
|
||||
speed: duration !== 0 ? (size * 1e3) / 1024 / 1024 / duration : 0,
|
||||
size,
|
||||
})
|
||||
|
||||
await this._callWriters(writer => writer.cleanup(), 'writer.cleanup()')
|
||||
}
|
||||
|
||||
async _selectBaseVm() {
|
||||
const xapi = this._xapi
|
||||
|
||||
let baseVm = findLast(this._jobSnapshots, _ => 'xo:backup:exported' in _.other_config)
|
||||
if (baseVm === undefined) {
|
||||
debug('no base VM found')
|
||||
return
|
||||
}
|
||||
|
||||
const fullInterval = this._settings.fullInterval
|
||||
const deltaChainLength = +(baseVm.other_config['xo:backup:deltaChainLength'] ?? 0) + 1
|
||||
if (!(fullInterval === 0 || fullInterval > deltaChainLength)) {
|
||||
debug('not using base VM becaust fullInterval reached')
|
||||
return
|
||||
}
|
||||
|
||||
const srcVdis = keyBy(await xapi.getRecords('VDI', await this.vm.$getDisks()), '$ref')
|
||||
|
||||
// resolve full record
|
||||
baseVm = await xapi.getRecord('VM', baseVm.$ref)
|
||||
|
||||
const baseUuidToSrcVdi = new Map()
|
||||
await asyncMap(await baseVm.$getDisks(), async baseRef => {
|
||||
const [baseUuid, snapshotOf] = await Promise.all([
|
||||
xapi.getField('VDI', baseRef, 'uuid'),
|
||||
xapi.getField('VDI', baseRef, 'snapshot_of'),
|
||||
])
|
||||
const srcVdi = srcVdis[snapshotOf]
|
||||
if (srcVdi !== undefined) {
|
||||
baseUuidToSrcVdi.set(baseUuid, srcVdi)
|
||||
} else {
|
||||
debug('ignore snapshot VDI because no longer present on VM', {
|
||||
vdi: baseUuid,
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
const presentBaseVdis = new Map(baseUuidToSrcVdi)
|
||||
await this._callWriters(
|
||||
writer => presentBaseVdis.size !== 0 && writer.checkBaseVdis(presentBaseVdis, baseVm),
|
||||
'writer.checkBaseVdis()',
|
||||
false
|
||||
)
|
||||
|
||||
if (presentBaseVdis.size === 0) {
|
||||
debug('no base VM found')
|
||||
return
|
||||
}
|
||||
|
||||
const fullVdisRequired = new Set()
|
||||
baseUuidToSrcVdi.forEach((srcVdi, baseUuid) => {
|
||||
if (presentBaseVdis.has(baseUuid)) {
|
||||
debug('found base VDI', {
|
||||
base: baseUuid,
|
||||
vdi: srcVdi.uuid,
|
||||
})
|
||||
} else {
|
||||
debug('missing base VDI', {
|
||||
base: baseUuid,
|
||||
vdi: srcVdi.uuid,
|
||||
})
|
||||
fullVdisRequired.add(srcVdi.uuid)
|
||||
}
|
||||
})
|
||||
|
||||
this._baseVm = baseVm
|
||||
this._fullVdisRequired = fullVdisRequired
|
||||
}
|
||||
}
|
||||
@@ -1,13 +1,13 @@
|
||||
'use strict'
|
||||
|
||||
const { formatFilenameDate } = require('../_filenameDate.js')
|
||||
const { getOldEntries } = require('../_getOldEntries.js')
|
||||
const { Task } = require('../Task.js')
|
||||
const { formatFilenameDate } = require('../../../_filenameDate.js')
|
||||
const { getOldEntries } = require('../../../_getOldEntries.js')
|
||||
const { Task } = require('../../../Task.js')
|
||||
|
||||
const { MixinBackupWriter } = require('./_MixinBackupWriter.js')
|
||||
const { MixinRemoteWriter } = require('./_MixinRemoteWriter.js')
|
||||
const { AbstractFullWriter } = require('./_AbstractFullWriter.js')
|
||||
|
||||
exports.FullBackupWriter = class FullBackupWriter extends MixinBackupWriter(AbstractFullWriter) {
|
||||
exports.FullRemoteWriter = class FullRemoteWriter extends MixinRemoteWriter(AbstractFullWriter) {
|
||||
constructor(props) {
|
||||
super(props)
|
||||
|
||||
@@ -4,15 +4,15 @@ const ignoreErrors = require('promise-toolbox/ignoreErrors')
|
||||
const { asyncMap, asyncMapSettled } = require('@xen-orchestra/async-map')
|
||||
const { formatDateTime } = require('@xen-orchestra/xapi')
|
||||
|
||||
const { formatFilenameDate } = require('../_filenameDate.js')
|
||||
const { getOldEntries } = require('../_getOldEntries.js')
|
||||
const { Task } = require('../Task.js')
|
||||
const { formatFilenameDate } = require('../../../_filenameDate.js')
|
||||
const { getOldEntries } = require('../../../_getOldEntries.js')
|
||||
const { Task } = require('../../../Task.js')
|
||||
|
||||
const { AbstractFullWriter } = require('./_AbstractFullWriter.js')
|
||||
const { MixinReplicationWriter } = require('./_MixinReplicationWriter.js')
|
||||
const { MixinXapiWriter } = require('./_MixinXapiWriter.js')
|
||||
const { listReplicatedVms } = require('./_listReplicatedVms.js')
|
||||
|
||||
exports.FullReplicationWriter = class FullReplicationWriter extends MixinReplicationWriter(AbstractFullWriter) {
|
||||
exports.FullXapiWriter = class FullXapiWriter extends MixinXapiWriter(AbstractFullWriter) {
|
||||
constructor(props) {
|
||||
super(props)
|
||||
|
||||
@@ -11,19 +11,19 @@ const { decorateClass } = require('@vates/decorate-with')
|
||||
const { defer } = require('golike-defer')
|
||||
const { dirname } = require('path')
|
||||
|
||||
const { formatFilenameDate } = require('../_filenameDate.js')
|
||||
const { getOldEntries } = require('../_getOldEntries.js')
|
||||
const { Task } = require('../Task.js')
|
||||
const { formatFilenameDate } = require('../../../_filenameDate.js')
|
||||
const { getOldEntries } = require('../../../_getOldEntries.js')
|
||||
const { Task } = require('../../../Task.js')
|
||||
|
||||
const { MixinBackupWriter } = require('./_MixinBackupWriter.js')
|
||||
const { AbstractDeltaWriter } = require('./_AbstractDeltaWriter.js')
|
||||
const { MixinRemoteWriter } = require('./_MixinRemoteWriter.js')
|
||||
const { AbstractIncrementalWriter } = require('./_AbstractIncrementalWriter.js')
|
||||
const { checkVhd } = require('./_checkVhd.js')
|
||||
const { packUuid } = require('./_packUuid.js')
|
||||
const { Disposable } = require('promise-toolbox')
|
||||
|
||||
const { warn } = createLogger('xo:backups:DeltaBackupWriter')
|
||||
|
||||
class DeltaBackupWriter extends MixinBackupWriter(AbstractDeltaWriter) {
|
||||
class IncrementalRemoteWriter extends MixinRemoteWriter(AbstractIncrementalWriter) {
|
||||
async checkBaseVdis(baseUuidToSrcVdi) {
|
||||
const { handler } = this._adapter
|
||||
const backup = this._backup
|
||||
@@ -227,6 +227,6 @@ class DeltaBackupWriter extends MixinBackupWriter(AbstractDeltaWriter) {
|
||||
// TODO: run cleanup?
|
||||
}
|
||||
}
|
||||
exports.DeltaBackupWriter = decorateClass(DeltaBackupWriter, {
|
||||
exports.IncrementalRemoteWriter = decorateClass(IncrementalRemoteWriter, {
|
||||
_transfer: defer,
|
||||
})
|
||||
@@ -4,16 +4,16 @@ const { asyncMap, asyncMapSettled } = require('@xen-orchestra/async-map')
|
||||
const ignoreErrors = require('promise-toolbox/ignoreErrors')
|
||||
const { formatDateTime } = require('@xen-orchestra/xapi')
|
||||
|
||||
const { formatFilenameDate } = require('../_filenameDate.js')
|
||||
const { getOldEntries } = require('../_getOldEntries.js')
|
||||
const { importDeltaVm, TAG_COPY_SRC } = require('../_deltaVm.js')
|
||||
const { Task } = require('../Task.js')
|
||||
const { formatFilenameDate } = require('../../../_filenameDate.js')
|
||||
const { getOldEntries } = require('../../../_getOldEntries.js')
|
||||
const { importIncrementalVm, TAG_COPY_SRC } = require('../../../_incrementalVm.js')
|
||||
const { Task } = require('../../../Task.js')
|
||||
|
||||
const { AbstractDeltaWriter } = require('./_AbstractDeltaWriter.js')
|
||||
const { MixinReplicationWriter } = require('./_MixinReplicationWriter.js')
|
||||
const { AbstractIncrementalWriter } = require('./_AbstractIncrementalWriter.js')
|
||||
const { MixinXapiWriter } = require('./_MixinXapiWriter.js')
|
||||
const { listReplicatedVms } = require('./_listReplicatedVms.js')
|
||||
|
||||
exports.DeltaReplicationWriter = class DeltaReplicationWriter extends MixinReplicationWriter(AbstractDeltaWriter) {
|
||||
exports.IncrementalXapiWriter = class IncrementalXapiWriter extends MixinXapiWriter(AbstractIncrementalWriter) {
|
||||
async checkBaseVdis(baseUuidToSrcVdi, baseVm) {
|
||||
const sr = this._sr
|
||||
const replicatedVm = listReplicatedVms(sr.$xapi, this._backup.job.id, sr.uuid, this._backup.vm.uuid).find(
|
||||
@@ -90,7 +90,7 @@ exports.DeltaReplicationWriter = class DeltaReplicationWriter extends MixinRepli
|
||||
|
||||
let targetVmRef
|
||||
await Task.run({ name: 'transfer' }, async () => {
|
||||
targetVmRef = await importDeltaVm(
|
||||
targetVmRef = await importIncrementalVm(
|
||||
{
|
||||
__proto__: deltaExport,
|
||||
vm: {
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
const { AbstractWriter } = require('./_AbstractWriter.js')
|
||||
|
||||
exports.AbstractDeltaWriter = class AbstractDeltaWriter extends AbstractWriter {
|
||||
exports.AbstractIncrementalWriter = class AbstractIncrementalWriter extends AbstractWriter {
|
||||
checkBaseVdis(baseUuidToSrcVdi, baseVm) {
|
||||
throw new Error('Not implemented')
|
||||
}
|
||||
@@ -4,17 +4,17 @@ const { createLogger } = require('@xen-orchestra/log')
|
||||
const { join } = require('path')
|
||||
|
||||
const assert = require('assert')
|
||||
const { formatFilenameDate } = require('../_filenameDate.js')
|
||||
const { getVmBackupDir } = require('../_getVmBackupDir.js')
|
||||
const { HealthCheckVmBackup } = require('../HealthCheckVmBackup.js')
|
||||
const { ImportVmBackup } = require('../ImportVmBackup.js')
|
||||
const { Task } = require('../Task.js')
|
||||
const MergeWorker = require('../merge-worker/index.js')
|
||||
const { formatFilenameDate } = require('../../../_filenameDate.js')
|
||||
const { getVmBackupDir } = require('../../../_getVmBackupDir.js')
|
||||
const { HealthCheckVmBackup } = require('../../../HealthCheckVmBackup.js')
|
||||
const { ImportVmBackup } = require('../../../ImportVmBackup.js')
|
||||
const { Task } = require('../../../Task.js')
|
||||
const MergeWorker = require('../../../merge-worker/index.js')
|
||||
|
||||
const { info, warn } = createLogger('xo:backups:MixinBackupWriter')
|
||||
|
||||
exports.MixinBackupWriter = (BaseClass = Object) =>
|
||||
class MixinBackupWriter extends BaseClass {
|
||||
exports.MixinRemoteWriter = (BaseClass = Object) =>
|
||||
class MixinRemoteWriter extends BaseClass {
|
||||
#lock
|
||||
|
||||
constructor({ remoteId, ...rest }) {
|
||||
@@ -1,8 +1,8 @@
|
||||
'use strict'
|
||||
|
||||
const { Task } = require('../Task')
|
||||
const { Task } = require('../../../Task')
|
||||
const assert = require('node:assert/strict')
|
||||
const { HealthCheckVmBackup } = require('../HealthCheckVmBackup')
|
||||
const { HealthCheckVmBackup } = require('../../../HealthCheckVmBackup')
|
||||
|
||||
function extractOpaqueRef(str) {
|
||||
const OPAQUE_REF_RE = /OpaqueRef:[0-9a-z-]+/
|
||||
@@ -12,8 +12,8 @@ function extractOpaqueRef(str) {
|
||||
}
|
||||
return matches[0]
|
||||
}
|
||||
exports.MixinReplicationWriter = (BaseClass = Object) =>
|
||||
class MixinReplicationWriter extends BaseClass {
|
||||
exports.MixinXapiWriter = (BaseClass = Object) =>
|
||||
class MixinXapiWriter extends BaseClass {
|
||||
constructor({ sr, ...rest }) {
|
||||
super(rest)
|
||||
|
||||
138
@xen-orchestra/backups/_backupJob/XapiVmBackupJob.js
Normal file
138
@xen-orchestra/backups/_backupJob/XapiVmBackupJob.js
Normal file
@@ -0,0 +1,138 @@
|
||||
'use strict'
|
||||
|
||||
const { asyncMapSettled } = require('@xen-orchestra/async-map')
|
||||
const Disposable = require('promise-toolbox/Disposable')
|
||||
const { limitConcurrency } = require('limit-concurrency-decorator')
|
||||
|
||||
const { extractIdsFromSimplePattern } = require('../extractIdsFromSimplePattern.js')
|
||||
const { Task } = require('../Task.js')
|
||||
const { createStreamThrottle } = require('./createStreamThrottle.js')
|
||||
const { DEFAULT_SETTINGS, AbstractBackupJob } = require('./AbstractBackupJob.js')
|
||||
const { runTask } = require('./runTask.js')
|
||||
const { getAdaptersByRemote } = require('./getAdaptersByRemote.js')
|
||||
const { IncrementalXapiVmBackup } = require('./VmBackup/IncrementalXapiVmBackup.js')
|
||||
const { FullXapiVmBackup } = require('./VmBackup/FullXapiVmBackup.js')
|
||||
|
||||
const DEFAULT_XAPI_VM_SETTINGS = {
|
||||
bypassVdiChainsCheck: false,
|
||||
checkpointSnapshot: false,
|
||||
concurrency: 2,
|
||||
copyRetention: 0,
|
||||
deleteFirst: false,
|
||||
exportRetention: 0,
|
||||
fullInterval: 0,
|
||||
healthCheckSr: undefined,
|
||||
healthCheckVmsWithTags: [],
|
||||
maxExportRate: 0,
|
||||
maxMergedDeltasPerRun: Infinity,
|
||||
offlineBackup: false,
|
||||
offlineSnapshot: false,
|
||||
snapshotRetention: 0,
|
||||
timeout: 0,
|
||||
useNbd: false,
|
||||
unconditionalSnapshot: false,
|
||||
validateVhdStreams: false,
|
||||
vmTimeout: 0,
|
||||
}
|
||||
|
||||
exports.XapiVmBackupJob = class XapiVmBackupJob extends AbstractBackupJob {
|
||||
_computeBaseSettings(config, job) {
|
||||
const baseSettings = { ...DEFAULT_SETTINGS }
|
||||
Object.assign(baseSettings, DEFAULT_XAPI_VM_SETTINGS, config.defaultSettings, config.vm?.defaultSettings)
|
||||
Object.assign(baseSettings, job.settings[''])
|
||||
return baseSettings
|
||||
}
|
||||
|
||||
async run() {
|
||||
const job = this._job
|
||||
|
||||
// FIXME: proper SimpleIdPattern handling
|
||||
const getSnapshotNameLabel = this._getSnapshotNameLabel
|
||||
const schedule = this._schedule
|
||||
const settings = this._settings
|
||||
|
||||
const throttleStream = createStreamThrottle(settings.maxExportRate)
|
||||
|
||||
const config = this._config
|
||||
await Disposable.use(
|
||||
Disposable.all(
|
||||
extractIdsFromSimplePattern(job.srs).map(id =>
|
||||
this._getRecord('SR', id).catch(error => {
|
||||
runTask(
|
||||
{
|
||||
name: 'get SR record',
|
||||
data: { type: 'SR', id },
|
||||
},
|
||||
() => Promise.reject(error)
|
||||
)
|
||||
})
|
||||
)
|
||||
),
|
||||
Disposable.all(extractIdsFromSimplePattern(job.remotes).map(id => this._getAdapter(id))),
|
||||
() => (settings.healthCheckSr !== undefined ? this._getRecord('SR', settings.healthCheckSr) : undefined),
|
||||
async (srs, remoteAdapters, healthCheckSr) => {
|
||||
// remove adapters that failed (already handled)
|
||||
remoteAdapters = remoteAdapters.filter(_ => _ !== undefined)
|
||||
|
||||
// remove srs that failed (already handled)
|
||||
srs = srs.filter(_ => _ !== undefined)
|
||||
|
||||
if (remoteAdapters.length === 0 && srs.length === 0 && settings.snapshotRetention === 0) {
|
||||
return
|
||||
}
|
||||
|
||||
const vmIds = extractIdsFromSimplePattern(job.vms)
|
||||
|
||||
Task.info('vms', { vms: vmIds })
|
||||
|
||||
remoteAdapters = getAdaptersByRemote(remoteAdapters)
|
||||
|
||||
const allSettings = this._job.settings
|
||||
const baseSettings = this._baseSettings
|
||||
|
||||
const handleVm = vmUuid => {
|
||||
const taskStart = { name: 'backup VM', data: { type: 'VM', id: vmUuid } }
|
||||
|
||||
return this._getRecord('VM', vmUuid).then(
|
||||
disposableVm =>
|
||||
Disposable.use(disposableVm, vm => {
|
||||
taskStart.data.name_label = vm.name_label
|
||||
return runTask(taskStart, () => {
|
||||
const opts = {
|
||||
baseSettings,
|
||||
config,
|
||||
getSnapshotNameLabel,
|
||||
healthCheckSr,
|
||||
job,
|
||||
remoteAdapters,
|
||||
schedule,
|
||||
settings: { ...settings, ...allSettings[vm.uuid] },
|
||||
srs,
|
||||
throttleStream,
|
||||
vm,
|
||||
}
|
||||
let vmBackup
|
||||
if (job.mode === 'delta') {
|
||||
vmBackup = new IncrementalXapiVmBackup(opts)
|
||||
} else {
|
||||
if (job.mode === 'full') {
|
||||
vmBackup = new FullXapiVmBackup(opts)
|
||||
} else {
|
||||
throw new Error(`Job mode ${job.mode} not implemented`)
|
||||
}
|
||||
}
|
||||
return vmBackup.run()
|
||||
})
|
||||
}),
|
||||
error =>
|
||||
runTask(taskStart, () => {
|
||||
throw error
|
||||
})
|
||||
)
|
||||
}
|
||||
const { concurrency } = settings
|
||||
await asyncMapSettled(vmIds, concurrency === 0 ? handleVm : limitConcurrency(concurrency)(handleVm))
|
||||
}
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -2,9 +2,9 @@
|
||||
|
||||
const { asyncMap } = require('@xen-orchestra/async-map')
|
||||
|
||||
const { DIR_XO_CONFIG_BACKUPS } = require('./RemoteAdapter.js')
|
||||
const { formatFilenameDate } = require('./_filenameDate.js')
|
||||
const { Task } = require('./Task.js')
|
||||
const { DIR_XO_CONFIG_BACKUPS } = require('../RemoteAdapter.js')
|
||||
const { formatFilenameDate } = require('../_filenameDate.js')
|
||||
const { Task } = require('../Task.js')
|
||||
|
||||
exports.XoMetadataBackup = class XoMetadataBackup {
|
||||
constructor({ config, job, remoteAdapters, schedule, settings }) {
|
||||
@@ -6,7 +6,7 @@ const identity = require('lodash/identity.js')
|
||||
|
||||
const noop = Function.prototype
|
||||
|
||||
module.exports = function createStreamThrottle(rate) {
|
||||
exports.createStreamThrottle = function createStreamThrottle(rate) {
|
||||
if (rate === 0) {
|
||||
return identity
|
||||
}
|
||||
9
@xen-orchestra/backups/_backupJob/getAdaptersByRemote.js
Normal file
9
@xen-orchestra/backups/_backupJob/getAdaptersByRemote.js
Normal file
@@ -0,0 +1,9 @@
|
||||
'use strict'
|
||||
const getAdaptersByRemote = adapters => {
|
||||
const adaptersByRemote = {}
|
||||
adapters.forEach(({ adapter, remoteId }) => {
|
||||
adaptersByRemote[remoteId] = adapter
|
||||
})
|
||||
return adaptersByRemote
|
||||
}
|
||||
exports.getAdaptersByRemote = getAdaptersByRemote
|
||||
6
@xen-orchestra/backups/_backupJob/runTask.js
Normal file
6
@xen-orchestra/backups/_backupJob/runTask.js
Normal file
@@ -0,0 +1,6 @@
|
||||
'use strict'
|
||||
const { Task } = require('../Task.js')
|
||||
const noop = Function.prototype
|
||||
const runTask = (...args) => Task.run(...args).catch(noop) // errors are handled by logs
|
||||
|
||||
exports.runTask = runTask
|
||||
@@ -13,10 +13,10 @@ const { createDebounceResource } = require('@vates/disposable/debounceResource.j
|
||||
const { decorateMethodsWith } = require('@vates/decorate-with')
|
||||
const { deduped } = require('@vates/disposable/deduped.js')
|
||||
const { getHandler } = require('@xen-orchestra/fs')
|
||||
const { instantiateBackupJob } = require('./backupJob.js')
|
||||
const { parseDuration } = require('@vates/parse-duration')
|
||||
const { Xapi } = require('@xen-orchestra/xapi')
|
||||
|
||||
const { Backup } = require('./Backup.js')
|
||||
const { RemoteAdapter } = require('./RemoteAdapter.js')
|
||||
const { Task } = require('./Task.js')
|
||||
|
||||
@@ -48,7 +48,7 @@ class BackupWorker {
|
||||
}
|
||||
|
||||
run() {
|
||||
return new Backup({
|
||||
return instantiateBackupJob({
|
||||
config: this.#config,
|
||||
getAdapter: remoteId => this.getAdapter(this.#remotes[remoteId]),
|
||||
getConnectedRecord: Disposable.factory(async function* getConnectedRecord(type, uuid) {
|
||||
|
||||
@@ -33,7 +33,7 @@ const resolveUuid = async (xapi, cache, uuid, type) => {
|
||||
return ref
|
||||
}
|
||||
|
||||
exports.exportDeltaVm = async function exportDeltaVm(
|
||||
exports.exportIncrementalVm = async function exportIncrementalVm(
|
||||
vm,
|
||||
baseVm,
|
||||
{
|
||||
@@ -143,18 +143,18 @@ exports.exportDeltaVm = async function exportDeltaVm(
|
||||
)
|
||||
}
|
||||
|
||||
exports.importDeltaVm = defer(async function importDeltaVm(
|
||||
exports.importIncrementalVm = defer(async function importIncrementalVm(
|
||||
$defer,
|
||||
deltaVm,
|
||||
incrementalVm,
|
||||
sr,
|
||||
{ cancelToken = CancelToken.none, detectBase = true, mapVdisSrs = {}, newMacAddresses = false } = {}
|
||||
) {
|
||||
const { version } = deltaVm
|
||||
const { version } = incrementalVm
|
||||
if (compareVersions(version, '1.0.0') < 0) {
|
||||
throw new Error(`Unsupported delta backup version: ${version}`)
|
||||
}
|
||||
|
||||
const vmRecord = deltaVm.vm
|
||||
const vmRecord = incrementalVm.vm
|
||||
const xapi = sr.$xapi
|
||||
|
||||
let baseVm
|
||||
@@ -183,7 +183,7 @@ exports.importDeltaVm = defer(async function importDeltaVm(
|
||||
baseVdis[vbd.VDI] = vbd.$VDI
|
||||
}
|
||||
})
|
||||
const vdiRecords = deltaVm.vdis
|
||||
const vdiRecords = incrementalVm.vdis
|
||||
|
||||
// 0. Create suspend_VDI
|
||||
let suspendVdi
|
||||
@@ -240,7 +240,7 @@ exports.importDeltaVm = defer(async function importDeltaVm(
|
||||
await asyncMap(await xapi.getField('VM', vmRef, 'VBDs'), ref => ignoreErrors.call(xapi.call('VBD.destroy', ref)))
|
||||
|
||||
// 3. Create VDIs & VBDs.
|
||||
const vbdRecords = deltaVm.vbds
|
||||
const vbdRecords = incrementalVm.vbds
|
||||
const vbds = groupBy(vbdRecords, 'VDI')
|
||||
const newVdis = {}
|
||||
await asyncMap(Object.keys(vdiRecords), async vdiRef => {
|
||||
@@ -309,7 +309,7 @@ exports.importDeltaVm = defer(async function importDeltaVm(
|
||||
}
|
||||
})
|
||||
|
||||
const { streams } = deltaVm
|
||||
const { streams } = incrementalVm
|
||||
|
||||
await Promise.all([
|
||||
// Import VDI contents.
|
||||
@@ -326,7 +326,7 @@ exports.importDeltaVm = defer(async function importDeltaVm(
|
||||
}),
|
||||
|
||||
// Create VIFs.
|
||||
asyncMap(Object.values(deltaVm.vifs), vif => {
|
||||
asyncMap(Object.values(incrementalVm.vifs), vif => {
|
||||
let network = vif.$network$uuid && xapi.getObjectByUuid(vif.$network$uuid, undefined)
|
||||
|
||||
if (network === undefined) {
|
||||
@@ -358,8 +358,8 @@ exports.importDeltaVm = defer(async function importDeltaVm(
|
||||
])
|
||||
|
||||
await Promise.all([
|
||||
deltaVm.vm.ha_always_run && xapi.setField('VM', vmRef, 'ha_always_run', true),
|
||||
xapi.setField('VM', vmRef, 'name_label', deltaVm.vm.name_label),
|
||||
incrementalVm.vm.ha_always_run && xapi.setField('VM', vmRef, 'ha_always_run', true),
|
||||
xapi.setField('VM', vmRef, 'name_label', incrementalVm.vm.name_label),
|
||||
])
|
||||
|
||||
return vmRef
|
||||
21
@xen-orchestra/backups/backupJob.js
Normal file
21
@xen-orchestra/backups/backupJob.js
Normal file
@@ -0,0 +1,21 @@
|
||||
'use strict'
|
||||
|
||||
const { MetadatasBackupJob } = require('./_backupJob/MetadatasBackupJob.js')
|
||||
const { XapiVmBackupJob } = require('./_backupJob/XapiVMBackupJobs.js')
|
||||
|
||||
exports.instantiateBackupJob = function instantiateBackupJob({
|
||||
config,
|
||||
getAdapter,
|
||||
getConnectedRecord,
|
||||
job,
|
||||
schedule,
|
||||
}) {
|
||||
switch (job.type) {
|
||||
case 'backup':
|
||||
return new XapiVmBackupJob({ config, getAdapter, getConnectedRecord, job, schedule })
|
||||
case 'metadataBackup':
|
||||
return new MetadatasBackupJob({ config, getAdapter, getConnectedRecord, job, schedule })
|
||||
default:
|
||||
throw new Error(`No runner for the backup type ${job.type}`)
|
||||
}
|
||||
}
|
||||
@@ -1,7 +1,6 @@
|
||||
import Disposable from 'promise-toolbox/Disposable'
|
||||
import fromCallback from 'promise-toolbox/fromCallback'
|
||||
import { asyncMap } from '@xen-orchestra/async-map'
|
||||
import { Backup } from '@xen-orchestra/backups/Backup.js'
|
||||
import { compose } from '@vates/compose'
|
||||
import { createLogger } from '@xen-orchestra/log'
|
||||
import { decorateMethodsWith } from '@vates/decorate-with'
|
||||
@@ -10,6 +9,7 @@ import { defer } from 'golike-defer'
|
||||
import { DurablePartition } from '@xen-orchestra/backups/DurablePartition.js'
|
||||
import { execFile } from 'child_process'
|
||||
import { formatVmBackups } from '@xen-orchestra/backups/formatVmBackups.js'
|
||||
import { instantiateBackupJob } from '@xen-orchestra/backups/backupJob.js'
|
||||
import { ImportVmBackup } from '@xen-orchestra/backups/ImportVmBackup.js'
|
||||
import { JsonRpcError } from 'json-rpc-protocol'
|
||||
import { Readable } from 'stream'
|
||||
@@ -52,7 +52,7 @@ export default class Backups {
|
||||
const config = app.config.get('backups')
|
||||
if (config.disableWorkers) {
|
||||
const { recordToXapi, remotes, xapis, ...rest } = params
|
||||
return new Backup({
|
||||
return instantiateBackupJob({
|
||||
...rest,
|
||||
|
||||
config,
|
||||
|
||||
@@ -3,13 +3,13 @@ import Disposable from 'promise-toolbox/Disposable'
|
||||
import forOwn from 'lodash/forOwn.js'
|
||||
import groupBy from 'lodash/groupBy.js'
|
||||
import merge from 'lodash/merge.js'
|
||||
import { Backup } from '@xen-orchestra/backups/Backup.js'
|
||||
import { createLogger } from '@xen-orchestra/log'
|
||||
import { createPredicate } from 'value-matcher'
|
||||
import { decorateWith } from '@vates/decorate-with'
|
||||
import { formatVmBackups } from '@xen-orchestra/backups/formatVmBackups.js'
|
||||
import { HealthCheckVmBackup } from '@xen-orchestra/backups/HealthCheckVmBackup.js'
|
||||
import { ImportVmBackup } from '@xen-orchestra/backups/ImportVmBackup.js'
|
||||
import { instantiateBackupJob } from '@xen-orchestra/backups/backupJob.js'
|
||||
import { invalidParameters } from 'xo-common/api-errors.js'
|
||||
import { runBackupWorker } from '@xen-orchestra/backups/runBackupWorker.js'
|
||||
import { Task } from '@xen-orchestra/backups/Task.js'
|
||||
@@ -164,7 +164,7 @@ export default class BackupNg {
|
||||
}),
|
||||
},
|
||||
() =>
|
||||
new Backup({
|
||||
instantiateBackupJob({
|
||||
config: backupsConfig,
|
||||
getAdapter: async remoteId =>
|
||||
app.getBackupsRemoteAdapter(await app.getRemoteWithCredentials(remoteId)),
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
import asyncMapSettled from '@xen-orchestra/async-map/legacy.js'
|
||||
import cloneDeep from 'lodash/cloneDeep.js'
|
||||
import Disposable from 'promise-toolbox/Disposable'
|
||||
import { Backup } from '@xen-orchestra/backups/Backup.js'
|
||||
import { createLogger } from '@xen-orchestra/log'
|
||||
import { instantiateBackupJob } from '@xen-orchestra/backups/backupJob.js'
|
||||
import { parseMetadataBackupId } from '@xen-orchestra/backups/parseMetadataBackupId.js'
|
||||
import { RestoreMetadataBackup } from '@xen-orchestra/backups/RestoreMetadataBackup.js'
|
||||
import { Task } from '@xen-orchestra/backups/Task.js'
|
||||
@@ -129,7 +129,7 @@ export default class metadataBackup {
|
||||
}),
|
||||
},
|
||||
() =>
|
||||
new Backup({
|
||||
instantiateBackupJob({
|
||||
config: this._app.config.get('backups'),
|
||||
getAdapter: async remoteId => app.getBackupsRemoteAdapter(await app.getRemoteWithCredentials(remoteId)),
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import { Backup } from '@xen-orchestra/backups/Backup.js'
|
||||
import { decorateWith } from '@vates/decorate-with'
|
||||
import { defer as deferrable } from 'golike-defer'
|
||||
import { fromEvent } from 'promise-toolbox'
|
||||
import { instantiateBackupJob } from '@xen-orchestra/backups/backupJob.js'
|
||||
import { Task } from '@xen-orchestra/mixins/Tasks.mjs'
|
||||
import { v4 as generateUuid } from 'uuid'
|
||||
import { VDI_FORMAT_VHD } from '@xen-orchestra/xapi'
|
||||
@@ -41,7 +41,7 @@ export default class MigrateVm {
|
||||
const schedule = { id: 'one-time' }
|
||||
|
||||
// for now we only support this from the main OA, no proxy
|
||||
return new Backup({
|
||||
return instantiateBackupJob({
|
||||
config,
|
||||
job,
|
||||
schedule,
|
||||
|
||||
Reference in New Issue
Block a user