Delta backup works with quiesce. (fix vatesfr/xo-web#812)

This commit is contained in:
wescoeur
2016-03-16 14:37:44 +01:00
parent a0806d98a1
commit 322e1a75b9
2 changed files with 182 additions and 185 deletions

View File

@@ -1286,7 +1286,10 @@ export default class Xapi extends XapiBase {
// Create a snapshot of the VM and returns a delta export object.
@deferrable.onFailure
async exportDeltaVm ($onFailure, vmId, baseVmId = undefined, {
snapshotNameLabel = undefined
snapshotNameLabel = undefined,
// Contains a vdi.$id set of vmId.
fullVdisRequired = [],
disableBaseTags = false
} = {}) {
const vm = await this.snapshotVm(vmId)
$onFailure(() => this._deleteVm(vm, true))
@@ -1300,7 +1303,10 @@ export default class Xapi extends XapiBase {
const baseVdis = {}
baseVm && forEach(baseVm.$VBDs, vbd => {
baseVdis[vbd.VDI] = vbd.$VDI
const vdi = vbd.$VDI
if (!find(fullVdisRequired, id => vdi.$snapshot_of.$id === id)) {
baseVdis[vbd.VDI] = vdi
}
})
const streams = {}
@@ -1333,7 +1339,7 @@ export default class Xapi extends XapiBase {
}
})
vdis[vdiId] = baseVdi
vdis[vdiId] = baseVdi && !disableBaseTags
? {
...vdi,
other_config: {
@@ -1359,7 +1365,7 @@ export default class Xapi extends XapiBase {
vbds,
vdis,
vifs,
vm: baseVm
vm: baseVm && !disableBaseTags
? {
...vm,
other_config: {

View File

@@ -2,6 +2,7 @@ import endsWith from 'lodash.endswith'
import escapeStringRegexp from 'escape-string-regexp'
import eventToPromise from 'event-to-promise'
import filter from 'lodash.filter'
import find from 'lodash.find'
import findIndex from 'lodash.findindex'
import sortBy from 'lodash.sortby'
import startsWith from 'lodash.startswith'
@@ -36,8 +37,9 @@ const DELTA_BACKUP_EXT_LENGTH = DELTA_BACKUP_EXT.length
// Test if a file is a vdi backup. (full or delta)
const isVdiBackup = name => /^\d+T\d+Z_(?:full|delta)\.vhd$/.test(name)
// Test if a file is a delta vdi backup.
// Test if a file is a delta/full vdi backup.
const isDeltaVdiBackup = name => /^\d+T\d+Z_delta\.vhd$/.test(name)
const isFullVdiBackup = name => /^\d+T\d+Z_full\.vhd$/.test(name)
// Get the timestamp of a vdi backup. (full or delta)
const getVdiTimestamp = name => {
@@ -288,74 +290,6 @@ export default class {
return backups.slice(i)
}
async _deltaVdiBackup (xapi, {vdi, handler, dir, depth}) {
const backupDirectory = `vdi_${vdi.uuid}`
dir = `${dir}/${backupDirectory}`
const backups = await this._listVdiBackups(handler, dir)
// Make snapshot.
const date = safeDateFormat(new Date())
const currentSnapshot = await xapi.snapshotVdi(vdi.$id, 'XO_DELTA_BASE_VDI_SNAPSHOT')
const bases = sortBy(
filter(vdi.$snapshots, { name_label: 'XO_DELTA_BASE_VDI_SNAPSHOT' }),
base => base.snapshot_time
)
const base = bases.pop()
// Remove old bases if exists.
Promise.all(mapToArray(bases, base => xapi.deleteVdi(base.$id)))::pCatch(noop)
// It is strange to have no base but a full backup !
// A full is necessary if it not exists backups or
// the base is missing.
const isFull = (!backups.length || !base)
// Export full or delta backup.
const vdiFilename = `${date}_${isFull ? 'full' : 'delta'}.vhd`
const backupFullPath = `${dir}/${vdiFilename}`
try {
const sourceStream = await xapi.exportVdi(currentSnapshot.$id, {
baseId: isFull ? undefined : base.$id,
format: VDI_FORMAT_VHD
})
const targetStream = await handler.createOutputStream(backupFullPath, {
// FIXME: Checksum is not computed for full vdi backups.
// The problem is in the merge case, a delta merged in a full vdi
// backup forces us to browse the resulting file =>
// Significant transfer time on the network !
checksum: !isFull,
flags: 'wx'
})
sourceStream.on('error', error => targetStream.emit('error', error))
await Promise.all([
eventToPromise(sourceStream.pipe(targetStream), 'finish'),
sourceStream.task
])
} catch (error) {
// Remove new backup. (corrupt) and delete new vdi base.
xapi.deleteVdi(currentSnapshot.$id)::pCatch(noop)
await handler.unlink(backupFullPath, { checksum: true })::pCatch(noop)
throw error
}
// Returns relative path. (subdir and vdi filename), old/new base.
return {
backupDirectory,
vdiFilename,
oldBaseId: base && base.$id, // Base can be undefined. (full backup)
newBaseId: currentSnapshot.$id
}
}
async _mergeDeltaVdiBackups ({handler, dir, depth}) {
const backups = await this._listVdiBackups(handler, dir)
let i = backups.length - depth
@@ -432,125 +366,54 @@ export default class {
async _listDeltaVmBackups (handler, dir) {
const files = await handler.list(dir)
return /* await */ sortBy(filter(files, isDeltaBackup))
return sortBy(filter(files, isDeltaBackup))
}
async _failedRollingDeltaVmBackup (xapi, handler, dir, fulFilledVdiBackups) {
await Promise.all(
mapToArray(fulFilledVdiBackups, async vdiBackup => {
const { newBaseId, backupDirectory, vdiFilename } = vdiBackup.value()
await xapi.deleteVdi(newBaseId)
await handler.unlink(`${dir}/${backupDirectory}/${vdiFilename}`, { checksum: true })::pCatch(noop)
})
)
}
async rollingDeltaVmBackup ({vm, remoteId, tag, depth}) {
const remote = await this._xo.getRemote(remoteId)
if (!remote) {
throw new Error(`No such Remote ${remoteId}`)
}
if (!remote.enabled) {
throw new Error(`Remote ${remoteId} is disabled`)
}
const handler = await this._xo.getRemoteHandler(remote)
const dir = `vm_delta_${tag}_${vm.uuid}`
const info = {
version: '1.0.0',
vbds: {},
vdis: {},
vifs: {}
}
const promises = []
const xapi = this._xo.getXapi(vm)
vm = xapi.getObject(vm._xapiId)
for (const vbd of vm.$VBDs) {
const vdiId = vbd.VDI
if (!vdiId || vbd.type !== 'Disk') {
continue
}
info.vbds[vbd.$ref] = vbd
// Warning: There may be the same VDI id for a VBD set.
if (info.vdis[vdiId]) {
continue
}
const vdi = vbd.$VDI
info.vdis[vdiId] = { ...vdi }
promises.push(
this._deltaVdiBackup(xapi, {vdi, handler, dir, depth}).then(
vdiBackup => {
const { backupDirectory, vdiFilename } = vdiBackup
info.vdis[vdiId].xoPath = `${backupDirectory}/${vdiFilename}`
return vdiBackup
}
)
)
}
const vdiBackups = await pSettle(promises)
const fulFilledVdiBackups = []
let fail = false
// One or many vdi backups have failed.
for (const vdiBackup of vdiBackups) {
if (vdiBackup.isFulfilled()) {
fulFilledVdiBackups.push(vdiBackup)
} else {
console.error(`Rejected backup: ${vdiBackup.reason()}`)
fail = true
}
}
if (fail) {
console.error(`Remove successful backups in ${dir}`, fulFilledVdiBackups)
await this._failedRollingDeltaVmBackup(xapi, handler, dir, fulFilledVdiBackups)
throw new Error('Rolling delta vm backup failed.')
}
async _saveDeltaVdiBackup (xapi, { vdiParent, isFull, handler, stream, dir, depth }) {
const backupDirectory = `vdi_${vdiParent.uuid}`
dir = `${dir}/${backupDirectory}`
const date = safeDateFormat(new Date())
const backupFormat = `${date}_${vm.name_label}`
const infoPath = `${dir}/${backupFormat}${DELTA_BACKUP_EXT}`
// For old versions: remove old bases if exists.
const bases = sortBy(
filter(vdiParent.$snapshots, { name_label: 'XO_DELTA_BASE_VDI_SNAPSHOT' }),
base => base.snapshot_time
)
forEach(bases, base => { xapi.deleteVdi(base.$id)::pCatch(noop) })
// Export full or delta backup.
const vdiFilename = `${date}_${isFull ? 'full' : 'delta'}.vhd`
const backupFullPath = `${dir}/${vdiFilename}`
try {
for (const vif of vm.$VIFs) {
info.vifs[vif.$ref] = vif
}
const targetStream = await handler.createOutputStream(backupFullPath, {
// FIXME: Checksum is not computed for full vdi backups.
// The problem is in the merge case, a delta merged in a full vdi
// backup forces us to browse the resulting file =>
// Significant transfer time on the network !
checksum: !isFull,
flags: 'wx'
})
info.vm = vm
stream.on('error', error => targetStream.emit('error', error))
await handler.outputFile(infoPath, JSON.stringify(info, null, 2), {flag: 'wx'})
} catch (e) {
await Promise.all([
handler.unlink(infoPath)::pCatch(noop),
this._failedRollingDeltaVmBackup(xapi, handler, dir, fulFilledVdiBackups)
eventToPromise(stream.pipe(targetStream), 'finish'),
stream.task
])
} catch (error) {
// Remove new backup. (corrupt).
await handler.unlink(backupFullPath, { checksum: true })::pCatch(noop)
throw e
throw error
}
// Here we have a completed backup. We can merge old vdis.
await Promise.all(
mapToArray(vdiBackups, vdiBackup => {
const { backupDirectory } = vdiBackup.value()
return this._mergeDeltaVdiBackups({handler, dir: `${dir}/${backupDirectory}`, depth})
})
)
// Returns relative path.
return `${backupDirectory}/${vdiFilename}`
}
// Remove old vm backups.
async _removeOldDeltaVmBackups (xapi, { handler, dir, depth }) {
const backups = await this._listDeltaVmBackups(handler, dir)
const nOldBackups = backups.length - depth
@@ -566,17 +429,145 @@ export default class {
})
)
}
}
// Remove old vdi bases.
Promise.all(
mapToArray(vdiBackups, async vdiBackup => {
const { oldBaseId } = vdiBackup.value()
@deferrable.onFailure
async rollingDeltaVmBackup ($onFailure, {vm, remoteId, tag, depth}) {
const remote = await this._xo.getRemote(remoteId)
if (oldBaseId) {
await xapi.deleteVdi(oldBaseId)
if (!remote) {
throw new Error(`No such Remote ${remoteId}`)
}
if (!remote.enabled) {
throw new Error(`Remote ${remoteId} is disabled`)
}
const handler = await this._xo.getRemoteHandler(remote)
const xapi = this._xo.getXapi(vm)
vm = xapi.getObject(vm._xapiId)
// Get most recent base.
const bases = sortBy(
filter(vm.$snapshots, { name_label: `XO_DELTA_BASE_VM_SNAPSHOT_${tag}` }),
base => base.snapshot_time
)
const baseVm = bases.pop()
forEach(bases, base => { xapi.deleteVm(base.$id, true)::pCatch(noop) })
// Check backup dirs.
const dir = `vm_delta_${tag}_${vm.uuid}`
const fullVdisRequired = []
await Promise.all(
mapToArray(vm.$VBDs, async vbd => {
if (!vbd.VDI || vbd.type !== 'Disk') {
return
}
const vdi = vbd.$VDI
const backups = await this._listVdiBackups(handler, `${dir}/vdi_${vdi.uuid}`)
// Force full if missing full.
if (!find(backups, isFullVdiBackup)) {
fullVdisRequired.push(vdi.$id)
}
})
)::pCatch(noop)
)
// Export...
const delta = await xapi.exportDeltaVm(vm.$id, baseVm && baseVm.$id, {
snapshotNameLabel: `XO_DELTA_BASE_VM_SNAPSHOT_${tag}`,
fullVdisRequired,
disableBaseTags: true
})
$onFailure(async () => {
await Promise.all(mapToArray(
delta.streams,
stream => stream.cancel()
))
await xapi.deleteVm(delta.vm.$id, true)
})
// Save vdis.
const vdiBackups = await pSettle(
mapToArray(delta.vdis, async (vdi, key) => {
const vdiParent = xapi.getObject(vdi.snapshot_of)
return this._saveDeltaVdiBackup(xapi, {
vdiParent,
isFull: !baseVm || find(fullVdisRequired, id => vdiParent.$id === id),
handler,
stream: delta.streams[`${key}.vhd`],
dir,
depth
})
.then(path => {
delta.vdis[key] = {
...delta.vdis[key],
xoPath: path
}
return path
})
})
)
const fulFilledVdiBackups = []
let success = true
// One or many vdi backups have failed.
for (const vdiBackup of vdiBackups) {
if (vdiBackup.isFulfilled()) {
fulFilledVdiBackups.push(vdiBackup)
} else {
console.error(`Rejected backup: ${vdiBackup.reason()}`)
success = false
}
}
$onFailure(async () => {
await Promise.all(
mapToArray(fulFilledVdiBackups, vdiBackup => {
return handler.unlink(`${dir}/${vdiBackup.value()}`, { checksum: true })::pCatch(noop)
})
)
})
if (!success) {
throw new Error('Rolling delta vm backup failed.')
}
const date = safeDateFormat(new Date())
const backupFormat = `${date}_${vm.name_label}`
const infoPath = `${dir}/${backupFormat}${DELTA_BACKUP_EXT}`
$onFailure(() => handler.unlink(infoPath)::pCatch(noop))
const { streams,
...infos
} = delta
// Write Metadata.
await handler.outputFile(infoPath, JSON.stringify(infos, null, 2), {flag: 'wx'})
// Here we have a completed backup. We can merge old vdis.
await Promise.all(
mapToArray(vdiBackups, vdiBackup => {
const backupName = vdiBackup.value()
const backupDirectory = backupName.slice(0, backupName.lastIndexOf('/'))
return this._mergeDeltaVdiBackups({ handler, dir: `${dir}/${backupDirectory}`, depth })
})
)
// Delete old backups.
await this._removeOldDeltaVmBackups(xapi, { vm, handler, dir, depth })
if (baseVm) {
xapi.deleteVm(baseVm.$id, true)::pCatch(noop)
}
// Returns relative path.
return `${dir}/${backupFormat}`