Merge pull request #199 from vatesfr/continuous-replication

Continuous VM replication.
This commit is contained in:
Julien Fontanet 2016-01-17 23:51:29 +01:00
commit 7d4b9521e7
6 changed files with 649 additions and 76 deletions

View File

@ -77,6 +77,7 @@
"lodash.bind": "^3.0.0", "lodash.bind": "^3.0.0",
"lodash.difference": "^3.2.0", "lodash.difference": "^3.2.0",
"lodash.endswith": "^3.0.2", "lodash.endswith": "^3.0.2",
"lodash.every": "^4.0.0",
"lodash.filter": "^3.1.0", "lodash.filter": "^3.1.0",
"lodash.find": "^3.0.0", "lodash.find": "^3.0.0",
"lodash.findindex": "^3.0.0", "lodash.findindex": "^3.0.0",

View File

@ -536,6 +536,22 @@ exports.importDeltaBackup = importDeltaBackup
#--------------------------------------------------------------------- #---------------------------------------------------------------------
deltaCopy = ({ vm, sr }) -> @deltaCopyVm(vm, sr)
deltaCopy.params = {
vm: { type: 'string' },
sr: { type: 'string' }
}
deltaCopy.resolve = {
vm: [ 'vm', 'VM', 'operate'],
sr: [ 'sr', 'SR', 'operate']
}
exports.deltaCopy = deltaCopy
#---------------------------------------------------------------------
rollingSnapshot = $coroutine ({vm, tag, depth}) -> rollingSnapshot = $coroutine ({vm, tag, depth}) ->
yield checkPermissionOnSrs.call(this, vm) yield checkPermissionOnSrs.call(this, vm)
yield @rollingSnapshotVm(vm, tag, depth) yield @rollingSnapshotVm(vm, tag, depth)

View File

