Merge pull request #207 from vatesfr/abhamonr-avoid-metadata-imp-exp-delta-backups

Avoid metadata import/export in delta backups. (fix vatesfr/xo-web#651)
This commit is contained in:
Olivier Lambert 2016-01-28 11:35:04 +01:00
commit e193b45562
4 changed files with 217 additions and 166 deletions

View File

@ -110,6 +110,7 @@
"proxy-http-request": "0.1.0",
"redis": "^2.0.1",
"schema-inspector": "^1.5.1",
"semver": "^5.1.0",
"serve-static": "^1.9.2",
"stack-chain": "^1.3.3",
"trace": "^2.0.1",

View File

@ -5,6 +5,8 @@ import {
parse
} from 'xo-remote-parser'
import { noop } from '../utils'
export default class RemoteHandlerAbstract {
constructor (remote) {
this._remote = parse({...remote})
@ -76,11 +78,15 @@ export default class RemoteHandlerAbstract {
async createReadStream (file, options) {
const stream = await this._createReadStream(file)
if (stream.length === undefined) {
try {
stream.length = await this.getSize(file)
} catch (_) {}
}
await Promise.all([
stream.length === undefined
? this.getSize(file).then(value => stream.length = value).catch(noop)
: false,
// FIXME: the readable event may have already been emitted.
eventToPromise(stream, 'readable')
])
return stream
}

View File

@ -19,6 +19,8 @@ import {
debounce,
deferrable
} from './decorators'
import { satisfies as versionSatisfies } from 'semver'
import {
bufferToStream,
camelToSnakeCase,
@ -1322,6 +1324,7 @@ export default class Xapi extends XapiBase {
// TODO: make non-enumerable?
streams: await streams::pAll(),
version: '1.0.0',
vbds,
vdis,
vifs,
@ -1341,8 +1344,15 @@ export default class Xapi extends XapiBase {
async importDeltaVm ($onFailure, delta, {
deleteBase = false,
name_label = delta.vm.name_label,
srId = this.pool.default_SR
srId = this.pool.default_SR,
disableStartAfterImport = true
} = {}) {
const version = { delta }
if (!versionSatisfies(version, '^1')) {
throw new Error(`Unsupported delta backup version: ${version}`)
}
const remoteBaseVmUuid = delta.vm.other_config[TAG_BASE_DELTA]
let baseVm
if (remoteBaseVmUuid) {
@ -1447,7 +1457,11 @@ export default class Xapi extends XapiBase {
// Import VDI contents.
Promise.all(mapToArray(
newVdis,
(vdi, id) => this._importVdiContent(vdi, streams[`${id}.vhd`], VDI_FORMAT_VHD)
async (vdi, id) => {
for (const stream of ensureArray(streams[`${id}.vhd`])) {
await this._importVdiContent(vdi, stream, VDI_FORMAT_VHD)
}
}
)),
// Wait for VDI export tasks (if any) termination.
@ -1472,8 +1486,11 @@ export default class Xapi extends XapiBase {
this._setObjectProperties(vm, {
name_label
}),
// FIXME: move
this._updateObjectMapProperty(vm, 'blocked_operations', {
start: 'Do not start this VM, clone it if you want to use it.' // FIXME: move
start: disableStartAfterImport
? 'Do not start this VM, clone it if you want to use it.'
: null
})
])

View File

@ -10,6 +10,7 @@ import {
basename,
dirname
} from 'path'
import { satisfies as versionSatisfies } from 'semver'
import xapiObjectToXo from '../xapi-object-to-xo'
import {
@ -28,6 +29,9 @@ import {
// ===================================================================
const DELTA_BACKUP_EXT = '.json'
const DELTA_BACKUP_EXT_LENGTH = DELTA_BACKUP_EXT.length
// Test if a file is a vdi backup. (full or delta)
const isVdiBackup = name => /^\d+T\d+Z_(?:full|delta)\.vhd$/.test(name)
@ -40,6 +44,10 @@ const getVdiTimestamp = name => {
return arr[1] || undefined
}
const getDeltaBackupNameWithoutExt = name => name.slice(0, -DELTA_BACKUP_EXT_LENGTH)
const isDeltaBackup = name => endsWith(name, DELTA_BACKUP_EXT)
// ===================================================================
export default class {
@ -50,52 +58,33 @@ export default class {
async listRemoteBackups (remoteId) {
const handler = await this._xo.getRemoteHandler(remoteId)
// List backups. (Except delta backups)
const xvaFilter = file => endsWith(file, '.xva')
// List backups. (No delta)
const backupFilter = file => endsWith(file, '.xva')
const files = await handler.list()
const backups = filter(files, xvaFilter)
const backups = filter(files, backupFilter)
// List delta backups.
const deltaDirs = filter(files, file => startsWith(file, 'vm_delta_'))
for (const deltaDir of deltaDirs) {
const files = await handler.list(deltaDir)
const deltaBackups = filter(files, xvaFilter)
const deltaBackups = filter(files, isDeltaBackup)
backups.push(...mapToArray(
deltaBackups,
deltaBackup => `${deltaDir}/${deltaBackup}`
deltaBackup => {
return `${deltaDir}/${getDeltaBackupNameWithoutExt(deltaBackup)}`
}
))
}
return backups
}
// TODO: move into utils and rename! NO, until we may pass a handler instead of a remote...?
async _openAndwaitReadableFile (handler, file, errorMessage) {
let stream
try {
stream = await handler.createReadStream(file)
await eventToPromise(stream, 'readable')
} catch (error) {
if (error.code === 'ENOENT') {
throw new Error(errorMessage)
}
throw error
}
return stream
}
async importVmBackup (remoteId, file, sr) {
const handler = await this._xo.getRemoteHandler(remoteId)
const stream = await this._openAndwaitReadableFile(
handler,
file,
'VM to import not found in this remote'
)
const stream = await handler.createReadStream(file)
const xapi = this._xo.getXapi(sr)
await xapi.importVm(stream, { srId: sr._xapiId })
@ -169,6 +158,7 @@ export default class {
if (n <= 0) {
return
}
const getPath = (file, dir) => dir ? `${dir}/${file}` : file
await Promise.all(
@ -178,8 +168,87 @@ export default class {
// -----------------------------------------------------------------
async _legacyImportDeltaVdiBackup (xapi, { vmId, handler, dir, vdiInfo }) {
const vdi = await xapi.createVdi(vdiInfo.virtual_size, vdiInfo)
const vdiId = vdi.$id
// dir = vm_delta_xxx
// xoPath = vdi_xxx/timestamp_(full|delta).vhd
// vdiDir = vdi_xxx
const { xoPath } = vdiInfo
const filePath = `${dir}/${xoPath}`
const vdiDir = dirname(xoPath)
const backups = await this._listDeltaVdiDependencies(handler, filePath)
for (const backup of backups) {
const stream = await handler.createReadStream(`${dir}/${vdiDir}/${backup}`)
await xapi.importVdiContent(vdiId, stream, {
format: VDI_FORMAT_VHD
})
}
return vdiId
}
async _legacyImportDeltaVmBackup (xapi, { remoteId, handler, filePath, info, sr }) {
// Import vm metadata.
const vm = await (async () => {
const stream = await handler.createReadStream(`${filePath}.xva`)
return await xapi.importVm(stream, { onlyMetadata: true })
})()
const vmName = vm.name_label
const dir = dirname(filePath)
// Disable start and change the VM name label during import.
await Promise.all([
xapi.addForbiddenOperationToVm(vm.$id, 'start', 'Delta backup import...'),
xapi._setObjectProperties(vm, { name_label: `[Importing...] ${vmName}` })
])
// Destroy vbds if necessary. Why ?
// Because XenServer creates Vbds linked to the vdis of the backup vm if it exists.
await xapi.destroyVbdsFromVm(vm.uuid)
// Import VDIs.
const vdiIds = {}
await Promise.all(
mapToArray(
info.vdis,
async vdiInfo => {
vdiInfo.sr = sr._xapiId
const vdiId = await this._legacyImportDeltaVdiBackup(xapi, { vmId: vm.$id, handler, dir, vdiInfo })
vdiIds[vdiInfo.uuid] = vdiId
}
)
)
await Promise.all(
mapToArray(
info.vbds,
vbdInfo => {
xapi.attachVdiToVm(vdiIds[vbdInfo.xoVdi], vm.$id, vbdInfo)
}
)
)
// Import done, reenable start and set real vm name.
await Promise.all([
xapi.removeForbiddenOperationFromVm(vm.$id, 'start'),
xapi._setObjectProperties(vm, { name_label: vmName })
])
return vm
}
// -----------------------------------------------------------------
async _listVdiBackups (handler, dir) {
let files
try {
files = await handler.list(dir)
} catch (error) {
@ -189,6 +258,7 @@ export default class {
throw error
}
}
const backups = sortBy(filter(files, fileName => isVdiBackup(fileName)))
let i
@ -199,11 +269,9 @@ export default class {
return backups.slice(i)
}
async _deltaVdiBackup ({vdi, handler, dir, depth}) {
const xapi = this._xo.getXapi(vdi)
async _deltaVdiBackup (xapi, {vdi, handler, dir, depth}) {
const backupDirectory = `vdi_${vdi.uuid}`
vdi = xapi.getObject(vdi._xapiId)
dir = `${dir}/${backupDirectory}`
const backups = await this._listVdiBackups(handler, dir)
@ -264,6 +332,7 @@ export default class {
if (handler.type === 'smb') {
throw new Error('VDI merging is not available through SMB')
}
const backups = await this._listVdiBackups(handler, dir)
let i = backups.length - depth
@ -279,9 +348,11 @@ export default class {
const backup = `${dir}/${backups[i]}`
const parent = `${dir}/${backups[i - 1]}`
const path = handler._remote.path // FIXME, private attribute !
try {
await execa(vhdUtil, ['modify', '-n', `${handler.path}/${backup}`, '-p', `${handler.path}/${parent}`]) // FIXME not ok at least with smb remotes
await execa(vhdUtil, ['coalesce', '-n', `${handler.path}/${backup}`]) // FIXME not ok at least with smb remotes
await execa(vhdUtil, ['modify', '-n', `${path}/${backup}`, '-p', `${path}/${parent}`]) // FIXME not ok at least with smb remotes
await execa(vhdUtil, ['coalesce', '-n', `${path}/${backup}`]) // FIXME not ok at least with smb remotes
} catch (e) {
console.error('Unable to use vhd-util.', e)
throw e
@ -304,24 +375,7 @@ export default class {
await handler.rename(`${dir}/${backups[0]}`, `${dir}/${newFull}`)
}
async _importVdiBackupContent (xapi, handler, file, vdiId) {
const stream = await this._openAndwaitReadableFile(
handler,
file,
'VDI to import not found in this remote'
)
await xapi.importVdiContent(vdiId, stream, {
format: VDI_FORMAT_VHD
})
}
async importDeltaVdiBackup ({vdi, remoteId, filePath}) {
const handler = await this._xo.getRemoteHandler(remoteId)
return this._importDeltaVdiBackup(vdi, handler, filePath)
}
async _importDeltaVdiBackup (vdi, handler, filePath) {
async _listDeltaVdiDependencies (handler, filePath) {
const dir = dirname(filePath)
const filename = basename(filePath)
const backups = await this._listVdiBackups(handler, dir)
@ -344,19 +398,14 @@ export default class {
throw new Error(`Unable to found full vdi backup of: ${filePath}`)
}
// Restore...
const xapi = this._xo.getXapi(vdi)
for (; j <= i; j++) {
await this._importVdiBackupContent(xapi, handler, `${dir}/${backups[j]}`, vdi._xapiId)
}
return backups.slice(j, i + 1)
}
// -----------------------------------------------------------------
async _listDeltaVmBackups (handler, dir) {
const files = await handler.list(dir)
return await sortBy(filter(files, (fileName) => /^\d+T\d+Z_.*\.(?:xva|json)$/.test(fileName)))
return await sortBy(filter(files, isDeltaBackup))
}
async _failedRollingDeltaVmBackup (xapi, handler, dir, fulFilledVdiBackups) {
@ -388,47 +437,44 @@ export default class {
const dir = `vm_delta_${tag}_${vm.uuid}`
const info = {
vbds: [],
vdis: {}
version: '1.0.0',
vbds: {},
vdis: {},
vifs: {}
}
const promises = []
const xapi = this._xo.getXapi(vm)
for (const vbdId of vm.$VBDs) {
const vbd = this._xo.getObject(vbdId)
vm = xapi.getObject(vm._xapiId)
if (!vbd.VDI) {
for (const vbd of vm.$VBDs) {
const vdiId = vbd.VDI
if (!vdiId || vbd.type !== 'Disk') {
continue
}
if (vbd.is_cd_drive) {
continue
}
const vdiXo = this._xo.getObject(vbd.VDI)
const vdi = xapi.getObject(vdiXo._xapiId)
const vdiUUID = vdi.uuid
info.vbds.push({
...xapi.getObject(vbd._xapiId),
xoVdi: vdiUUID
})
info.vbds[vbd.$ref] = vbd
// Warning: There may be the same VDI id for a VBD set.
if (!info.vdis[vdiUUID]) {
info.vdis[vdiUUID] = { ...vdi }
promises.push(
this._deltaVdiBackup({handler, vdi: vdiXo, dir, depth}).then(
vdiBackup => {
const { backupDirectory, vdiFilename } = vdiBackup
info.vdis[vdiUUID].xoPath = `${backupDirectory}/${vdiFilename}`
return vdiBackup
}
)
)
if (info.vdis[vdiId]) {
continue
}
const vdi = vbd.$VDI
info.vdis[vdiId] = { ...vdi }
promises.push(
this._deltaVdiBackup(xapi, {vdi, handler, dir, depth}).then(
vdiBackup => {
const { backupDirectory, vdiFilename } = vdiBackup
info.vdis[vdiId].xoPath = `${backupDirectory}/${vdiFilename}`
return vdiBackup
}
)
)
}
const vdiBackups = await pSettle(promises)
@ -452,21 +498,20 @@ export default class {
throw new Error('Rolling delta vm backup failed.')
}
const backups = await this._listDeltaVmBackups(handler, dir)
const date = safeDateFormat(new Date())
const backupFormat = `${date}_${vm.name_label}`
const xvaPath = `${dir}/${backupFormat}.xva`
const infoPath = `${dir}/${backupFormat}.json`
const infoPath = `${dir}/${backupFormat}${DELTA_BACKUP_EXT}`
try {
await Promise.all([
this._backupVm(vm, handler, xvaPath, {onlyMetadata: true}),
handler.outputFile(infoPath, JSON.stringify(info), {flag: 'wx'})
])
for (const vif of vm.$VIFs) {
info.vifs[vif.$ref] = vif
}
info.vm = vm
await handler.outputFile(infoPath, JSON.stringify(info, null, 2), {flag: 'wx'})
} catch (e) {
await Promise.all([
handler.unlink(xvaPath).catch(noop),
handler.unlink(infoPath).catch(noop),
this._failedRollingDeltaVmBackup(xapi, handler, dir, fulFilledVdiBackups)
])
@ -482,8 +527,22 @@ export default class {
})
)
// Remove x2 files : json AND xva files.
await this._removeOldBackups(backups, handler, dir, backups.length - (depth - 1) * 2)
// Remove old vm backups.
const backups = await this._listDeltaVmBackups(handler, dir)
const nOldBackups = backups.length - depth
if (nOldBackups > 0) {
await Promise.all(
mapToArray(backups.slice(0, nOldBackups), async backup => {
// Remove json file.
await handler.unlink(`${dir}/${backup}`)
// Remove xva file.
// Version 0.0.0 (Legacy) Delta Backup.
handler.unlink(`${dir}/${getDeltaBackupNameWithoutExt(backup)}.xva`).catch(noop)
})
)
}
// Remove old vdi bases.
Promise.all(
@ -500,76 +559,44 @@ export default class {
return `${dir}/${backupFormat}`
}
async _importVmMetadata (xapi, handler, file) {
const stream = await this._openAndwaitReadableFile(
handler,
file,
'VM metadata to import not found in this remote'
)
return await xapi.importVm(stream, { onlyMetadata: true })
}
async _importDeltaVdiBackupFromVm (xapi, vmId, handler, directory, vdiInfo) {
const vdi = await xapi.createVdi(vdiInfo.virtual_size, vdiInfo)
const vdiId = vdi.$id
await this._importDeltaVdiBackup(
this._xo.getObject(vdiId),
handler,
`${directory}/${vdiInfo.xoPath}`
)
return vdiId
}
async importDeltaVmBackup ({sr, remoteId, filePath}) {
const handler = await this._xo.getRemoteHandler(remoteId)
const xapi = this._xo.getXapi(sr)
// Import vm metadata.
const vm = await this._importVmMetadata(xapi, handler, `${filePath}.xva`)
const vmName = vm.name_label
const delta = JSON.parse(await handler.readFile(`${filePath}${DELTA_BACKUP_EXT}`))
let vm
const version = { delta }
// Disable start and change the VM name label during import.
await Promise.all([
xapi.addForbiddenOperationToVm(vm.$id, 'start', 'Delta backup import...'),
xapi._setObjectProperties(vm, { name_label: `[Importing...] ${vmName}` })
])
if (!version) {
// Legacy import. (Version 0.0.0)
vm = await this._legacyImportDeltaVmBackup(xapi, {
remoteId, handler, filePath, info: delta, sr
})
} else if (versionSatisfies(delta.version, '^1')) {
const basePath = dirname(filePath)
const streams = delta.streams = {}
// Destroy vbds if necessary. Why ?
// Because XenServer creates Vbds linked to the vdis of the backup vm if it exists.
await xapi.destroyVbdsFromVm(vm.uuid)
await Promise.all(
mapToArray(
delta.vdis,
async (vdi, id) => {
const vdisFolder = dirname(vdi.xoPath)
const backups = await this._listDeltaVdiDependencies(handler, `${basePath}/${vdi.xoPath}`)
const info = JSON.parse(await handler.readFile(`${filePath}.json`))
// Import VDIs.
const vdiIds = {}
await Promise.all(
mapToArray(
info.vdis,
async vdiInfo => {
vdiInfo.sr = sr._xapiId
const vdiId = await this._importDeltaVdiBackupFromVm(xapi, vm.$id, remoteId, dirname(filePath), vdiInfo)
vdiIds[vdiInfo.uuid] = vdiId
}
streams[`${id}.vhd`] = await Promise.all(
mapToArray(backups, backup => handler.createReadStream(`${basePath}/${vdisFolder}/${backup}`))
)
}
)
)
)
await Promise.all(
mapToArray(
info.vbds,
vbdInfo => {
xapi.attachVdiToVm(vdiIds[vbdInfo.xoVdi], vm.$id, vbdInfo)
}
)
)
// Import done, reenable start and set real vm name.
await Promise.all([
xapi.removeForbiddenOperationFromVm(vm.$id, 'start'),
xapi._setObjectProperties(vm, { name_label: vmName })
])
vm = await xapi.importDeltaVm(delta, {
srId: sr._xapiId,
disableStartAfterImport: false
})
} else {
throw new Error(`Unsupported delta backup version: ${version}`)
}
return xapiObjectToXo(vm).id
}