chore: improve backup/export/import code (#575)

Should fixes vatesfr/xo-web#2227
This commit is contained in:
Julien Fontanet 2017-06-30 13:09:04 +02:00 committed by GitHub
commit 9c34e64d0e
15 changed files with 175 additions and 238 deletions

View File

@ -68,7 +68,7 @@
"helmet": "^3.6.0", "helmet": "^3.6.0",
"highland": "^2.10.5", "highland": "^2.10.5",
"http-proxy": "^1.16.2", "http-proxy": "^1.16.2",
"http-request-plus": "^0.3.0", "http-request-plus": "^0.4.0",
"http-server-plus": "^0.8.0", "http-server-plus": "^0.8.0",
"human-format": "^0.8.0", "human-format": "^0.8.0",
"is-redirect": "^1.0.0", "is-redirect": "^1.0.0",
@ -95,7 +95,7 @@
"passport": "^0.3.2", "passport": "^0.3.2",
"passport-local": "^1.0.0", "passport-local": "^1.0.0",
"pretty-format": "^20.0.1", "pretty-format": "^20.0.1",
"promise-toolbox": "^0.9.2", "promise-toolbox": "^0.9.5",
"proxy-agent": "^2.0.0", "proxy-agent": "^2.0.0",
"pug": "^2.0.0-rc.2", "pug": "^2.0.0-rc.2",
"pw": "^0.0.4", "pw": "^0.0.4",
@ -110,7 +110,7 @@
"tmp": "^0.0.31", "tmp": "^0.0.31",
"uuid": "^3.0.1", "uuid": "^3.0.1",
"ws": "^3.0.0", "ws": "^3.0.0",
"xen-api": "^0.13.1", "xen-api": "^0.13.5",
"xml2js": "~0.4.17", "xml2js": "~0.4.17",
"xo-acl-resolver": "^0.2.3", "xo-acl-resolver": "^0.2.3",
"xo-collection": "^0.4.1", "xo-collection": "^0.4.1",

View File

@ -1,14 +1,10 @@
import { import { ignoreErrors } from 'promise-toolbox'
noop,
pCatch
} from '../utils'
export async function add ({autoConnect = true, ...props}) { export async function add ({autoConnect = true, ...props}) {
const server = await this.registerXenServer(props) const server = await this.registerXenServer(props)
if (autoConnect) { if (autoConnect) {
// Connect asynchronously, ignore any errors. this.connectXenServer(server.id)::ignoreErrors()
this.connectXenServer(server.id)::pCatch(noop)
} }
return server.id return server.id
@ -109,7 +105,7 @@ set.params = {
// ------------------------------------------------------------------- // -------------------------------------------------------------------
export async function connect ({id}) { export async function connect ({id}) {
this.updateXenServer(id, {enabled: true})::pCatch(noop) this.updateXenServer(id, {enabled: true})::ignoreErrors()
await this.connectXenServer(id) await this.connectXenServer(id)
} }
@ -126,7 +122,7 @@ connect.params = {
// ------------------------------------------------------------------- // -------------------------------------------------------------------
export async function disconnect ({id}) { export async function disconnect ({id}) {
this.updateXenServer(id, {enabled: false})::pCatch(noop) this.updateXenServer(id, {enabled: false})::ignoreErrors()
await this.disconnectXenServer(id) await this.disconnectXenServer(id)
} }

View File

@ -1,8 +1,6 @@
import { import { ignoreErrors } from 'promise-toolbox'
diffItems,
noop, import { diffItems } from '../utils'
pCatch
} from '../utils'
// =================================================================== // ===================================================================
@ -12,7 +10,7 @@ async function delete_ ({vif}) {
vif.id, vif.id,
null, null,
vif.allowedIpv4Addresses.concat(vif.allowedIpv6Addresses) vif.allowedIpv4Addresses.concat(vif.allowedIpv6Addresses)
)::pCatch(noop) )::ignoreErrors()
await this.getXapi(vif).deleteVif(vif._xapiId) await this.getXapi(vif).deleteVif(vif._xapiId)
} }

View File

@ -12,12 +12,13 @@ sortBy = require 'lodash/sortBy'
startsWith = require 'lodash/startsWith' startsWith = require 'lodash/startsWith'
{coroutine: $coroutine} = require 'bluebird' {coroutine: $coroutine} = require 'bluebird'
{format} = require 'json-rpc-peer' {format} = require 'json-rpc-peer'
{ ignoreErrors } = require('promise-toolbox')
{ {
forbiddenOperation, forbiddenOperation,
invalidParameters, invalidParameters,
unauthorized unauthorized
} = require('xo-common/api-errors') } = require('xo-common/api-errors')
{ {
forEach, forEach,
formatXml: $js2xml, formatXml: $js2xml,
@ -25,10 +26,8 @@ startsWith = require 'lodash/startsWith'
map, map,
mapFilter, mapFilter,
mapToArray, mapToArray,
noop,
parseSize, parseSize,
parseXml, parseXml,
pCatch,
pFinally pFinally
} = require '../utils' } = require '../utils'
{isVmRunning: $isVmRunning} = require('../xapi') {isVmRunning: $isVmRunning} = require('../xapi')
@ -174,7 +173,7 @@ create = $coroutine (params) ->
) )
if params.bootAfterCreate if params.bootAfterCreate
pCatch.call(xapi.startVm(vm._xapiId), noop) ignoreErrors.call(xapi.startVm(vm._xapiId))
return vm.id return vm.id
@ -324,9 +323,8 @@ delete_ = $coroutine ({vm, delete_disks: deleteDisks = false }) ->
@getAllAcls().then((acls) => @getAllAcls().then((acls) =>
Promise.all(mapFilter(acls, (acl) => Promise.all(mapFilter(acls, (acl) =>
if (acl.object == vm.id) if (acl.object == vm.id)
return pCatch.call( return ignoreErrors.call(
@removeAcl(acl.subject, acl.object, acl.action), @removeAcl(acl.subject, acl.object, acl.action)
noop
) )
)) ))
) )
@ -334,13 +332,12 @@ delete_ = $coroutine ({vm, delete_disks: deleteDisks = false }) ->
# Update IP pools # Update IP pools
yield Promise.all(map(vm.VIFs, (vifId) => yield Promise.all(map(vm.VIFs, (vifId) =>
vif = xapi.getObject(vifId) vif = xapi.getObject(vifId)
return pCatch.call( return ignoreErrors.call(
this.allocIpAddresses( this.allocIpAddresses(
vifId, vifId,
null, null,
concat(vif.ipv4_allowed, vif.ipv6_allowed) concat(vif.ipv4_allowed, vif.ipv6_allowed)
), )
noop
) )
)) ))
@ -364,12 +361,11 @@ delete_ = $coroutine ({vm, delete_disks: deleteDisks = false }) ->
resourceSetUsage = @computeVmResourcesUsage(vm) resourceSetUsage = @computeVmResourcesUsage(vm)
ipPoolsUsage = yield @computeVmIpPoolsUsage(vm) ipPoolsUsage = yield @computeVmIpPoolsUsage(vm)
pCatch.call( ignoreErrors.call(
@releaseLimitsInResourceSet( @releaseLimitsInResourceSet(
merge(resourceSetUsage, ipPoolsUsage), merge(resourceSetUsage, ipPoolsUsage),
resourceSet resourceSet
), )
noop
) )
return xapi.deleteVm(vm._xapiId, deleteDisks) return xapi.deleteVm(vm._xapiId, deleteDisks)
@ -1172,7 +1168,7 @@ createInterface = $coroutine ({
{ push } = ipAddresses = [] { push } = ipAddresses = []
push.apply(ipAddresses, allowedIpv4Addresses) if allowedIpv4Addresses push.apply(ipAddresses, allowedIpv4Addresses) if allowedIpv4Addresses
push.apply(ipAddresses, allowedIpv6Addresses) if allowedIpv6Addresses push.apply(ipAddresses, allowedIpv6Addresses) if allowedIpv6Addresses
pCatch.call(@allocIpAddresses(vif.$id, allo), noop) if ipAddresses.length ignoreErrors.call(@allocIpAddresses(vif.$id, allo)) if ipAddresses.length
return vif.$id return vif.$id

View File

@ -1,5 +1,6 @@
import eventToPromise from 'event-to-promise' import eventToPromise from 'event-to-promise'
import through2 from 'through2' import through2 from 'through2'
import { ignoreErrors } from 'promise-toolbox'
import { import {
parse parse
@ -8,8 +9,6 @@ import {
import { import {
addChecksumToReadStream, addChecksumToReadStream,
getPseudoRandomBytes, getPseudoRandomBytes,
noop,
pCatch,
streamToBuffer, streamToBuffer,
validChecksumOfReadStream validChecksumOfReadStream
} from '../utils' } from '../utils'
@ -70,7 +69,7 @@ export default class RemoteHandlerAbstract {
error: error.message || String(error) error: error.message || String(error)
} }
} finally { } finally {
this.unlink(testFileName).catch(noop) this.unlink(testFileName)::ignoreErrors()
} }
} }
@ -127,9 +126,12 @@ export default class RemoteHandlerAbstract {
options.end === undefined && options.end === undefined &&
options.start === undefined options.start === undefined
) { ) {
promise = Promise.all([ promise, this.getSize(file).then(size => { promise = Promise.all([
promise,
this.getSize(file).then(size => {
stream.length = size stream.length = size
}, noop) ]) })::ignoreErrors()
])
} }
return promise.then(() => stream) return promise.then(() => stream)
@ -140,7 +142,7 @@ export default class RemoteHandlerAbstract {
} }
// avoid a unhandled rejection warning // avoid a unhandled rejection warning
streamP.catch(noop) streamP::ignoreErrors()
return this.readFile(`${file}.checksum`).then( return this.readFile(`${file}.checksum`).then(
checksum => streamP.then(stream => { checksum => streamP.then(stream => {
@ -206,7 +208,7 @@ export default class RemoteHandlerAbstract {
checksum = true checksum = true
} = {}) { } = {}) {
if (checksum) { if (checksum) {
this._unlink(`${file}.checksum`)::pCatch(noop) this._unlink(`${file}.checksum`)::ignoreErrors()
} }
return this._unlink(file) return this._unlink(file)

View File

@ -367,7 +367,6 @@ export function pSettle (promises) {
export { export {
all as pAll, all as pAll,
catchPlus as pCatch,
delay as pDelay, delay as pDelay,
fromCallback as pFromCallback, fromCallback as pFromCallback,
isPromise, isPromise,

View File

@ -391,7 +391,7 @@ export default class XapiStats {
json: 'true', json: 'true',
start: timestamp start: timestamp
} }
}).readAll().then(JSON5.parse) }).then(response => response.readAll().then(JSON5.parse))
} }
async _getLastTimestamp (xapi, host, step) { async _getLastTimestamp (xapi, host, step) {

View File

@ -4,7 +4,7 @@ import fatfs from 'fatfs'
import synchronized from 'decorator-synchronized' import synchronized from 'decorator-synchronized'
import tarStream from 'tar-stream' import tarStream from 'tar-stream'
import vmdkToVhd from 'xo-vmdk-to-vhd' import vmdkToVhd from 'xo-vmdk-to-vhd'
import { cancellable, defer } from 'promise-toolbox' import { cancellable, catchPlus as pCatch, defer, ignoreErrors } from 'promise-toolbox'
import { PassThrough } from 'stream' import { PassThrough } from 'stream'
import { forbiddenOperation } from 'xo-common/api-errors' import { forbiddenOperation } from 'xo-common/api-errors'
import { import {
@ -20,7 +20,6 @@ import {
uniq uniq
} from 'lodash' } from 'lodash'
import { import {
wrapError as wrapXapiError,
Xapi as XapiBase Xapi as XapiBase
} from 'xen-api' } from 'xen-api'
import { import {
@ -31,6 +30,7 @@ import createSizeStream from '../size-stream'
import fatfsBuffer, { init as fatfsBufferInit } from '../fatfs-buffer' import fatfsBuffer, { init as fatfsBufferInit } from '../fatfs-buffer'
import { mixin } from '../decorators' import { mixin } from '../decorators'
import { import {
asyncMap,
camelToSnakeCase, camelToSnakeCase,
createRawObject, createRawObject,
ensureArray, ensureArray,
@ -38,9 +38,7 @@ import {
isFunction, isFunction,
map, map,
mapToArray, mapToArray,
noop,
pAll, pAll,
pCatch,
pDelay, pDelay,
pFinally, pFinally,
promisifyAll, promisifyAll,
@ -100,7 +98,6 @@ export default class Xapi extends XapiBase {
const genericWatchers = this._genericWatchers = createRawObject() const genericWatchers = this._genericWatchers = createRawObject()
const objectsWatchers = this._objectWatchers = createRawObject() const objectsWatchers = this._objectWatchers = createRawObject()
const taskWatchers = this._taskWatchers = createRawObject()
const onAddOrUpdate = objects => { const onAddOrUpdate = objects => {
forEach(objects, object => { forEach(objects, object => {
@ -123,21 +120,6 @@ export default class Xapi extends XapiBase {
objectsWatchers[ref].resolve(object) objectsWatchers[ref].resolve(object)
delete objectsWatchers[ref] delete objectsWatchers[ref]
} }
// Watched task.
if (ref in taskWatchers) {
const {status} = object
if (status === 'success') {
taskWatchers[ref].resolve(object.result)
} else if (status === 'failure') {
taskWatchers[ref].reject(wrapXapiError(object.error_info))
} else {
return
}
delete taskWatchers[ref]
}
}) })
} }
this.objects.on('add', onAddOrUpdate) this.objects.on('add', onAddOrUpdate)
@ -154,6 +136,10 @@ export default class Xapi extends XapiBase {
return loop() return loop()
} }
createTask (name = 'untitled task', description) {
return super.createTask(`[XO] ${name}`, description)
}
// ================================================================= // =================================================================
_registerGenericWatcher (fn) { _registerGenericWatcher (fn) {
@ -228,36 +214,6 @@ export default class Xapi extends XapiBase {
// ================================================================= // =================================================================
// Create a task.
async _createTask (name = 'untitled task', description = '') {
const ref = await this.call('task.create', `[XO] ${name}`, description)
debug('task created: %s (%s)', name, description)
this._watchTask(ref)::pFinally(() => {
this.call('task.destroy', ref).then(() => {
debug('task destroyed: %s (%s)', name, description)
})
})
return ref
}
// Waits for a task to be resolved.
_watchTask (ref) {
// If a task object is passed, unpacked the ref.
if (typeof ref === 'object' && ref.$ref) ref = ref.$ref
let watcher = this._taskWatchers[ref]
if (!watcher) {
// Register the watcher.
watcher = this._taskWatchers[ref] = defer()
}
return watcher.promise
}
// =================================================================
_setObjectProperty (object, name, value) { _setObjectProperty (object, name, value) {
return this.call( return this.call(
`${getNamespaceForType(object.$type)}.set_${camelToSnakeCase(name)}`, `${getNamespaceForType(object.$type)}.set_${camelToSnakeCase(name)}`,
@ -280,7 +236,7 @@ export default class Xapi extends XapiBase {
if (value != null) { if (value != null) {
return this.call(`${namespace}.set_${camelToSnakeCase(name)}`, ref, prepareXapiParam(value)) return this.call(`${namespace}.set_${camelToSnakeCase(name)}`, ref, prepareXapiParam(value))
} }
}))::pCatch(noop) }))::ignoreErrors()
} }
async _updateObjectMapProperty (object, prop, values) { async _updateObjectMapProperty (object, prop, values) {
@ -302,7 +258,7 @@ export default class Xapi extends XapiBase {
return value === null return value === null
? removal ? removal
: removal::pCatch(noop).then(() => this.call(add, ref, name, prepareXapiParam(value))) : removal::ignoreErrors().then(() => this.call(add, ref, name, prepareXapiParam(value)))
} }
})) }))
} }
@ -719,14 +675,14 @@ export default class Xapi extends XapiBase {
vdi.VBDs.length < 2 || vdi.VBDs.length < 2 ||
every(vdi.$VBDs, vbd => vbd.VM === vm.$ref) every(vdi.$VBDs, vbd => vbd.VM === vm.$ref)
) { ) {
return this._deleteVdi(vdi)::pCatch(noop) return this._deleteVdi(vdi)::ignoreErrors()
} }
console.error(`cannot delete VDI ${vdi.name_label} (from VM ${vm.name_label})`) console.error(`cannot delete VDI ${vdi.name_label} (from VM ${vm.name_label})`)
})) }))
} }
await Promise.all(mapToArray(vm.$snapshots, snapshot => await Promise.all(mapToArray(vm.$snapshots, snapshot =>
this.deleteVm(snapshot.$id)::pCatch(noop) this.deleteVm(snapshot.$id)::ignoreErrors()
)) ))
await this.call('VM.destroy', vm.$ref) await this.call('VM.destroy', vm.$ref)
@ -765,21 +721,22 @@ export default class Xapi extends XapiBase {
snapshotRef = (await this._snapshotVm(vm)).$ref snapshotRef = (await this._snapshotVm(vm)).$ref
} }
const taskRef = await this._createTask('VM Export', vm.name_label) const promise = this.getResource(onlyMetadata ? '/export_metadata/' : '/export/', {
if (snapshotRef) {
this._watchTask(taskRef)::pFinally(() => {
this.deleteVm(snapshotRef)::pCatch(noop)
})
}
return this.getResource(onlyMetadata ? '/export_metadata/' : '/export/', {
host, host,
query: { query: {
ref: snapshotRef || vm.$ref, ref: snapshotRef || vm.$ref,
task_id: taskRef,
use_compression: compress ? 'true' : 'false' use_compression: compress ? 'true' : 'false'
} },
task: this.createTask('VM export', vm.name_label)
}) })
if (snapshotRef !== undefined) {
promise.then(_ => _.task::pFinally(() =>
this.deleteVm(snapshotRef)::ignoreErrors()
))
}
return promise
} }
_assertHealthyVdiChain (vdi, childrenMap) { _assertHealthyVdiChain (vdi, childrenMap) {
@ -816,8 +773,9 @@ export default class Xapi extends XapiBase {
} }
// Create a snapshot of the VM and returns a delta export object. // Create a snapshot of the VM and returns a delta export object.
@cancellable
@deferrable.onFailure @deferrable.onFailure
async exportDeltaVm ($onFailure, vmId, baseVmId = undefined, { async exportDeltaVm ($onFailure, $cancelToken, vmId, baseVmId = undefined, {
snapshotNameLabel = undefined, snapshotNameLabel = undefined,
// Contains a vdi.$id set of vmId. // Contains a vdi.$id set of vmId.
fullVdisRequired = [], fullVdisRequired = [],
@ -830,7 +788,7 @@ export default class Xapi extends XapiBase {
if (snapshotNameLabel) { if (snapshotNameLabel) {
this._setObjectProperties(vm, { this._setObjectProperties(vm, {
nameLabel: snapshotNameLabel nameLabel: snapshotNameLabel
})::pCatch(noop) })::ignoreErrors()
} }
const baseVm = baseVmId && this.getObject(baseVmId) const baseVm = baseVmId && this.getObject(baseVmId)
@ -868,7 +826,7 @@ export default class Xapi extends XapiBase {
// //
// The snapshot must not exist otherwise it could break the // The snapshot must not exist otherwise it could break the
// next export. // next export.
this._deleteVdi(vdi)::pCatch(noop) this._deleteVdi(vdi)::ignoreErrors()
return return
} }
@ -896,8 +854,8 @@ export default class Xapi extends XapiBase {
...vdi, ...vdi,
$SR$uuid: vdi.$SR.uuid $SR$uuid: vdi.$SR.uuid
} }
const stream = streams[`${vdiRef}.vhd`] = this._exportVdi(vdi, baseVdi, VDI_FORMAT_VHD) const stream = streams[`${vdiRef}.vhd`] = this._exportVdi($cancelToken, vdi, baseVdi, VDI_FORMAT_VHD)
$onFailure(() => stream.cancel()) $onFailure(stream.cancel)
}) })
const vifs = {} const vifs = {}
@ -988,10 +946,10 @@ export default class Xapi extends XapiBase {
]) ])
// 2. Delete all VBDs which may have been created by the import. // 2. Delete all VBDs which may have been created by the import.
await Promise.all(mapToArray( await asyncMap(
vm.$VBDs, vm.$VBDs,
vbd => this._deleteVbd(vbd)::pCatch(noop) vbd => this._deleteVbd(vbd)
)) )::ignoreErrors()
// 3. Create VDIs. // 3. Create VDIs.
const newVdis = await map(delta.vdis, async vdi => { const newVdis = await map(delta.vdis, async vdi => {
@ -1078,7 +1036,7 @@ export default class Xapi extends XapiBase {
]) ])
if (deleteBase && baseVm) { if (deleteBase && baseVm) {
this._deleteVm(baseVm)::pCatch(noop) this._deleteVm(baseVm)::ignoreErrors()
} }
await Promise.all([ await Promise.all([
@ -1220,16 +1178,13 @@ export default class Xapi extends XapiBase {
} }
async _importVm (stream, sr, onlyMetadata = false, onVmCreation = undefined) { async _importVm (stream, sr, onlyMetadata = false, onVmCreation = undefined) {
const taskRef = await this._createTask('VM import') const taskRef = await this.createTask('VM import')
const query = { const query = {
force: onlyMetadata force: onlyMetadata
? 'true'
: undefined,
task_id: taskRef
} }
let host let host
if (sr) { if (sr != null) {
host = sr.$PBDs[0].$host host = sr.$PBDs[0].$host
query.sr_id = sr.$ref query.sr_id = sr.$ref
} }
@ -1237,17 +1192,18 @@ export default class Xapi extends XapiBase {
if (onVmCreation) { if (onVmCreation) {
this._waitObject( this._waitObject(
obj => obj && obj.current_operations && taskRef in obj.current_operations obj => obj && obj.current_operations && taskRef in obj.current_operations
).then(onVmCreation)::pCatch(noop) ).then(onVmCreation)::ignoreErrors()
} }
const [ vmRef ] = await Promise.all([ const vmRef = await this.putResource(
this._watchTask(taskRef).then(extractOpaqueRef),
this.putResource(
stream, stream,
onlyMetadata ? '/import_metadata/' : '/import/', onlyMetadata ? '/import_metadata/' : '/import/',
{ host, query } {
) host,
]) query,
task: taskRef
}
).then(extractOpaqueRef)
// Importing a metadata archive of running VMs is currently // Importing a metadata archive of running VMs is currently
// broken: its VBDs are incorrectly seen as attached. // broken: its VBDs are incorrectly seen as attached.
@ -1414,7 +1370,7 @@ export default class Xapi extends XapiBase {
let ref let ref
try { try {
ref = await this.call('VM.snapshot_with_quiesce', vm.$ref, nameLabel) ref = await this.call('VM.snapshot_with_quiesce', vm.$ref, nameLabel)
this.addTag(ref, 'quiesce')::pCatch(noop) // ignore any failures this.addTag(ref, 'quiesce')::ignoreErrors()
await this._waitObjectState(ref, vm => includes(vm.tags, 'quiesce')) await this._waitObjectState(ref, vm => includes(vm.tags, 'quiesce'))
} catch (error) { } catch (error) {
@ -1544,10 +1500,10 @@ export default class Xapi extends XapiBase {
} finally { } finally {
this._setObjectProperties(vm, { this._setObjectProperties(vm, {
PV_bootloader: bootloader PV_bootloader: bootloader
})::pCatch(noop) })::ignoreErrors()
forEach(bootables, ([ vbd, bootable ]) => { forEach(bootables, ([ vbd, bootable ]) => {
this._setObjectProperties(vbd, { bootable })::pCatch(noop) this._setObjectProperties(vbd, { bootable })::ignoreErrors()
}) })
} }
} }
@ -1748,7 +1704,7 @@ export default class Xapi extends XapiBase {
throw error throw error
} }
await this.call('VBD.eject', cdDrive.$ref)::pCatch(noop) await this.call('VBD.eject', cdDrive.$ref)::ignoreErrors()
// Retry. // Retry.
await this.call('VBD.insert', cdDrive.$ref, cd.$ref) await this.call('VBD.insert', cdDrive.$ref, cd.$ref)
@ -1794,7 +1750,7 @@ export default class Xapi extends XapiBase {
} }
async _deleteVbd (vbd) { async _deleteVbd (vbd) {
await this._disconnectVbd(vbd)::pCatch(noop) await this._disconnectVbd(vbd)::ignoreErrors()
await this.call('VBD.destroy', vbd.$ref) await this.call('VBD.destroy', vbd.$ref)
} }
@ -1806,7 +1762,7 @@ export default class Xapi extends XapiBase {
async destroyVbdsFromVm (vmId) { async destroyVbdsFromVm (vmId) {
await Promise.all( await Promise.all(
mapToArray(this.getObject(vmId).$VBDs, async vbd => { mapToArray(this.getObject(vmId).$VBDs, async vbd => {
await this.disconnectVbd(vbd.$ref)::pCatch(noop) await this.disconnectVbd(vbd.$ref)::ignoreErrors()
return this.call('VBD.destroy', vbd.$ref) return this.call('VBD.destroy', vbd.$ref)
}) })
) )
@ -1855,13 +1811,11 @@ export default class Xapi extends XapiBase {
} }
@cancellable @cancellable
async _exportVdi ($cancelToken, vdi, base, format = VDI_FORMAT_VHD) { _exportVdi ($cancelToken, vdi, base, format = VDI_FORMAT_VHD) {
const host = vdi.$SR.$PBDs[0].$host const host = vdi.$SR.$PBDs[0].$host
const taskRef = await this._createTask('VDI Export', vdi.name_label)
const query = { const query = {
format, format,
task_id: taskRef,
vdi: vdi.$ref vdi: vdi.$ref
} }
if (base) { if (base) {
@ -1873,14 +1827,10 @@ export default class Xapi extends XapiBase {
: '' : ''
}`) }`)
const task = this._watchTask(taskRef)
return this.getResource($cancelToken, '/export_raw_vdi/', { return this.getResource($cancelToken, '/export_raw_vdi/', {
host, host,
query query,
}).then(response => { task: this.createTask('VDI Export', vdi.name_label)
response.task = task
return response
}) })
} }
@ -1898,35 +1848,31 @@ export default class Xapi extends XapiBase {
// ----------------------------------------------------------------- // -----------------------------------------------------------------
async _importVdiContent (vdi, stream, format = VDI_FORMAT_VHD) { async _importVdiContent (vdi, body, format = VDI_FORMAT_VHD) {
const taskRef = await this._createTask('VDI Content Import', vdi.name_label)
const pbd = find(vdi.$SR.$PBDs, 'currently_attached') const pbd = find(vdi.$SR.$PBDs, 'currently_attached')
if (!pbd) { if (pbd === undefined) {
throw new Error('no valid PBDs found') throw new Error('no valid PBDs found')
} }
const task = this._watchTask(taskRef)
await Promise.all([ await Promise.all([
stream.checksumVerified, body.checksumVerified,
task, this.putResource(body, '/import_raw_vdi/', {
this.putResource(stream, '/import_raw_vdi/', {
host: pbd.host, host: pbd.host,
query: { query: {
format, format,
task_id: taskRef,
vdi: vdi.$ref vdi: vdi.$ref
} },
task: this.createTask('VDI Content Import', vdi.name_label)
}) })
]) ])
} }
importVdiContent (vdiId, stream, { importVdiContent (vdiId, body, {
format format
} = {}) { } = {}) {
return this._importVdiContent( return this._importVdiContent(
this.getObject(vdiId), this.getObject(vdiId),
stream, body,
format format
) )
} }
@ -2036,7 +1982,7 @@ export default class Xapi extends XapiBase {
const newPifs = await this.call('pool.create_VLAN_from_PIF', physPif.$ref, pif.network, asInteger(vlan)) const newPifs = await this.call('pool.create_VLAN_from_PIF', physPif.$ref, pif.network, asInteger(vlan))
await Promise.all( await Promise.all(
mapToArray(newPifs, pifRef => mapToArray(newPifs, pifRef =>
!wasAttached[this.getObject(pifRef).host] && this.call('PIF.unplug', pifRef)::pCatch(noop) !wasAttached[this.getObject(pifRef).host] && this.call('PIF.unplug', pifRef)::ignoreErrors()
) )
) )
} }
@ -2140,25 +2086,36 @@ export default class Xapi extends XapiBase {
} }
// Generic Config Drive // Generic Config Drive
async createCloudInitConfigDrive (vmId, srId, config) { @deferrable.onFailure
async createCloudInitConfigDrive ($onFailure, vmId, srId, config) {
const vm = this.getObject(vmId) const vm = this.getObject(vmId)
const sr = this.getObject(srId) const sr = this.getObject(srId)
// First, create a small VDI (10MB) which will become the ConfigDrive // First, create a small VDI (10MB) which will become the ConfigDrive
const buffer = fatfsBufferInit() const buffer = fatfsBufferInit()
const vdi = await this.createVdi(buffer.length, { name_label: 'XO CloudConfigDrive', name_description: undefined, sr: sr.$ref }) const vdi = await this.createVdi(buffer.length, { name_label: 'XO CloudConfigDrive', name_description: undefined, sr: sr.$ref })
$onFailure(() => this._deleteVdi(vdi))
// Then, generate a FAT fs // Then, generate a FAT fs
const fs = promisifyAll(fatfs.createFileSystem(fatfsBuffer(buffer))) const fs = promisifyAll(fatfs.createFileSystem(fatfsBuffer(buffer)))
// Create Cloud config folders
await fs.mkdir('openstack') await fs.mkdir('openstack')
await fs.mkdir('openstack/latest') await fs.mkdir('openstack/latest')
// Create the meta_data file await Promise.all([
await fs.writeFile('openstack/latest/meta_data.json', '{\n "uuid": "' + vm.uuid + '"\n}\n') fs.writeFile(
// Create the user_data file 'openstack/latest/meta_data.json',
await fs.writeFile('openstack/latest/user_data', config) '{\n "uuid": "' + vm.uuid + '"\n}\n'
),
fs.writeFile('openstack/latest/user_data', config)
])
// ignore VDI_IO_ERROR errors, I (JFT) don't understand why they
// are emitted because it works
await this._importVdiContent(vdi, buffer, VDI_FORMAT_RAW)::pCatch(
{ code: 'VDI_IO_ERROR' },
console.warn
)
// Transform the buffer into a stream
await this._importVdiContent(vdi, buffer, VDI_FORMAT_RAW)
await this._createVbd(vm, vdi) await this._createVbd(vm, vdi)
} }

View File

@ -212,9 +212,9 @@ export default {
// platform_version < 2.1.1 ---------------------------------------- // platform_version < 2.1.1 ----------------------------------------
async uploadPoolPatch (stream, patchName = 'unknown') { async uploadPoolPatch (stream, patchName = 'unknown') {
const taskRef = await this._createTask('Patch upload', patchName) const taskRef = await this.createTask('Patch upload', patchName)
const task = this._watchTask(taskRef) const task = this.watchTask(taskRef)
const [ patchRef ] = await Promise.all([ const [ patchRef ] = await Promise.all([
task, task,
this.putResource(stream, '/pool_patch_upload', { this.putResource(stream, '/pool_patch_upload', {

View File

@ -1,16 +1,17 @@
import deferrable from 'golike-defer' import deferrable from 'golike-defer'
import find from 'lodash/find' import { ignoreErrors } from 'promise-toolbox'
import gte from 'lodash/gte' import {
import includes from 'lodash/includes' find,
import isEmpty from 'lodash/isEmpty' gte,
import lte from 'lodash/lte' includes,
isEmpty,
lte
} from 'lodash'
import { import {
forEach, forEach,
mapToArray, mapToArray,
noop, parseSize
parseSize,
pCatch
} from '../../utils' } from '../../utils'
import { import {
@ -64,7 +65,7 @@ export default {
// Removes disks from the provision XML, we will create them by // Removes disks from the provision XML, we will create them by
// ourselves. // ourselves.
await this.call('VM.remove_from_other_config', vmRef, 'disks')::pCatch(noop) await this.call('VM.remove_from_other_config', vmRef, 'disks')::ignoreErrors()
// Creates the VDIs and executes the initial steps of the // Creates the VDIs and executes the initial steps of the
// installation. // installation.
@ -386,9 +387,9 @@ export default {
if (snapshot.snapshot_info['power-state-at-snapshot'] === 'Running') { if (snapshot.snapshot_info['power-state-at-snapshot'] === 'Running') {
const vm = snapshot.$snapshot_of const vm = snapshot.$snapshot_of
if (vm.power_state === 'Halted') { if (vm.power_state === 'Halted') {
this.startVm(vm.$id)::pCatch(noop) this.startVm(vm.$id)::ignoreErrors()
} else if (vm.power_state === 'Suspended') { } else if (vm.power_state === 'Suspended') {
this.resumeVm(vm.$id)::pCatch(noop) this.resumeVm(vm.$id)::ignoreErrors()
} }
} }
}, },

View File

@ -1,11 +1,11 @@
import Token, { Tokens } from '../models/token'
import { noSuchObject } from 'xo-common/api-errors' import { noSuchObject } from 'xo-common/api-errors'
import { ignoreErrors } from 'promise-toolbox'
import Token, { Tokens } from '../models/token'
import { import {
createRawObject, createRawObject,
forEach, forEach,
generateToken, generateToken
pCatch,
noop
} from '../utils' } from '../utils'
// =================================================================== // ===================================================================
@ -180,7 +180,7 @@ export default class {
if (!( if (!(
token.expiration > Date.now() token.expiration > Date.now()
)) { )) {
this._tokens.remove(id)::pCatch(noop) this._tokens.remove(id)::ignoreErrors()
throw noSuchAuthenticationToken(id) throw noSuchAuthenticationToken(id)
} }

View File

@ -3,6 +3,7 @@ import escapeStringRegexp from 'escape-string-regexp'
import eventToPromise from 'event-to-promise' import eventToPromise from 'event-to-promise'
import execa from 'execa' import execa from 'execa'
import splitLines from 'split-lines' import splitLines from 'split-lines'
import { CancelToken, ignoreErrors } from 'promise-toolbox'
import { createParser as createPairsParser } from 'parse-pairs' import { createParser as createPairsParser } from 'parse-pairs'
import { createReadStream, readdir, stat } from 'fs' import { createReadStream, readdir, stat } from 'fs'
import { satisfies as versionSatisfies } from 'semver' import { satisfies as versionSatisfies } from 'semver'
@ -34,7 +35,6 @@ import {
mapFilter, mapFilter,
mapToArray, mapToArray,
noop, noop,
pCatch,
pFinally, pFinally,
pFromCallback, pFromCallback,
pSettle, pSettle,
@ -416,17 +416,12 @@ export default class {
// 2. Copy. // 2. Copy.
let size = 0 let size = 0
const dstVm = await (async () => { const dstVm = await (async () => {
const delta = await srcXapi.exportDeltaVm(srcVm.$id, localBaseUuid, { const { cancel, token } = CancelToken.source()
const delta = await srcXapi.exportDeltaVm(token, srcVm.$id, localBaseUuid, {
snapshotNameLabel: `XO_DELTA_EXPORT: ${targetSr.name_label} (${targetSr.uuid})` snapshotNameLabel: `XO_DELTA_EXPORT: ${targetSr.name_label} (${targetSr.uuid})`
}) })
$onFailure(async () => { $onFailure(() => srcXapi.deleteVm(delta.vm.uuid))
await Promise.all(mapToArray( $onFailure(cancel)
delta.streams,
stream => stream.cancel()::pCatch(noop)
))
return srcXapi.deleteVm(delta.vm.uuid)::pCatch(noop)
})
delta.vm.name_label += ` (${shortDate(Date.now())})` delta.vm.name_label += ` (${shortDate(Date.now())})`
@ -435,7 +430,9 @@ export default class {
const sizeStream = createSizeStream().once('finish', () => { const sizeStream = createSizeStream().once('finish', () => {
size += sizeStream.size size += sizeStream.size
}) })
delta.streams[id] = delta.streams[id].pipe(sizeStream) delta.streams[id] = delta.streams[id].on('end', () => {
console.log(`${id} end`)
}).pipe(sizeStream)
}) })
const promise = targetXapi.importDeltaVm( const promise = targetXapi.importDeltaVm(
@ -449,7 +446,7 @@ export default class {
// Once done, (asynchronously) remove the (now obsolete) local // Once done, (asynchronously) remove the (now obsolete) local
// base. // base.
if (localBaseUuid) { if (localBaseUuid) {
promise.then(() => srcXapi.deleteVm(localBaseUuid))::pCatch(noop) promise.then(() => srcXapi.deleteVm(localBaseUuid))::ignoreErrors()
} }
// (Asynchronously) Identify snapshot as future base. // (Asynchronously) Identify snapshot as future base.
@ -457,7 +454,7 @@ export default class {
return srcXapi._updateObjectMapProperty(srcVm, 'other_config', { return srcXapi._updateObjectMapProperty(srcVm, 'other_config', {
[TAG_LAST_BASE_DELTA]: delta.vm.uuid [TAG_LAST_BASE_DELTA]: delta.vm.uuid
}) })
})::pCatch(noop) })::ignoreErrors()
return promise return promise
})() })()
@ -687,7 +684,7 @@ export default class {
filter(vdiParent.$snapshots, { name_label: 'XO_DELTA_BASE_VDI_SNAPSHOT' }), filter(vdiParent.$snapshots, { name_label: 'XO_DELTA_BASE_VDI_SNAPSHOT' }),
base => base.snapshot_time base => base.snapshot_time
) )
forEach(bases, base => { xapi.deleteVdi(base.$id)::pCatch(noop) }) forEach(bases, base => { xapi.deleteVdi(base.$id)::ignoreErrors() })
// Export full or delta backup. // Export full or delta backup.
const vdiFilename = `${date}_${isFull ? 'full' : 'delta'}.vhd` const vdiFilename = `${date}_${isFull ? 'full' : 'delta'}.vhd`
@ -717,7 +714,7 @@ export default class {
]) ])
} catch (error) { } catch (error) {
// Remove new backup. (corrupt). // Remove new backup. (corrupt).
await handler.unlink(backupFullPath)::pCatch(noop) await handler.unlink(backupFullPath)::ignoreErrors()
throw error throw error
} }
@ -740,7 +737,7 @@ export default class {
// Remove xva file. // Remove xva file.
// Version 0.0.0 (Legacy) Delta Backup. // Version 0.0.0 (Legacy) Delta Backup.
handler.unlink(`${dir}/${getDeltaBackupNameWithoutExt(backup)}.xva`)::pCatch(noop) handler.unlink(`${dir}/${getDeltaBackupNameWithoutExt(backup)}.xva`)::ignoreErrors()
])) ]))
} }
} }
@ -758,7 +755,7 @@ export default class {
base => base.snapshot_time base => base.snapshot_time
) )
const baseVm = bases.pop() const baseVm = bases.pop()
forEach(bases, base => { xapi.deleteVm(base.$id)::pCatch(noop) }) forEach(bases, base => { xapi.deleteVm(base.$id)::ignoreErrors() })
// Check backup dirs. // Check backup dirs.
const dir = `vm_delta_${tag}_${vm.uuid}` const dir = `vm_delta_${tag}_${vm.uuid}`
@ -781,20 +778,14 @@ export default class {
) )
// Export... // Export...
const delta = await xapi.exportDeltaVm(vm.$id, baseVm && baseVm.$id, { const { cancel, token } = CancelToken.source()
const delta = await xapi.exportDeltaVm(token, vm.$id, baseVm && baseVm.$id, {
snapshotNameLabel: `XO_DELTA_BASE_VM_SNAPSHOT_${tag}`, snapshotNameLabel: `XO_DELTA_BASE_VM_SNAPSHOT_${tag}`,
fullVdisRequired, fullVdisRequired,
disableBaseTags: true disableBaseTags: true
}) })
$onFailure(() => xapi.deleteVm(delta.vm.uuid))
$onFailure(async () => { $onFailure(cancel)
await Promise.all(mapToArray(
delta.streams,
stream => stream.cancel()::pCatch(noop)
))
return xapi.deleteVm(delta.vm.uuid)::pCatch(noop)
})
// Save vdis. // Save vdis.
const vdiBackups = await pSettle( const vdiBackups = await pSettle(
@ -834,7 +825,7 @@ export default class {
} }
$onFailure(() => asyncMap(fulFilledVdiBackups, vdiBackup => $onFailure(() => asyncMap(fulFilledVdiBackups, vdiBackup =>
handler.unlink(`${dir}/${vdiBackup.value()}`)::pCatch(noop) handler.unlink(`${dir}/${vdiBackup.value()}`)::ignoreErrors()
)) ))
if (error) { if (error) {
@ -869,7 +860,7 @@ export default class {
await this._removeOldDeltaVmBackups(xapi, { vm, handler, dir, retention }) await this._removeOldDeltaVmBackups(xapi, { vm, handler, dir, retention })
if (baseVm) { if (baseVm) {
xapi.deleteVm(baseVm.$id)::pCatch(noop) xapi.deleteVm(baseVm.$id)::ignoreErrors()
} }
return { return {
@ -994,7 +985,7 @@ export default class {
_removeVms (xapi, vms) { _removeVms (xapi, vms) {
return Promise.all(mapToArray(vms, vm => return Promise.all(mapToArray(vms, vm =>
// Do not consider a failure to delete an old copy as a fatal error. // Do not consider a failure to delete an old copy as a fatal error.
xapi.deleteVm(vm.$id)::pCatch(noop) xapi.deleteVm(vm.$id)::ignoreErrors()
)) ))
} }
@ -1134,7 +1125,7 @@ export default class {
const entriesMap = {} const entriesMap = {}
await Promise.all(mapToArray(entries, async name => { await Promise.all(mapToArray(entries, async name => {
const stats = await pFromCallback(cb => stat(`${path}/${name}`, cb))::pCatch(noop) const stats = await pFromCallback(cb => stat(`${path}/${name}`, cb))::ignoreErrors()
if (stats) { if (stats) {
entriesMap[stats.isDirectory() ? `${name}/` : name] = {} entriesMap[stats.isDirectory() ? `${name}/` : name] = {}
} }

View File

@ -1,5 +1,5 @@
import filter from 'lodash/filter' import { filter, includes } from 'lodash'
import includes from 'lodash/includes' import { ignoreErrors } from 'promise-toolbox'
import { import {
hash, hash,
needsRehash, needsRehash,
@ -20,9 +20,7 @@ import {
forEach, forEach,
isEmpty, isEmpty,
lightSet, lightSet,
mapToArray, mapToArray
noop,
pCatch
} from '../utils' } from '../utils'
// =================================================================== // ===================================================================
@ -106,15 +104,15 @@ export default class {
this._xo.getAuthenticationTokensForUser(id) this._xo.getAuthenticationTokensForUser(id)
.then(tokens => { .then(tokens => {
forEach(tokens, token => { forEach(tokens, token => {
this._xo.deleteAuthenticationToken(id)::pCatch(noop) this._xo.deleteAuthenticationToken(id)::ignoreErrors()
}) })
}) })
::pCatch(noop) // Ignore any failures. ::ignoreErrors()
// Remove ACLs for this user. // Remove ACLs for this user.
this._xo.getAclsForSubject(id).then(acls => { this._xo.getAclsForSubject(id).then(acls => {
forEach(acls, acl => { forEach(acls, acl => {
this._xo.removeAcl(id, acl.object, acl.action)::pCatch(noop) this._xo.removeAcl(id, acl.object, acl.action)::ignoreErrors()
}) })
}) })
@ -122,7 +120,7 @@ export default class {
forEach(user.groups, groupId => { forEach(user.groups, groupId => {
this.getGroup(groupId) this.getGroup(groupId)
.then(group => this._removeUserFromGroup(id, group)) .then(group => this._removeUserFromGroup(id, group))
::pCatch(noop) // Ignore any failures. ::ignoreErrors()
}) })
} }
@ -268,7 +266,7 @@ export default class {
// Remove ACLs for this group. // Remove ACLs for this group.
this._xo.getAclsForSubject(id).then(acls => { this._xo.getAclsForSubject(id).then(acls => {
forEach(acls, acl => { forEach(acls, acl => {
this._xo.removeAcl(id, acl.object, acl.action)::pCatch(noop) this._xo.removeAcl(id, acl.object, acl.action)::ignoreErrors()
}) })
}) })
@ -276,7 +274,7 @@ export default class {
forEach(group.users, userId => { forEach(group.users, userId => {
this.getUser(userId) this.getUser(userId)
.then(user => this._removeGroupFromUser(id, user)) .then(user => this._removeGroupFromUser(id, user))
::pCatch(noop) // Ignore any failures. ::ignoreErrors()
}) })
} }

View File

@ -1,3 +1,4 @@
import { ignoreErrors } from 'promise-toolbox'
import { noSuchObject } from 'xo-common/api-errors' import { noSuchObject } from 'xo-common/api-errors'
import Xapi from '../xapi' import Xapi from '../xapi'
@ -9,8 +10,6 @@ import {
forEach, forEach,
isEmpty, isEmpty,
isString, isString,
noop,
pCatch,
popProperty, popProperty,
serializeError serializeError
} from '../utils' } from '../utils'
@ -81,7 +80,7 @@ export default class {
} }
async unregisterXenServer (id) { async unregisterXenServer (id) {
this.disconnectXenServer(id)::pCatch(noop) this.disconnectXenServer(id)::ignoreErrors()
if (!await this._servers.remove(id)) { if (!await this._servers.remove(id)) {
throw noSuchObject(id, 'xenServer') throw noSuchObject(id, 'xenServer')

View File

@ -3323,9 +3323,9 @@ http-proxy@^1.16.2:
eventemitter3 "1.x.x" eventemitter3 "1.x.x"
requires-port "1.x.x" requires-port "1.x.x"
http-request-plus@^0.3.0: http-request-plus@^0.4.0:
version "0.3.0" version "0.4.0"
resolved "https://registry.yarnpkg.com/http-request-plus/-/http-request-plus-0.3.0.tgz#4ee822f65006485af0362dd70c5fd145f885dd0c" resolved "https://registry.yarnpkg.com/http-request-plus/-/http-request-plus-0.4.0.tgz#f9cfe550094c82d4a8d93b6252ff8f37349a8123"
dependencies: dependencies:
is-redirect "^1.0.0" is-redirect "^1.0.0"
lodash "^4.17.4" lodash "^4.17.4"
@ -5399,9 +5399,9 @@ promise-toolbox@^0.8.0:
dependencies: dependencies:
make-error "^1.2.3" make-error "^1.2.3"
promise-toolbox@^0.9.1, promise-toolbox@^0.9.2, promise-toolbox@^0.9.4: promise-toolbox@^0.9.1, promise-toolbox@^0.9.5:
version "0.9.4" version "0.9.5"
resolved "https://registry.yarnpkg.com/promise-toolbox/-/promise-toolbox-0.9.4.tgz#672fe37557eea05093f770e7e1fca64e39ae9238" resolved "https://registry.yarnpkg.com/promise-toolbox/-/promise-toolbox-0.9.5.tgz#ca33e53714cfde924a9bd3d2d23c53b21cb75acc"
dependencies: dependencies:
make-error "^1.2.3" make-error "^1.2.3"
@ -7071,23 +7071,23 @@ xdg-basedir@^2.0.0:
dependencies: dependencies:
os-homedir "^1.0.0" os-homedir "^1.0.0"
xen-api@^0.13.1: xen-api@^0.13.5:
version "0.13.1" version "0.13.5"
resolved "https://registry.yarnpkg.com/xen-api/-/xen-api-0.13.1.tgz#471aedff714460e888189b07d04610bf762971df" resolved "https://registry.yarnpkg.com/xen-api/-/xen-api-0.13.5.tgz#4edcf1b9a83c87df2bd94e02e860aaef85796157"
dependencies: dependencies:
babel-polyfill "^6.23.0" babel-polyfill "^6.23.0"
blocked "^1.2.1" blocked "^1.2.1"
debug "^2.6.8" debug "^2.6.8"
event-to-promise "^0.8.0" event-to-promise "^0.8.0"
exec-promise "^0.7.0" exec-promise "^0.7.0"
http-request-plus "^0.3.0" http-request-plus "^0.4.0"
json-rpc-protocol "^0.11.2" json-rpc-protocol "^0.11.2"
kindof "^2.0.0" kindof "^2.0.0"
lodash "^4.17.4" lodash "^4.17.4"
make-error "^1.3.0" make-error "^1.3.0"
minimist "^1.2.0" minimist "^1.2.0"
ms "^2.0.0" ms "^2.0.0"
promise-toolbox "^0.9.4" promise-toolbox "^0.9.5"
pw "0.0.4" pw "0.0.4"
xmlrpc "^1.3.2" xmlrpc "^1.3.2"
xo-collection "^0.4.1" xo-collection "^0.4.1"