@ -179,6 +179,38 @@ deferrable.onFailure = (target, name, descriptor) => {
return deferrable(target, name, descriptor) return deferrable(target, name, descriptor)
} }
// Deferred functions are only executed on success.
//
// i.e.: defer.clear() is automatically called in case of failure.
deferrable.onSuccess = (target, name, descriptor) => {
let fn
function newFn (defer) {
try {
const result = fn.apply(this, arguments)
return isPromise(result)
? result.then(null, error => {
defer.clear()
throw error
})
: result
} catch (error) {
defer.clear()
throw error
}
}
if (descriptor) {
fn = descriptor.value
descriptor.value = newFn
} else {
fn = target
target = newFn
}
return deferrable(target, name, descriptor)
}
// ------------------------------------------------------------------- // -------------------------------------------------------------------
const _ownKeys = ( const _ownKeys = (

View File

@ -170,13 +170,13 @@ export function pDebug (promise, name) {
value => { value => {
console.log( console.log(
'%s', '%s',
`Promise ${name} resolved${value !== undefined ? `with ${kindOf(value)}` : ''}` `Promise ${name} resolved${value !== undefined ? ` with ${kindOf(value)}` : ''}`
) )
}, },
reason => { reason => {
console.log( console.log(
'%s', '%s',
`Promise ${name} rejected${reason !== undefined ? `with ${kindOf(reason)}` : ''}` `Promise ${name} rejected${reason !== undefined ? ` with ${kindOf(reason)}` : ''}`
) )
} }
) )

View File

@ -1,10 +1,12 @@
import createDebug from 'debug' import createDebug from 'debug'
import every from 'lodash.every'
import fatfs from 'fatfs'
import fatfsBuffer, { init as fatfsBufferInit } from './fatfs-buffer'
import find from 'lodash.find' import find from 'lodash.find'
import includes from 'lodash.includes' import includes from 'lodash.includes'
import isFunction from 'lodash.isfunction' import isFunction from 'lodash.isfunction'
import pick from 'lodash.pick'
import sortBy from 'lodash.sortby' import sortBy from 'lodash.sortby'
import fatfs from 'fatfs'
import fatfsBuffer, { init as fatfsBufferInit } from './fatfs-buffer'
import unzip from 'julien-f-unzip' import unzip from 'julien-f-unzip'
import { utcFormat, utcParse } from 'd3-time-format' import { utcFormat, utcParse } from 'd3-time-format'
import { import {
@ -13,15 +15,20 @@ import {
} from 'xen-api' } from 'xen-api'
import httpRequest from './http-request' import httpRequest from './http-request'
import {debounce} from './decorators' import {
debounce,
deferrable
} from './decorators'
import { import {
bufferToStream, bufferToStream,
camelToSnakeCase, camelToSnakeCase,
createRawObject, createRawObject,
ensureArray, ensureArray,
forEach, forEach,
map,
mapToArray, mapToArray,
noop, noop,
pAll,
parseSize, parseSize,
parseXml, parseXml,
pFinally, pFinally,
@ -37,6 +44,11 @@ const debug = createDebug('xo:xapi')
// =================================================================== // ===================================================================
const TAG_BASE_DELTA = 'xo:base_delta'
const TAG_COPY_SRC = 'xo:copy_of'
// ===================================================================
const OPAQUE_REF_RE = /OpaqueRef:[0-9a-z-]+/ const OPAQUE_REF_RE = /OpaqueRef:[0-9a-z-]+/
function extractOpaqueRef (str) { function extractOpaqueRef (str) {
const matches = OPAQUE_REF_RE.exec(str) const matches = OPAQUE_REF_RE.exec(str)
@ -51,32 +63,49 @@ function extractOpaqueRef (str) {
const put = (stream, { const put = (stream, {
headers: { ...headers } = {}, headers: { ...headers } = {},
...opts ...opts
}) => { }, task) => {
const { length } = stream const makeRequest = () => httpRequest({
if (length == null) {
headers['transfer-encoding'] = null
} else {
headers['content-length'] = length
}
const promise = httpRequest({
...opts, ...opts,
body: stream, body: stream,
headers, headers,
method: 'put' method: 'put'
}) })
if (length != null || !promise.request) { // Xen API does not support chunk encoding.
return promise.readAll() if (stream.length == null) {
headers['transfer-encoding'] = null
const promise = makeRequest()
if (task) {
// Some connections need the task to resolve (VDI import).
task::pFinally(() => {
promise.cancel()
})
} else {
// Some tasks need the connection to close (VM import).
promise.request.once('finish', () => {
promise.cancel()
})
}
return promise.readAll
} }
promise.request.once('finish', () => { return makeRequest().readAll()
promise.cancel()
})
return promise.catch(() => new Buffer(0))
} }
const asBoolean = value => Boolean(value)
// const asFloat = value => {
// value = String(value)
// return value.indexOf('.') === -1
// ? `${value}.0`
// : value
// }
const asInteger = value => String(value)
const filterUndefineds = obj => pick(obj, value => value !== undefined)
// =================================================================== // ===================================================================
const typeToNamespace = createRawObject() const typeToNamespace = createRawObject()
@ -254,11 +283,11 @@ export default class Xapi extends XapiBase {
// Create a task. // Create a task.
async _createTask (name = 'untitled task', description = '') { async _createTask (name = 'untitled task', description = '') {
const ref = await this.call('task.create', `[XO] ${name}`, description) const ref = await this.call('task.create', `[XO] ${name}`, description)
debug('task created: %s', name) debug('task created: %s (%s)', name, description)
this._watchTask(ref)::pFinally(() => { this._watchTask(ref)::pFinally(() => {
this.call('task.destroy', ref).then(() => { this.call('task.destroy', ref).then(() => {
debug('task destroyed: %s', name) debug('task destroyed: %s (%s)', name, description)
}) })
}) })
@ -324,8 +353,11 @@ export default class Xapi extends XapiBase {
if (value !== undefined) { if (value !== undefined) {
name = camelToSnakeCase(name) name = camelToSnakeCase(name)
return this.call(remove, ref, name).catch(noop) const removal = this.call(remove, ref, name).catch(noop)
.then(() => this.call(add, ref, name, value))
return value === null
? removal
: removal.catch(noop).then(() => this.call(add, ref, name, value))
} }
})) }))
} }
@ -564,10 +596,11 @@ export default class Xapi extends XapiBase {
// ----------------------------------------------------------------- // -----------------------------------------------------------------
async uploadPoolPatch (stream, patchName = 'unknown') { async uploadPoolPatch (stream, patchName = 'unknown') {
const taskRef = await this._createTask('Upload: ' + patchName) const taskRef = await this._createTask('Patch upload', patchName)
const task = this._watchTask(taskRef)
const [ patchRef ] = await Promise.all([ const [ patchRef ] = await Promise.all([
this._watchTask(taskRef), task,
put(stream, { put(stream, {
hostname: this.pool.$master.address, hostname: this.pool.$master.address,
path: '/pool_patch_upload', path: '/pool_patch_upload',
@ -575,7 +608,7 @@ export default class Xapi extends XapiBase {
session_id: this.sessionId, session_id: this.sessionId,
task_id: taskRef task_id: taskRef
} }
}) }, task)
]) ])
return this._getOrWaitObject(patchRef) return this._getOrWaitObject(patchRef)
@ -746,20 +779,36 @@ export default class Xapi extends XapiBase {
// Clone a VM: make a fast copy by fast copying each of its VDIs // Clone a VM: make a fast copy by fast copying each of its VDIs
// (using snapshots where possible) on the same SRs. // (using snapshots where possible) on the same SRs.
async _cloneVm (vm, nameLabel = vm.name_label) { _cloneVm (vm, nameLabel = vm.name_label) {
return await this.call('VM.clone', vm.$ref, nameLabel) debug(`Cloning VM ${vm.name_label}${
nameLabel !== vm.name_label
? ` as ${nameLabel}`
: ''
}`)
return this.call('VM.clone', vm.$ref, nameLabel)
} }
// Copy a VM: make a normal copy of a VM and all its VDIs. // Copy a VM: make a normal copy of a VM and all its VDIs.
// //
// If a SR is specified, it will contains the copies of the VDIs, // If a SR is specified, it will contains the copies of the VDIs,
// otherwise they will use the SRs they are on. // otherwise they will use the SRs they are on.
async _copyVm (vm, nameLabel = vm.nameLabel, sr = undefined) { async _copyVm (vm, nameLabel = vm.name_label, sr = undefined) {
let snapshotRef let snapshotRef
if (isVmRunning(vm)) { if (isVmRunning(vm)) {
snapshotRef = await this._snapshotVm(vm) snapshotRef = await this._snapshotVm(vm)
} }
debug(`Copying VM ${vm.name_label}${
nameLabel !== vm.name_label
? ` as ${nameLabel}`
: ''
}${
sr
? ` on ${sr.name_label}`
: ''
}`)
try { try {
return await this.call( return await this.call(
'VM.copy', 'VM.copy',
@ -778,6 +827,12 @@ export default class Xapi extends XapiBase {
} }
async _snapshotVm (vm, nameLabel = vm.name_label) { async _snapshotVm (vm, nameLabel = vm.name_label) {
debug(`Snapshotting VM ${vm.name_label}${
nameLabel !== vm.name_label
? ` as ${nameLabel}`
: ''
}`)
let ref let ref
try { try {
ref = await this.call('VM.snapshot_with_quiesce', vm.$ref, nameLabel) ref = await this.call('VM.snapshot_with_quiesce', vm.$ref, nameLabel)
@ -828,6 +883,11 @@ export default class Xapi extends XapiBase {
compress = true, compress = true,
nameLabel = undefined nameLabel = undefined
} = {}) { } = {}) {
// Fall back on local copy if possible.
if (targetXapi === this) {
return this.copyVm(vmId, targetSrId, { nameLabel })
}
const sr = targetXapi.getObject(targetSrId) const sr = targetXapi.getObject(targetSrId)
const stream = await this.exportVm(vmId, { const stream = await this.exportVm(vmId, {
compress, compress,
@ -852,6 +912,99 @@ export default class Xapi extends XapiBase {
return vm return vm
} }
// Low level create VM.
_createVm ({
actions_after_crash,
actions_after_reboot,
actions_after_shutdown,
affinity,
// appliance,
blocked_operations,
generation_id,
ha_always_run,
ha_restart_priority,
hardware_platform_version,
HVM_boot_params,
HVM_boot_policy,
HVM_shadow_multiplier,
is_a_template,
memory_dynamic_max,
memory_dynamic_min,
memory_static_max,
memory_static_min,
name_description,
name_label,
order,
other_config,
PCI_bus,
platform,
protection_policy,
PV_args,
PV_bootloader,
PV_bootloader_args,
PV_kernel,
PV_legacy_args,
PV_ramdisk,
recommendations,
shutdown_delay,
start_delay,
// suspend_SR,
tags,
user_version,
VCPUs_at_startup,
VCPUs_max,
VCPUs_params,
version,
xenstore_data
}) {
debug(`Creating VM ${name_label}`)
return this.call('VM.create', filterUndefineds({
actions_after_crash,
actions_after_reboot,
actions_after_shutdown,
affinity: affinity == null ? 'OpaqueRef:NULL' : affinity,
HVM_boot_params,
HVM_boot_policy,
is_a_template: asBoolean(is_a_template),
memory_dynamic_max: asInteger(memory_dynamic_max),
memory_dynamic_min: asInteger(memory_dynamic_min),
memory_static_max: asInteger(memory_static_max),
memory_static_min: asInteger(memory_static_min),
other_config,
PCI_bus,
platform,
PV_args,
PV_bootloader,
PV_bootloader_args,
PV_kernel,
PV_legacy_args,
PV_ramdisk,
recommendations,
user_version: asInteger(user_version),
VCPUs_at_startup: asInteger(VCPUs_at_startup),
VCPUs_max: asInteger(VCPUs_max),
VCPUs_params,
// Optional fields.
blocked_operations,
generation_id,
ha_always_run: asBoolean(ha_always_run),
ha_restart_priority,
hardware_platform_version,
// HVM_shadow_multiplier: asFloat(HVM_shadow_multiplier), // FIXME: does not work FIELD_TYPE_ERROR(hVM_shadow_multiplier)
name_description,
name_label,
order,
protection_policy,
shutdown_delay: asInteger(shutdown_delay),
start_delay: asInteger(start_delay),
tags,
version: asInteger(version),
xenstore_data
}))
}
// TODO: clean up on error. // TODO: clean up on error.
async createVm (templateId, { async createVm (templateId, {
nameDescription = undefined, nameDescription = undefined,
@ -1007,6 +1160,8 @@ export default class Xapi extends XapiBase {
} }
async _deleteVm (vm, deleteDisks) { async _deleteVm (vm, deleteDisks) {
debug(`Deleting VM ${vm.name_label}`)
// It is necessary for suspended VMs to be shut down // It is necessary for suspended VMs to be shut down
// to be able to delete their VDIs. // to be able to delete their VDIs.
if (vm.power_state !== 'Halted') { if (vm.power_state !== 'Halted') {
@ -1014,20 +1169,30 @@ export default class Xapi extends XapiBase {
} }
if (deleteDisks) { if (deleteDisks) {
await Promise.all(mapToArray(vm.$VBDs, vbd => { // Compute the VDIs list without duplicates.
const vdis = {}
forEach(vm.$VBDs, vbd => {
let vdi let vdi
if ( if (
// Do not remove CDs and Floppies. // Do not remove CDs and Floppies.
vbd.type === 'Disk' && vbd.type === 'Disk' &&
// Ignore VBD without VDI. // Ignore VBD without VDI.
(vdi = vbd.$VDI) && (vdi = vbd.$VDI)
) {
vdis[vdi.$id] = vdi
}
})
// Do not remove VDI attached to other VMs. await Promise.all(mapToArray(vdis, vdi => {
vdi.VBDs.length < 2 if (
// Do not remove VBDs attached to other VMs.
vdi.VBDs.length < 2 ||
every(vdi.$VBDs, vbd => vbd.VM === vm.$ref)
) { ) {
return this._deleteVdi(vdi).catch(noop) return this._deleteVdi(vdi).catch(noop)
} }
console.error(`cannot delete VDI ${vdi.name_label} (from VM ${vm.name_label})`)
})) }))
} }
@ -1092,6 +1257,229 @@ export default class Xapi extends XapiBase {
}) })
} }
// Create a snapshot of the VM and returns a delta export object.
@deferrable.onFailure
async exportDeltaVm ($onFailure, vmId, baseVmId = undefined) {
const vm = await this.snapshotVm(vmId)
$onFailure(() => this._deleteVm(vm, true))
const baseVm = baseVmId && this.getObject(baseVmId)
const baseVdis = {}
baseVm && forEach(baseVm.$VBDs, vbd => {
baseVdis[vbd.VDI] = vbd.$VDI
})
const streams = {}
const vdis = {}
const vbds = {}
forEach(vm.$VBDs, vbd => {
const vdiId = vbd.VDI
if (!vdiId || vbd.type !== 'Disk') {
// Ignore this VBD.
return
}
vbds[vbd.$ref] = vbd
if (vdiId in vdis) {
// This VDI has already been managed.
return
}
const vdi = vbd.$VDI
// Look for a snapshot of this vdi in the base VM.
let baseVdi
baseVm && forEach(vdi.$snapshot_of.$snapshots, vdi => {
if (baseVdis[vdi.$ref]) {
baseVdi = vdi
// Stop iterating.
return false
}
})
vdis[vdiId] = baseVdi
? {
...vdi,
other_config: {
...vdi.other_config,
[TAG_BASE_DELTA]: baseVdi.uuid
}
}
: vdi
const stream = streams[`${vdiId}.vhd`] = this._exportVdi(vdi, baseVdi, VDI_FORMAT_VHD)
$onFailure(() => stream.cancel())
})
const vifs = {}
forEach(vm.$VIFs, vif => {
vifs[vif.$ref] = vif
})
return {
// TODO: make non-enumerable?
streams: await streams::pAll(),
vbds,
vdis,
vifs,
vm: baseVm
? {
...vm,
other_config: {
...vm.other_config,
[TAG_BASE_DELTA]: baseVm.uuid
}
}
: vm
}
}
@deferrable.onFailure
async importDeltaVm ($onFailure, delta, {
deleteBase = false,
name_label = delta.vm.name_label,
srId = this.pool.default_SR
} = {}) {
const remoteBaseVmUuid = delta.vm.other_config[TAG_BASE_DELTA]
let baseVm
if (remoteBaseVmUuid) {
baseVm = find(this.objects.all, obj => (
(obj = obj.other_config) &&
obj[TAG_COPY_SRC] === remoteBaseVmUuid
))
if (!baseVm) {
throw new Error('could not find the base VM')
}
}
const sr = this.getObject(srId)
const baseVdis = {}
baseVm && forEach(baseVm.$VBDs, vbd => {
baseVdis[vbd.VDI] = vbd.$VDI
})
const { streams } = delta
// 1. Create the VMs.
const vm = await this._getOrWaitObject(
await this._createVm({
...delta.vm,
affinity: null,
is_a_template: false
})
)
$onFailure(() => this._deleteVm(vm))
await Promise.all([
this._setObjectProperties(vm, {
name_label: `[Importing…] ${name_label}`
}),
this._updateObjectMapProperty(vm, 'blocked_operations', {
start: 'Importing…'
}),
this._updateObjectMapProperty(vm, 'other_config', {
[TAG_COPY_SRC]: delta.vm.uuid
})
])
// 2. Delete all VBDs which may have been created by the import.
await Promise.all(mapToArray(
vm.$VBDs,
vbd => this._deleteVbd(vbd).catch(noop)
))
// 3. Create VDIs.
const newVdis = await map(delta.vdis, async vdi => {
const remoteBaseVdiUuid = vdi.other_config[TAG_BASE_DELTA]
if (!remoteBaseVdiUuid) {
const newVdi = await this.createVdi(vdi.virtual_size, {
...vdi,
other_config: {
...vdi.other_config,
[TAG_BASE_DELTA]: undefined,
[TAG_COPY_SRC]: vdi.uuid
},
sr: sr.$id
})
$onFailure(() => this._deleteVdi(newVdi))
return newVdi
}
const baseVdi = find(
baseVdis,
vdi => vdi.other_config[TAG_COPY_SRC] === remoteBaseVdiUuid
)
if (!baseVdi) {
throw new Error(`missing base VDI (copy of ${remoteBaseVdiUuid})`)
}
const newVdi = await this._getOrWaitObject(
await this._cloneVdi(baseVdi)
)
$onFailure(() => this._deleteVdi(newVdi))
await this._updateObjectMapProperty(newVdi, 'other_config', {
[TAG_COPY_SRC]: vdi.uuid
})
return newVdi
})::pAll()
const networksOnPoolMasterByDevice = {}
let defaultNetwork
forEach(this.pool.$master.$PIFs, pif => {
defaultNetwork = networksOnPoolMasterByDevice[pif.device] = pif.$network
})
await Promise.all([
// Create VBDs.
Promise.all(mapToArray(
delta.vbds,
vbd => this._createVbd(vm, newVdis[vbd.VDI], vbd)
)),
// Import VDI contents.
Promise.all(mapToArray(
newVdis,
(vdi, id) => this._importVdiContent(vdi, streams[`${id}.vhd`], VDI_FORMAT_VHD)
)),
// Wait for VDI export tasks (if any) termination.
Promise.all(mapToArray(
streams,
stream => stream.task
)),
// Create VIFs.
defaultNetwork && Promise.all(mapToArray(delta.vifs, vif => this._createVif(
vm,
networksOnPoolMasterByDevice[vif.device] || defaultNetwork,
vif
)))
])
if (deleteBase && baseVm) {
this._deleteVm(baseVm, true).catch(noop)
}
await Promise.all([
this._setObjectProperties(vm, {
name_label
}),
this._updateObjectMapProperty(vm, 'blocked_operations', {
start: 'Do not start this VM, clone it if you want to use it.' // FIXME: move
})
])
return vm
}
async _migrateVMWithStorageMotion (vm, hostXapi, host, { async _migrateVMWithStorageMotion (vm, hostXapi, host, {
migrationNetwork = find(host.$PIFs, pif => pif.management).$network, // TODO: handle not found migrationNetwork = find(host.$PIFs, pif => pif.management).$network, // TODO: handle not found
sr = host.$pool.$default_SR, // TODO: handle not found sr = host.$pool.$default_SR, // TODO: handle not found
@ -1242,6 +1630,8 @@ export default class Xapi extends XapiBase {
} }
_startVm (vm) { _startVm (vm) {
debug(`Starting VM ${vm.name_label}`)
return this.call( return this.call(
'VM.start', 'VM.start',
vm.$ref, vm.$ref,
@ -1353,6 +1743,8 @@ export default class Xapi extends XapiBase {
readOnly = (mode === 'RO') readOnly = (mode === 'RO')
} = {}) { } = {}) {
debug(`Creating VBD for VDI ${vdi.name_label} on VM ${vm.name_label}`)
if (position == null) { if (position == null) {
const allowed = await this.call('VM.get_allowed_VBD_devices', vm.$ref) const allowed = await this.call('VM.get_allowed_VBD_devices', vm.$ref)
const {length} = allowed const {length} = allowed
@ -1397,29 +1789,42 @@ export default class Xapi extends XapiBase {
return vbdRef return vbdRef
} }
_cloneVdi (vdi) {
debug(`Cloning VDI ${vdi.name_label}`)
return this.call('VDI.clone', vdi.$ref)
}
async _createVdi (size, { async _createVdi (size, {
name_description = undefined, name_description = undefined,
name_label = '', name_label = '',
other_config = {},
read_only = false, read_only = false,
sharable = false, sharable = false,
// FIXME: should be named srId or an object.
sr = this.pool.default_SR, sr = this.pool.default_SR,
tags = [], tags = [],
type = 'user', type = 'user',
xenstore_data = undefined xenstore_data = undefined
} = {}) { } = {}) {
sr = this.getObject(sr)
debug(`Creating VDI ${name_label} on ${sr.name_label}`)
sharable = Boolean(sharable) sharable = Boolean(sharable)
read_only = Boolean(read_only) read_only = Boolean(read_only)
const data = { const data = {
name_description, name_description,
name_label, name_label,
other_config: {}, other_config,
read_only, read_only,
sharable, sharable,
tags, tags,
type, type,
virtual_size: String(size), virtual_size: String(size),
SR: this.getObject(sr).$ref SR: sr.$ref
} }
if (xenstore_data) { if (xenstore_data) {
@ -1432,6 +1837,8 @@ export default class Xapi extends XapiBase {
async moveVdi (vdiId, srId) { async moveVdi (vdiId, srId) {
const vdi = this.getObject(vdiId) const vdi = this.getObject(vdiId)
const sr = this.getObject(srId) const sr = this.getObject(srId)
debug(`Moving VDI ${vdi.name_label} from vdi.$SR.name_label to ${sr.name_label}`)
try { try {
await this.call('VDI.pool_migrate', vdi.$ref, sr.$ref, {}) await this.call('VDI.pool_migrate', vdi.$ref, sr.$ref, {})
} catch (error) { } catch (error) {
@ -1458,10 +1865,14 @@ export default class Xapi extends XapiBase {
// TODO: check whether the VDI is attached. // TODO: check whether the VDI is attached.
async _deleteVdi (vdi) { async _deleteVdi (vdi) {
debug(`Deleting VDI ${vdi.name_label}`)
await this.call('VDI.destroy', vdi.$ref) await this.call('VDI.destroy', vdi.$ref)
} }
async _resizeVdi (vdi, size) { async _resizeVdi (vdi, size) {
debug(`Resizing VDI ${vdi.name_label} from ${vdi.virtual_size} to ${size}`)
try { try {
await this.call('VDI.resize_online', vdi.$ref, String(size)) await this.call('VDI.resize_online', vdi.$ref, String(size))
} catch (error) { } catch (error) {
@ -1529,11 +1940,21 @@ export default class Xapi extends XapiBase {
await this.call('VBD.plug', vbdId) await this.call('VBD.plug', vbdId)
} }
async disconnectVbd (vbdId) { _disconnectVbd (vbd) {
// TODO: check if VBD is attached before // TODO: check if VBD is attached before
await this.call('VBD.unplug_force', vbdId) return this.call('VBD.unplug_force', vbd.$ref)
} }
async disconnectVbd (vbdId) {
await this._disconnectVbd(this.getObject(vbdId))
}
async _deleteVbd (vbd) {
await this._disconnectVbd(vbd).catch(noop)
await this.call('VBD.destroy', vbd.$ref)
}
// TODO: remove when no longer used.
async destroyVbdsFromVm (vmId) { async destroyVbdsFromVm (vmId) {
await Promise.all( await Promise.all(
mapToArray(this.getObject(vmId).$VBDs, async vbd => { mapToArray(this.getObject(vmId).$VBDs, async vbd => {
@ -1585,9 +2006,7 @@ export default class Xapi extends XapiBase {
return snap return snap
} }
// Returns a stream to the exported VDI. async _exportVdi (vdi, base, format = VDI_FORMAT_VHD) {
async exportVdi (vdiId, { baseId = undefined, format = VDI_FORMAT_VHD } = {}) {
const vdi = this.getObject(vdiId)
const host = vdi.$SR.$PBDs[0].$host const host = vdi.$SR.$PBDs[0].$host
const taskRef = await this._createTask('VDI Export', vdi.name_label) const taskRef = await this._createTask('VDI Export', vdi.name_label)
@ -1597,22 +2016,48 @@ export default class Xapi extends XapiBase {
task_id: taskRef, task_id: taskRef,
vdi: vdi.$ref vdi: vdi.$ref
} }
if (base) {
if (baseId) { query.base = base.$ref
query.base = this.getObject(baseId).$ref
} }
debug(`exporting VDI ${vdi.name_label}${base
? ` (from base ${vdi.name_label})`
: ''
}`)
const task = this._watchTask(taskRef)
return httpRequest({ return httpRequest({
hostname: host.address, hostname: host.address,
path: '/export_raw_vdi/', path: '/export_raw_vdi/',
query query
}).then(response => {
response.cancel = (cancel => () => {
return new Promise(resolve => {
resolve(cancel())
}).then(() => task.catch(noop))
})(response.cancel)
response.task = task
return response
}) })
} }
// Returns a stream to the exported VDI.
exportVdi (vdiId, {
baseId,
format
} = {}) {
return this._exportVdi(
this.getObject(vdiId),
baseId && this.getObject(baseId),
format
)
}
// ----------------------------------------------------------------- // -----------------------------------------------------------------
async importVdiContent (vdiId, stream, { format = VDI_FORMAT_VHD } = {}) { async _importVdiContent (vdi, stream, format = VDI_FORMAT_VHD) {
const vdi = this.getObject(vdiId) const taskRef = await this._createTask('VDI Content Import', vdi.name_label)
const taskRef = await this._createTask('VDI import')
const query = { const query = {
session_id: this.sessionId, session_id: this.sessionId,
@ -1623,48 +2068,64 @@ export default class Xapi extends XapiBase {
const host = vdi.$SR.$PBDs[0].$host const host = vdi.$SR.$PBDs[0].$host
const upload = put(stream, { const task = this._watchTask(taskRef)
hostname: host.address,
method: 'put',
path: '/import_raw_vdi/',
query
})
await Promise.all([ await Promise.all([
upload, task,
this._watchTask(taskRef) put(stream, {
hostname: host.address,
method: 'put',
path: '/import_raw_vdi/',
query
}, task)
]) ])
} }
importVdiContent (vdiId, stream, {
format
} = {}) {
return this._importVdiContent(
this.getObject(vdiId),
stream,
format
)
}
// ================================================================= // =================================================================
async _createVif (vm, network, { async _createVif (vm, network, {
mac = '', mac = '',
mtu = 1500, mtu = 1500,
position = undefined position = undefined,
} = {}) {
// TODO: use VM.get_allowed_VIF_devices()?
if (position == null) {
forEach(vm.$VIFs, vif => {
const curPos = +vif.device
if (!(position > curPos)) {
position = curPos
}
})
position = position == null ? 0 : position + 1 device = position != null && String(position),
ipv4_allowed = undefined,
ipv6_allowed = undefined,
locking_mode = undefined,
MAC = mac,
MTU = mtu,
other_config = {},
qos_algorithm_params = {},
qos_algorithm_type = ''
} = {}) {
debug(`Creating VIF for VM ${vm.name_label} on network ${network.name_label}`)
if (device == null) {
device = (await this.call('VM.get_allowed_VIF_devices', vm.$ref))[0]
} }
const vifRef = await this.call('VIF.create', { const vifRef = await this.call('VIF.create', filterUndefineds({
device: String(position), device,
MAC: String(mac), ipv4_allowed,
MTU: String(mtu), ipv6_allowed,
locking_mode,
MAC,
MTU: asInteger(MTU),
network: network.$ref, network: network.$ref,
other_config: {}, other_config,
qos_algorithm_params: {}, qos_algorithm_params,
qos_algorithm_type: '', qos_algorithm_type,
VM: vm.$ref VM: vm.$ref
}) }))
if (isVmRunning(vm)) { if (isVmRunning(vm)) {
await this.call('VIF.plug', vifRef) await this.call('VIF.plug', vifRef)

View File

@ -23,6 +23,9 @@ import {
} from 'path' } from 'path'
import xapiObjectToXo from '../xapi-object-to-xo' import xapiObjectToXo from '../xapi-object-to-xo'
import {
deferrable
} from '../decorators'
import { import {
forEach, forEach,
mapToArray, mapToArray,
@ -114,6 +117,66 @@ export default class {
// ----------------------------------------------------------------- // -----------------------------------------------------------------
@deferrable.onFailure
async deltaCopyVm ($onFailure, srcVm, targetSr) {
const srcXapi = this._xo.getXapi(srcVm)
const targetXapi = this._xo.getXapi(targetSr)
// Get Xen objects from XO objects.
srcVm = srcXapi.getObject(srcVm._xapiId)
targetSr = targetXapi.getObject(targetSr._xapiId)
// 1. Find the local base for this SR (if any).
const TAG_LAST_BASE_DELTA = `xo:base_delta:${targetSr.uuid}`
const localBaseUuid = (id => {
if (id != null) {
const base = srcXapi.getObject(id, null)
return base && base.uuid
}
})(srcVm.other_config[TAG_LAST_BASE_DELTA])
// 2. Copy.
const dstVm = await (async () => {
const delta = await srcXapi.exportDeltaVm(srcVm.$id, localBaseUuid)
$onFailure(async () => {
await Promise.all(mapToArray(
delta.streams,
stream => stream.cancel()
))
return srcXapi.deleteVm(delta.vm.$id, true)
})
const promise = targetXapi.importDeltaVm(
delta,
{
deleteBase: true, // Remove the remote base.
srId: targetSr.$id
}
)
// Once done, (asynchronously) remove the (now obsolete) local
// base.
if (localBaseUuid) {
promise.then(() => srcXapi.deleteVm(localBaseUuid, true)).catch(noop)
}
// (Asynchronously) Identify snapshot as future base.
promise.then(() => {
return srcXapi._updateObjectMapProperty(srcVm, 'other_config', {
[TAG_LAST_BASE_DELTA]: delta.vm.uuid
})
}).catch(noop)
return promise
})()
// 5. Return the identifier of the new XO VM object.
return xapiObjectToXo(dstVm).id
}
// -----------------------------------------------------------------
// TODO: The other backup methods must use this function ! // TODO: The other backup methods must use this function !
// Prerequisite: The backups array must be ordered. (old to new backups) // Prerequisite: The backups array must be ordered. (old to new backups)
async _removeOldBackups (backups, path, n) { async _removeOldBackups (backups, path, n) {