fix(xo-server): CR with lazy streams (#2675)

This commit is contained in:
Julien Fontanet 2018-02-23 17:50:17 +01:00 committed by GitHub
parent 9a8f9dd1d7
commit e0d34b1747
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 21 additions and 22 deletions

View File

@ -65,7 +65,11 @@ export async function copyVm ({ vm, sr }) {
console.log('export delta VM...') console.log('export delta VM...')
const input = await srcXapi.exportDeltaVm(vm) const input = await srcXapi.exportDeltaVm(vm)
console.log('import delta VM...') console.log('import delta VM...')
await tgtXapi.deleteVm(await tgtXapi.importDeltaVm(input, { srId: sr })) const { transferSize, vm: copyVm } = await tgtXapi.importDeltaVm(input, {
srId: sr,
})
console.log('transfered size:', transferSize)
await tgtXapi.deleteVm(copyVm)
} }
} }

View File

@ -777,7 +777,7 @@ export function importDeltaBackup ({ sr, remote, filePath, mapVdisSrs }) {
remoteId: remote, remoteId: remote,
filePath, filePath,
mapVdisSrs: mapVdisSrsXapi, mapVdisSrs: mapVdisSrsXapi,
}) }).then(_ => _.vm)
} }
importDeltaBackup.params = { importDeltaBackup.params = {

View File

@ -943,8 +943,6 @@ export default class Xapi extends XapiBase {
baseVdis[vbd.VDI] = vbd.$VDI baseVdis[vbd.VDI] = vbd.$VDI
}) })
const { streams } = delta
// 1. Create the VMs. // 1. Create the VMs.
const vm = await this._getOrWaitObject( const vm = await this._getOrWaitObject(
await this._createVmRecord({ await this._createVmRecord({
@ -1012,6 +1010,9 @@ export default class Xapi extends XapiBase {
defaultNetwork = networksOnPoolMasterByDevice[pif.device] = pif.$network defaultNetwork = networksOnPoolMasterByDevice[pif.device] = pif.$network
}) })
const { streams } = delta
let transferSize = 0
await Promise.all([ await Promise.all([
// Create VBDs. // Create VBDs.
asyncMap(delta.vbds, vbd => asyncMap(delta.vbds, vbd =>
@ -1028,7 +1029,13 @@ export default class Xapi extends XapiBase {
if (typeof stream === 'function') { if (typeof stream === 'function') {
stream = await stream() stream = await stream()
} }
await this._importVdiContent(vdi, stream, VDI_FORMAT_VHD) const sizeStream = stream
.pipe(createSizeStream())
.once('finish', () => {
transferSize += sizeStream.size
})
stream.task = sizeStream.task
await this._importVdiContent(vdi, sizeStream, VDI_FORMAT_VHD)
} }
}), }),
@ -1064,7 +1071,7 @@ export default class Xapi extends XapiBase {
}), }),
]) ])
return vm return { transferSize, vm }
} }
async _migrateVmWithStorageMotion ( async _migrateVmWithStorageMotion (

View File

@ -429,8 +429,7 @@ export default class {
})(srcVm.other_config[TAG_LAST_BASE_DELTA]) })(srcVm.other_config[TAG_LAST_BASE_DELTA])
// 2. Copy. // 2. Copy.
let size = 0 const { transferSize, vm: dstVm } = await (async () => {
const dstVm = await (async () => {
const { cancel, token } = CancelToken.source() const { cancel, token } = CancelToken.source()
const delta = await srcXapi.exportDeltaVm( const delta = await srcXapi.exportDeltaVm(
token, token,
@ -452,17 +451,6 @@ export default class {
delta.vm.other_config[TAG_EXPORT_TIME] = date delta.vm.other_config[TAG_EXPORT_TIME] = date
delta.vm.tags = [...delta.vm.tags, 'Continuous Replication'] delta.vm.tags = [...delta.vm.tags, 'Continuous Replication']
const { streams } = delta
forEach(delta.vdis, (vdi, key) => {
const id = `${key}.vhd`
const stream = streams[id]
const sizeStream = createSizeStream().once('finish', () => {
size += sizeStream.size
})
sizeStream.task = stream.task
streams[id] = stream.pipe(sizeStream)
})
let toRemove = filter( let toRemove = filter(
targetXapi.objects.all, targetXapi.objects.all,
obj => obj.$type === 'vm' && obj.other_config[TAG_SOURCE_VM] === uuid obj => obj.$type === 'vm' && obj.other_config[TAG_SOURCE_VM] === uuid
@ -508,7 +496,7 @@ export default class {
// 5. Return the identifier of the new XO VM object. // 5. Return the identifier of the new XO VM object.
id: xapiObjectToXo(dstVm).id, id: xapiObjectToXo(dstVm).id,
transferDuration: Date.now() - transferStart, transferDuration: Date.now() - transferStart,
transferSize: size, transferSize,
} }
} }
@ -914,11 +902,11 @@ export default class {
delta.vm.name_label += ` (${shortDate(datetime * 1e3)})` delta.vm.name_label += ` (${shortDate(datetime * 1e3)})`
delta.vm.tags.push('restored from backup') delta.vm.tags.push('restored from backup')
vm = await xapi.importDeltaVm(delta, { vm = (await xapi.importDeltaVm(delta, {
disableStartAfterImport: false, disableStartAfterImport: false,
srId: sr !== undefined && sr._xapiId, srId: sr !== undefined && sr._xapiId,
mapVdisSrs, mapVdisSrs,
}) })).vm
} else { } else {
throw new Error(`Unsupported delta backup version: ${version}`) throw new Error(`Unsupported delta backup version: ${version}`)
} }