diff --git a/package.json b/package.json index 098fd6b64..f84a04cc7 100644 --- a/package.json +++ b/package.json @@ -68,7 +68,7 @@ "helmet": "^3.6.0", "highland": "^2.10.5", "http-proxy": "^1.16.2", - "http-request-plus": "^0.3.0", + "http-request-plus": "^0.4.0", "http-server-plus": "^0.8.0", "human-format": "^0.8.0", "is-redirect": "^1.0.0", @@ -95,7 +95,7 @@ "passport": "^0.3.2", "passport-local": "^1.0.0", "pretty-format": "^20.0.1", - "promise-toolbox": "^0.9.2", + "promise-toolbox": "^0.9.5", "proxy-agent": "^2.0.0", "pug": "^2.0.0-rc.2", "pw": "^0.0.4", @@ -110,7 +110,7 @@ "tmp": "^0.0.31", "uuid": "^3.0.1", "ws": "^3.0.0", - "xen-api": "^0.13.1", + "xen-api": "^0.13.5", "xml2js": "~0.4.17", "xo-acl-resolver": "^0.2.3", "xo-collection": "^0.4.1", diff --git a/src/api/server.js b/src/api/server.js index f51d50f32..902c32b91 100644 --- a/src/api/server.js +++ b/src/api/server.js @@ -1,14 +1,10 @@ -import { - noop, - pCatch -} from '../utils' +import { ignoreErrors } from 'promise-toolbox' export async function add ({autoConnect = true, ...props}) { const server = await this.registerXenServer(props) if (autoConnect) { - // Connect asynchronously, ignore any errors. - this.connectXenServer(server.id)::pCatch(noop) + this.connectXenServer(server.id)::ignoreErrors() } return server.id @@ -109,7 +105,7 @@ set.params = { // ------------------------------------------------------------------- export async function connect ({id}) { - this.updateXenServer(id, {enabled: true})::pCatch(noop) + this.updateXenServer(id, {enabled: true})::ignoreErrors() await this.connectXenServer(id) } @@ -126,7 +122,7 @@ connect.params = { // ------------------------------------------------------------------- export async function disconnect ({id}) { - this.updateXenServer(id, {enabled: false})::pCatch(noop) + this.updateXenServer(id, {enabled: false})::ignoreErrors() await this.disconnectXenServer(id) } diff --git a/src/api/vif.js b/src/api/vif.js index b385f7fa8..071c314f1 100644 --- a/src/api/vif.js +++ b/src/api/vif.js @@ -1,8 +1,6 @@ -import { - diffItems, - noop, - pCatch -} from '../utils' +import { ignoreErrors } from 'promise-toolbox' + +import { diffItems } from '../utils' // =================================================================== @@ -12,7 +10,7 @@ async function delete_ ({vif}) { vif.id, null, vif.allowedIpv4Addresses.concat(vif.allowedIpv6Addresses) - )::pCatch(noop) + )::ignoreErrors() await this.getXapi(vif).deleteVif(vif._xapiId) } diff --git a/src/api/vm.coffee b/src/api/vm.coffee index b4163d28e..e639c02ac 100644 --- a/src/api/vm.coffee +++ b/src/api/vm.coffee @@ -12,12 +12,13 @@ sortBy = require 'lodash/sortBy' startsWith = require 'lodash/startsWith' {coroutine: $coroutine} = require 'bluebird' {format} = require 'json-rpc-peer' - +{ ignoreErrors } = require('promise-toolbox') { forbiddenOperation, invalidParameters, unauthorized } = require('xo-common/api-errors') + { forEach, formatXml: $js2xml, @@ -25,10 +26,8 @@ startsWith = require 'lodash/startsWith' map, mapFilter, mapToArray, - noop, parseSize, parseXml, - pCatch, pFinally } = require '../utils' {isVmRunning: $isVmRunning} = require('../xapi') @@ -174,7 +173,7 @@ create = $coroutine (params) -> ) if params.bootAfterCreate - pCatch.call(xapi.startVm(vm._xapiId), noop) + ignoreErrors.call(xapi.startVm(vm._xapiId)) return vm.id @@ -324,9 +323,8 @@ delete_ = $coroutine ({vm, delete_disks: deleteDisks = false }) -> @getAllAcls().then((acls) => Promise.all(mapFilter(acls, (acl) => if (acl.object == vm.id) - return pCatch.call( - @removeAcl(acl.subject, acl.object, acl.action), - noop + return ignoreErrors.call( + @removeAcl(acl.subject, acl.object, acl.action) ) )) ) @@ -334,13 +332,12 @@ delete_ = $coroutine ({vm, delete_disks: deleteDisks = false }) -> # Update IP pools yield Promise.all(map(vm.VIFs, (vifId) => vif = xapi.getObject(vifId) - return pCatch.call( + return ignoreErrors.call( this.allocIpAddresses( vifId, null, concat(vif.ipv4_allowed, vif.ipv6_allowed) - ), - noop + ) ) )) @@ -364,12 +361,11 @@ delete_ = $coroutine ({vm, delete_disks: deleteDisks = false }) -> resourceSetUsage = @computeVmResourcesUsage(vm) ipPoolsUsage = yield @computeVmIpPoolsUsage(vm) - pCatch.call( + ignoreErrors.call( @releaseLimitsInResourceSet( merge(resourceSetUsage, ipPoolsUsage), resourceSet - ), - noop + ) ) return xapi.deleteVm(vm._xapiId, deleteDisks) @@ -1172,7 +1168,7 @@ createInterface = $coroutine ({ { push } = ipAddresses = [] push.apply(ipAddresses, allowedIpv4Addresses) if allowedIpv4Addresses push.apply(ipAddresses, allowedIpv6Addresses) if allowedIpv6Addresses - pCatch.call(@allocIpAddresses(vif.$id, allo), noop) if ipAddresses.length + ignoreErrors.call(@allocIpAddresses(vif.$id, allo)) if ipAddresses.length return vif.$id diff --git a/src/remote-handlers/abstract.js b/src/remote-handlers/abstract.js index 15a2bb13e..3f66af2f2 100644 --- a/src/remote-handlers/abstract.js +++ b/src/remote-handlers/abstract.js @@ -1,5 +1,6 @@ import eventToPromise from 'event-to-promise' import through2 from 'through2' +import { ignoreErrors } from 'promise-toolbox' import { parse @@ -8,8 +9,6 @@ import { import { addChecksumToReadStream, getPseudoRandomBytes, - noop, - pCatch, streamToBuffer, validChecksumOfReadStream } from '../utils' @@ -70,7 +69,7 @@ export default class RemoteHandlerAbstract { error: error.message || String(error) } } finally { - this.unlink(testFileName).catch(noop) + this.unlink(testFileName)::ignoreErrors() } } @@ -127,9 +126,12 @@ export default class RemoteHandlerAbstract { options.end === undefined && options.start === undefined ) { - promise = Promise.all([ promise, this.getSize(file).then(size => { - stream.length = size - }, noop) ]) + promise = Promise.all([ + promise, + this.getSize(file).then(size => { + stream.length = size + })::ignoreErrors() + ]) } return promise.then(() => stream) @@ -140,7 +142,7 @@ export default class RemoteHandlerAbstract { } // avoid a unhandled rejection warning - streamP.catch(noop) + streamP::ignoreErrors() return this.readFile(`${file}.checksum`).then( checksum => streamP.then(stream => { @@ -206,7 +208,7 @@ export default class RemoteHandlerAbstract { checksum = true } = {}) { if (checksum) { - this._unlink(`${file}.checksum`)::pCatch(noop) + this._unlink(`${file}.checksum`)::ignoreErrors() } return this._unlink(file) diff --git a/src/utils.js b/src/utils.js index aa40b6933..bd77d4bc6 100644 --- a/src/utils.js +++ b/src/utils.js @@ -367,7 +367,6 @@ export function pSettle (promises) { export { all as pAll, - catchPlus as pCatch, delay as pDelay, fromCallback as pFromCallback, isPromise, diff --git a/src/xapi-stats.js b/src/xapi-stats.js index 4f9077613..bfb18d783 100644 --- a/src/xapi-stats.js +++ b/src/xapi-stats.js @@ -391,7 +391,7 @@ export default class XapiStats { json: 'true', start: timestamp } - }).readAll().then(JSON5.parse) + }).then(response => response.readAll().then(JSON5.parse)) } async _getLastTimestamp (xapi, host, step) { diff --git a/src/xapi/index.js b/src/xapi/index.js index 48a5fbf4d..70b9afc07 100644 --- a/src/xapi/index.js +++ b/src/xapi/index.js @@ -4,7 +4,7 @@ import fatfs from 'fatfs' import synchronized from 'decorator-synchronized' import tarStream from 'tar-stream' import vmdkToVhd from 'xo-vmdk-to-vhd' -import { cancellable, defer } from 'promise-toolbox' +import { cancellable, catchPlus as pCatch, defer, ignoreErrors } from 'promise-toolbox' import { PassThrough } from 'stream' import { forbiddenOperation } from 'xo-common/api-errors' import { @@ -20,7 +20,6 @@ import { uniq } from 'lodash' import { - wrapError as wrapXapiError, Xapi as XapiBase } from 'xen-api' import { @@ -31,6 +30,7 @@ import createSizeStream from '../size-stream' import fatfsBuffer, { init as fatfsBufferInit } from '../fatfs-buffer' import { mixin } from '../decorators' import { + asyncMap, camelToSnakeCase, createRawObject, ensureArray, @@ -38,9 +38,7 @@ import { isFunction, map, mapToArray, - noop, pAll, - pCatch, pDelay, pFinally, promisifyAll, @@ -100,7 +98,6 @@ export default class Xapi extends XapiBase { const genericWatchers = this._genericWatchers = createRawObject() const objectsWatchers = this._objectWatchers = createRawObject() - const taskWatchers = this._taskWatchers = createRawObject() const onAddOrUpdate = objects => { forEach(objects, object => { @@ -123,21 +120,6 @@ export default class Xapi extends XapiBase { objectsWatchers[ref].resolve(object) delete objectsWatchers[ref] } - - // Watched task. - if (ref in taskWatchers) { - const {status} = object - - if (status === 'success') { - taskWatchers[ref].resolve(object.result) - } else if (status === 'failure') { - taskWatchers[ref].reject(wrapXapiError(object.error_info)) - } else { - return - } - - delete taskWatchers[ref] - } }) } this.objects.on('add', onAddOrUpdate) @@ -154,6 +136,10 @@ export default class Xapi extends XapiBase { return loop() } + createTask (name = 'untitled task', description) { + return super.createTask(`[XO] ${name}`, description) + } + // ================================================================= _registerGenericWatcher (fn) { @@ -228,36 +214,6 @@ export default class Xapi extends XapiBase { // ================================================================= - // Create a task. - async _createTask (name = 'untitled task', description = '') { - const ref = await this.call('task.create', `[XO] ${name}`, description) - debug('task created: %s (%s)', name, description) - - this._watchTask(ref)::pFinally(() => { - this.call('task.destroy', ref).then(() => { - debug('task destroyed: %s (%s)', name, description) - }) - }) - - return ref - } - - // Waits for a task to be resolved. - _watchTask (ref) { - // If a task object is passed, unpacked the ref. - if (typeof ref === 'object' && ref.$ref) ref = ref.$ref - - let watcher = this._taskWatchers[ref] - if (!watcher) { - // Register the watcher. - watcher = this._taskWatchers[ref] = defer() - } - - return watcher.promise - } - - // ================================================================= - _setObjectProperty (object, name, value) { return this.call( `${getNamespaceForType(object.$type)}.set_${camelToSnakeCase(name)}`, @@ -280,7 +236,7 @@ export default class Xapi extends XapiBase { if (value != null) { return this.call(`${namespace}.set_${camelToSnakeCase(name)}`, ref, prepareXapiParam(value)) } - }))::pCatch(noop) + }))::ignoreErrors() } async _updateObjectMapProperty (object, prop, values) { @@ -302,7 +258,7 @@ export default class Xapi extends XapiBase { return value === null ? removal - : removal::pCatch(noop).then(() => this.call(add, ref, name, prepareXapiParam(value))) + : removal::ignoreErrors().then(() => this.call(add, ref, name, prepareXapiParam(value))) } })) } @@ -719,14 +675,14 @@ export default class Xapi extends XapiBase { vdi.VBDs.length < 2 || every(vdi.$VBDs, vbd => vbd.VM === vm.$ref) ) { - return this._deleteVdi(vdi)::pCatch(noop) + return this._deleteVdi(vdi)::ignoreErrors() } console.error(`cannot delete VDI ${vdi.name_label} (from VM ${vm.name_label})`) })) } await Promise.all(mapToArray(vm.$snapshots, snapshot => - this.deleteVm(snapshot.$id)::pCatch(noop) + this.deleteVm(snapshot.$id)::ignoreErrors() )) await this.call('VM.destroy', vm.$ref) @@ -765,21 +721,22 @@ export default class Xapi extends XapiBase { snapshotRef = (await this._snapshotVm(vm)).$ref } - const taskRef = await this._createTask('VM Export', vm.name_label) - if (snapshotRef) { - this._watchTask(taskRef)::pFinally(() => { - this.deleteVm(snapshotRef)::pCatch(noop) - }) - } - - return this.getResource(onlyMetadata ? '/export_metadata/' : '/export/', { + const promise = this.getResource(onlyMetadata ? '/export_metadata/' : '/export/', { host, query: { ref: snapshotRef || vm.$ref, - task_id: taskRef, use_compression: compress ? 'true' : 'false' - } + }, + task: this.createTask('VM export', vm.name_label) }) + + if (snapshotRef !== undefined) { + promise.then(_ => _.task::pFinally(() => + this.deleteVm(snapshotRef)::ignoreErrors() + )) + } + + return promise } _assertHealthyVdiChain (vdi, childrenMap) { @@ -816,8 +773,9 @@ export default class Xapi extends XapiBase { } // Create a snapshot of the VM and returns a delta export object. + @cancellable @deferrable.onFailure - async exportDeltaVm ($onFailure, vmId, baseVmId = undefined, { + async exportDeltaVm ($onFailure, $cancelToken, vmId, baseVmId = undefined, { snapshotNameLabel = undefined, // Contains a vdi.$id set of vmId. fullVdisRequired = [], @@ -830,7 +788,7 @@ export default class Xapi extends XapiBase { if (snapshotNameLabel) { this._setObjectProperties(vm, { nameLabel: snapshotNameLabel - })::pCatch(noop) + })::ignoreErrors() } const baseVm = baseVmId && this.getObject(baseVmId) @@ -868,7 +826,7 @@ export default class Xapi extends XapiBase { // // The snapshot must not exist otherwise it could break the // next export. - this._deleteVdi(vdi)::pCatch(noop) + this._deleteVdi(vdi)::ignoreErrors() return } @@ -896,8 +854,8 @@ export default class Xapi extends XapiBase { ...vdi, $SR$uuid: vdi.$SR.uuid } - const stream = streams[`${vdiRef}.vhd`] = this._exportVdi(vdi, baseVdi, VDI_FORMAT_VHD) - $onFailure(() => stream.cancel()) + const stream = streams[`${vdiRef}.vhd`] = this._exportVdi($cancelToken, vdi, baseVdi, VDI_FORMAT_VHD) + $onFailure(stream.cancel) }) const vifs = {} @@ -988,10 +946,10 @@ export default class Xapi extends XapiBase { ]) // 2. Delete all VBDs which may have been created by the import. - await Promise.all(mapToArray( + await asyncMap( vm.$VBDs, - vbd => this._deleteVbd(vbd)::pCatch(noop) - )) + vbd => this._deleteVbd(vbd) + )::ignoreErrors() // 3. Create VDIs. const newVdis = await map(delta.vdis, async vdi => { @@ -1078,7 +1036,7 @@ export default class Xapi extends XapiBase { ]) if (deleteBase && baseVm) { - this._deleteVm(baseVm)::pCatch(noop) + this._deleteVm(baseVm)::ignoreErrors() } await Promise.all([ @@ -1220,16 +1178,13 @@ export default class Xapi extends XapiBase { } async _importVm (stream, sr, onlyMetadata = false, onVmCreation = undefined) { - const taskRef = await this._createTask('VM import') + const taskRef = await this.createTask('VM import') const query = { force: onlyMetadata - ? 'true' - : undefined, - task_id: taskRef } let host - if (sr) { + if (sr != null) { host = sr.$PBDs[0].$host query.sr_id = sr.$ref } @@ -1237,17 +1192,18 @@ export default class Xapi extends XapiBase { if (onVmCreation) { this._waitObject( obj => obj && obj.current_operations && taskRef in obj.current_operations - ).then(onVmCreation)::pCatch(noop) + ).then(onVmCreation)::ignoreErrors() } - const [ vmRef ] = await Promise.all([ - this._watchTask(taskRef).then(extractOpaqueRef), - this.putResource( - stream, - onlyMetadata ? '/import_metadata/' : '/import/', - { host, query } - ) - ]) + const vmRef = await this.putResource( + stream, + onlyMetadata ? '/import_metadata/' : '/import/', + { + host, + query, + task: taskRef + } + ).then(extractOpaqueRef) // Importing a metadata archive of running VMs is currently // broken: its VBDs are incorrectly seen as attached. @@ -1414,7 +1370,7 @@ export default class Xapi extends XapiBase { let ref try { ref = await this.call('VM.snapshot_with_quiesce', vm.$ref, nameLabel) - this.addTag(ref, 'quiesce')::pCatch(noop) // ignore any failures + this.addTag(ref, 'quiesce')::ignoreErrors() await this._waitObjectState(ref, vm => includes(vm.tags, 'quiesce')) } catch (error) { @@ -1544,10 +1500,10 @@ export default class Xapi extends XapiBase { } finally { this._setObjectProperties(vm, { PV_bootloader: bootloader - })::pCatch(noop) + })::ignoreErrors() forEach(bootables, ([ vbd, bootable ]) => { - this._setObjectProperties(vbd, { bootable })::pCatch(noop) + this._setObjectProperties(vbd, { bootable })::ignoreErrors() }) } } @@ -1748,7 +1704,7 @@ export default class Xapi extends XapiBase { throw error } - await this.call('VBD.eject', cdDrive.$ref)::pCatch(noop) + await this.call('VBD.eject', cdDrive.$ref)::ignoreErrors() // Retry. await this.call('VBD.insert', cdDrive.$ref, cd.$ref) @@ -1794,7 +1750,7 @@ export default class Xapi extends XapiBase { } async _deleteVbd (vbd) { - await this._disconnectVbd(vbd)::pCatch(noop) + await this._disconnectVbd(vbd)::ignoreErrors() await this.call('VBD.destroy', vbd.$ref) } @@ -1806,7 +1762,7 @@ export default class Xapi extends XapiBase { async destroyVbdsFromVm (vmId) { await Promise.all( mapToArray(this.getObject(vmId).$VBDs, async vbd => { - await this.disconnectVbd(vbd.$ref)::pCatch(noop) + await this.disconnectVbd(vbd.$ref)::ignoreErrors() return this.call('VBD.destroy', vbd.$ref) }) ) @@ -1855,13 +1811,11 @@ export default class Xapi extends XapiBase { } @cancellable - async _exportVdi ($cancelToken, vdi, base, format = VDI_FORMAT_VHD) { + _exportVdi ($cancelToken, vdi, base, format = VDI_FORMAT_VHD) { const host = vdi.$SR.$PBDs[0].$host - const taskRef = await this._createTask('VDI Export', vdi.name_label) const query = { format, - task_id: taskRef, vdi: vdi.$ref } if (base) { @@ -1873,14 +1827,10 @@ export default class Xapi extends XapiBase { : '' }`) - const task = this._watchTask(taskRef) return this.getResource($cancelToken, '/export_raw_vdi/', { host, - query - }).then(response => { - response.task = task - - return response + query, + task: this.createTask('VDI Export', vdi.name_label) }) } @@ -1898,35 +1848,31 @@ export default class Xapi extends XapiBase { // ----------------------------------------------------------------- - async _importVdiContent (vdi, stream, format = VDI_FORMAT_VHD) { - const taskRef = await this._createTask('VDI Content Import', vdi.name_label) - + async _importVdiContent (vdi, body, format = VDI_FORMAT_VHD) { const pbd = find(vdi.$SR.$PBDs, 'currently_attached') - if (!pbd) { + if (pbd === undefined) { throw new Error('no valid PBDs found') } - const task = this._watchTask(taskRef) await Promise.all([ - stream.checksumVerified, - task, - this.putResource(stream, '/import_raw_vdi/', { + body.checksumVerified, + this.putResource(body, '/import_raw_vdi/', { host: pbd.host, query: { format, - task_id: taskRef, vdi: vdi.$ref - } + }, + task: this.createTask('VDI Content Import', vdi.name_label) }) ]) } - importVdiContent (vdiId, stream, { + importVdiContent (vdiId, body, { format } = {}) { return this._importVdiContent( this.getObject(vdiId), - stream, + body, format ) } @@ -2036,7 +1982,7 @@ export default class Xapi extends XapiBase { const newPifs = await this.call('pool.create_VLAN_from_PIF', physPif.$ref, pif.network, asInteger(vlan)) await Promise.all( mapToArray(newPifs, pifRef => - !wasAttached[this.getObject(pifRef).host] && this.call('PIF.unplug', pifRef)::pCatch(noop) + !wasAttached[this.getObject(pifRef).host] && this.call('PIF.unplug', pifRef)::ignoreErrors() ) ) } @@ -2140,25 +2086,36 @@ export default class Xapi extends XapiBase { } // Generic Config Drive - async createCloudInitConfigDrive (vmId, srId, config) { + @deferrable.onFailure + async createCloudInitConfigDrive ($onFailure, vmId, srId, config) { const vm = this.getObject(vmId) const sr = this.getObject(srId) // First, create a small VDI (10MB) which will become the ConfigDrive const buffer = fatfsBufferInit() const vdi = await this.createVdi(buffer.length, { name_label: 'XO CloudConfigDrive', name_description: undefined, sr: sr.$ref }) + $onFailure(() => this._deleteVdi(vdi)) + // Then, generate a FAT fs const fs = promisifyAll(fatfs.createFileSystem(fatfsBuffer(buffer))) - // Create Cloud config folders + await fs.mkdir('openstack') await fs.mkdir('openstack/latest') - // Create the meta_data file - await fs.writeFile('openstack/latest/meta_data.json', '{\n "uuid": "' + vm.uuid + '"\n}\n') - // Create the user_data file - await fs.writeFile('openstack/latest/user_data', config) + await Promise.all([ + fs.writeFile( + 'openstack/latest/meta_data.json', + '{\n "uuid": "' + vm.uuid + '"\n}\n' + ), + fs.writeFile('openstack/latest/user_data', config) + ]) + + // ignore VDI_IO_ERROR errors, I (JFT) don't understand why they + // are emitted because it works + await this._importVdiContent(vdi, buffer, VDI_FORMAT_RAW)::pCatch( + { code: 'VDI_IO_ERROR' }, + console.warn + ) - // Transform the buffer into a stream - await this._importVdiContent(vdi, buffer, VDI_FORMAT_RAW) await this._createVbd(vm, vdi) } diff --git a/src/xapi/mixins/patching.js b/src/xapi/mixins/patching.js index 82b6876e6..856d47fb1 100644 --- a/src/xapi/mixins/patching.js +++ b/src/xapi/mixins/patching.js @@ -212,9 +212,9 @@ export default { // platform_version < 2.1.1 ---------------------------------------- async uploadPoolPatch (stream, patchName = 'unknown') { - const taskRef = await this._createTask('Patch upload', patchName) + const taskRef = await this.createTask('Patch upload', patchName) - const task = this._watchTask(taskRef) + const task = this.watchTask(taskRef) const [ patchRef ] = await Promise.all([ task, this.putResource(stream, '/pool_patch_upload', { diff --git a/src/xapi/mixins/vm.js b/src/xapi/mixins/vm.js index 04f0abbd8..774669b13 100644 --- a/src/xapi/mixins/vm.js +++ b/src/xapi/mixins/vm.js @@ -1,16 +1,17 @@ import deferrable from 'golike-defer' -import find from 'lodash/find' -import gte from 'lodash/gte' -import includes from 'lodash/includes' -import isEmpty from 'lodash/isEmpty' -import lte from 'lodash/lte' +import { ignoreErrors } from 'promise-toolbox' +import { + find, + gte, + includes, + isEmpty, + lte +} from 'lodash' import { forEach, mapToArray, - noop, - parseSize, - pCatch + parseSize } from '../../utils' import { @@ -64,7 +65,7 @@ export default { // Removes disks from the provision XML, we will create them by // ourselves. - await this.call('VM.remove_from_other_config', vmRef, 'disks')::pCatch(noop) + await this.call('VM.remove_from_other_config', vmRef, 'disks')::ignoreErrors() // Creates the VDIs and executes the initial steps of the // installation. @@ -386,9 +387,9 @@ export default { if (snapshot.snapshot_info['power-state-at-snapshot'] === 'Running') { const vm = snapshot.$snapshot_of if (vm.power_state === 'Halted') { - this.startVm(vm.$id)::pCatch(noop) + this.startVm(vm.$id)::ignoreErrors() } else if (vm.power_state === 'Suspended') { - this.resumeVm(vm.$id)::pCatch(noop) + this.resumeVm(vm.$id)::ignoreErrors() } } }, diff --git a/src/xo-mixins/authentication.js b/src/xo-mixins/authentication.js index c6a58c6c3..f94ed58f3 100644 --- a/src/xo-mixins/authentication.js +++ b/src/xo-mixins/authentication.js @@ -1,11 +1,11 @@ -import Token, { Tokens } from '../models/token' import { noSuchObject } from 'xo-common/api-errors' +import { ignoreErrors } from 'promise-toolbox' + +import Token, { Tokens } from '../models/token' import { createRawObject, forEach, - generateToken, - pCatch, - noop + generateToken } from '../utils' // =================================================================== @@ -180,7 +180,7 @@ export default class { if (!( token.expiration > Date.now() )) { - this._tokens.remove(id)::pCatch(noop) + this._tokens.remove(id)::ignoreErrors() throw noSuchAuthenticationToken(id) } diff --git a/src/xo-mixins/backups.js b/src/xo-mixins/backups.js index 14a7d0957..1cceb59b9 100644 --- a/src/xo-mixins/backups.js +++ b/src/xo-mixins/backups.js @@ -3,6 +3,7 @@ import escapeStringRegexp from 'escape-string-regexp' import eventToPromise from 'event-to-promise' import execa from 'execa' import splitLines from 'split-lines' +import { CancelToken, ignoreErrors } from 'promise-toolbox' import { createParser as createPairsParser } from 'parse-pairs' import { createReadStream, readdir, stat } from 'fs' import { satisfies as versionSatisfies } from 'semver' @@ -34,7 +35,6 @@ import { mapFilter, mapToArray, noop, - pCatch, pFinally, pFromCallback, pSettle, @@ -416,17 +416,12 @@ export default class { // 2. Copy. let size = 0 const dstVm = await (async () => { - const delta = await srcXapi.exportDeltaVm(srcVm.$id, localBaseUuid, { + const { cancel, token } = CancelToken.source() + const delta = await srcXapi.exportDeltaVm(token, srcVm.$id, localBaseUuid, { snapshotNameLabel: `XO_DELTA_EXPORT: ${targetSr.name_label} (${targetSr.uuid})` }) - $onFailure(async () => { - await Promise.all(mapToArray( - delta.streams, - stream => stream.cancel()::pCatch(noop) - )) - - return srcXapi.deleteVm(delta.vm.uuid)::pCatch(noop) - }) + $onFailure(() => srcXapi.deleteVm(delta.vm.uuid)) + $onFailure(cancel) delta.vm.name_label += ` (${shortDate(Date.now())})` @@ -435,7 +430,9 @@ export default class { const sizeStream = createSizeStream().once('finish', () => { size += sizeStream.size }) - delta.streams[id] = delta.streams[id].pipe(sizeStream) + delta.streams[id] = delta.streams[id].on('end', () => { + console.log(`${id} end`) + }).pipe(sizeStream) }) const promise = targetXapi.importDeltaVm( @@ -449,7 +446,7 @@ export default class { // Once done, (asynchronously) remove the (now obsolete) local // base. if (localBaseUuid) { - promise.then(() => srcXapi.deleteVm(localBaseUuid))::pCatch(noop) + promise.then(() => srcXapi.deleteVm(localBaseUuid))::ignoreErrors() } // (Asynchronously) Identify snapshot as future base. @@ -457,7 +454,7 @@ export default class { return srcXapi._updateObjectMapProperty(srcVm, 'other_config', { [TAG_LAST_BASE_DELTA]: delta.vm.uuid }) - })::pCatch(noop) + })::ignoreErrors() return promise })() @@ -687,7 +684,7 @@ export default class { filter(vdiParent.$snapshots, { name_label: 'XO_DELTA_BASE_VDI_SNAPSHOT' }), base => base.snapshot_time ) - forEach(bases, base => { xapi.deleteVdi(base.$id)::pCatch(noop) }) + forEach(bases, base => { xapi.deleteVdi(base.$id)::ignoreErrors() }) // Export full or delta backup. const vdiFilename = `${date}_${isFull ? 'full' : 'delta'}.vhd` @@ -717,7 +714,7 @@ export default class { ]) } catch (error) { // Remove new backup. (corrupt). - await handler.unlink(backupFullPath)::pCatch(noop) + await handler.unlink(backupFullPath)::ignoreErrors() throw error } @@ -740,7 +737,7 @@ export default class { // Remove xva file. // Version 0.0.0 (Legacy) Delta Backup. - handler.unlink(`${dir}/${getDeltaBackupNameWithoutExt(backup)}.xva`)::pCatch(noop) + handler.unlink(`${dir}/${getDeltaBackupNameWithoutExt(backup)}.xva`)::ignoreErrors() ])) } } @@ -758,7 +755,7 @@ export default class { base => base.snapshot_time ) const baseVm = bases.pop() - forEach(bases, base => { xapi.deleteVm(base.$id)::pCatch(noop) }) + forEach(bases, base => { xapi.deleteVm(base.$id)::ignoreErrors() }) // Check backup dirs. const dir = `vm_delta_${tag}_${vm.uuid}` @@ -781,20 +778,14 @@ export default class { ) // Export... - const delta = await xapi.exportDeltaVm(vm.$id, baseVm && baseVm.$id, { + const { cancel, token } = CancelToken.source() + const delta = await xapi.exportDeltaVm(token, vm.$id, baseVm && baseVm.$id, { snapshotNameLabel: `XO_DELTA_BASE_VM_SNAPSHOT_${tag}`, fullVdisRequired, disableBaseTags: true }) - - $onFailure(async () => { - await Promise.all(mapToArray( - delta.streams, - stream => stream.cancel()::pCatch(noop) - )) - - return xapi.deleteVm(delta.vm.uuid)::pCatch(noop) - }) + $onFailure(() => xapi.deleteVm(delta.vm.uuid)) + $onFailure(cancel) // Save vdis. const vdiBackups = await pSettle( @@ -834,7 +825,7 @@ export default class { } $onFailure(() => asyncMap(fulFilledVdiBackups, vdiBackup => - handler.unlink(`${dir}/${vdiBackup.value()}`)::pCatch(noop) + handler.unlink(`${dir}/${vdiBackup.value()}`)::ignoreErrors() )) if (error) { @@ -869,7 +860,7 @@ export default class { await this._removeOldDeltaVmBackups(xapi, { vm, handler, dir, retention }) if (baseVm) { - xapi.deleteVm(baseVm.$id)::pCatch(noop) + xapi.deleteVm(baseVm.$id)::ignoreErrors() } return { @@ -994,7 +985,7 @@ export default class { _removeVms (xapi, vms) { return Promise.all(mapToArray(vms, vm => // Do not consider a failure to delete an old copy as a fatal error. - xapi.deleteVm(vm.$id)::pCatch(noop) + xapi.deleteVm(vm.$id)::ignoreErrors() )) } @@ -1134,7 +1125,7 @@ export default class { const entriesMap = {} await Promise.all(mapToArray(entries, async name => { - const stats = await pFromCallback(cb => stat(`${path}/${name}`, cb))::pCatch(noop) + const stats = await pFromCallback(cb => stat(`${path}/${name}`, cb))::ignoreErrors() if (stats) { entriesMap[stats.isDirectory() ? `${name}/` : name] = {} } diff --git a/src/xo-mixins/subjects.js b/src/xo-mixins/subjects.js index b4ab6c4f7..ce8d49df4 100644 --- a/src/xo-mixins/subjects.js +++ b/src/xo-mixins/subjects.js @@ -1,5 +1,5 @@ -import filter from 'lodash/filter' -import includes from 'lodash/includes' +import { filter, includes } from 'lodash' +import { ignoreErrors } from 'promise-toolbox' import { hash, needsRehash, @@ -20,9 +20,7 @@ import { forEach, isEmpty, lightSet, - mapToArray, - noop, - pCatch + mapToArray } from '../utils' // =================================================================== @@ -106,15 +104,15 @@ export default class { this._xo.getAuthenticationTokensForUser(id) .then(tokens => { forEach(tokens, token => { - this._xo.deleteAuthenticationToken(id)::pCatch(noop) + this._xo.deleteAuthenticationToken(id)::ignoreErrors() }) }) - ::pCatch(noop) // Ignore any failures. + ::ignoreErrors() // Remove ACLs for this user. this._xo.getAclsForSubject(id).then(acls => { forEach(acls, acl => { - this._xo.removeAcl(id, acl.object, acl.action)::pCatch(noop) + this._xo.removeAcl(id, acl.object, acl.action)::ignoreErrors() }) }) @@ -122,7 +120,7 @@ export default class { forEach(user.groups, groupId => { this.getGroup(groupId) .then(group => this._removeUserFromGroup(id, group)) - ::pCatch(noop) // Ignore any failures. + ::ignoreErrors() }) } @@ -268,7 +266,7 @@ export default class { // Remove ACLs for this group. this._xo.getAclsForSubject(id).then(acls => { forEach(acls, acl => { - this._xo.removeAcl(id, acl.object, acl.action)::pCatch(noop) + this._xo.removeAcl(id, acl.object, acl.action)::ignoreErrors() }) }) @@ -276,7 +274,7 @@ export default class { forEach(group.users, userId => { this.getUser(userId) .then(user => this._removeGroupFromUser(id, user)) - ::pCatch(noop) // Ignore any failures. + ::ignoreErrors() }) } diff --git a/src/xo-mixins/xen-servers.js b/src/xo-mixins/xen-servers.js index 6c48adeca..3c11f642a 100644 --- a/src/xo-mixins/xen-servers.js +++ b/src/xo-mixins/xen-servers.js @@ -1,3 +1,4 @@ +import { ignoreErrors } from 'promise-toolbox' import { noSuchObject } from 'xo-common/api-errors' import Xapi from '../xapi' @@ -9,8 +10,6 @@ import { forEach, isEmpty, isString, - noop, - pCatch, popProperty, serializeError } from '../utils' @@ -81,7 +80,7 @@ export default class { } async unregisterXenServer (id) { - this.disconnectXenServer(id)::pCatch(noop) + this.disconnectXenServer(id)::ignoreErrors() if (!await this._servers.remove(id)) { throw noSuchObject(id, 'xenServer') diff --git a/yarn.lock b/yarn.lock index d7979e242..510165f3b 100644 --- a/yarn.lock +++ b/yarn.lock @@ -3323,9 +3323,9 @@ http-proxy@^1.16.2: eventemitter3 "1.x.x" requires-port "1.x.x" -http-request-plus@^0.3.0: - version "0.3.0" - resolved "https://registry.yarnpkg.com/http-request-plus/-/http-request-plus-0.3.0.tgz#4ee822f65006485af0362dd70c5fd145f885dd0c" +http-request-plus@^0.4.0: + version "0.4.0" + resolved "https://registry.yarnpkg.com/http-request-plus/-/http-request-plus-0.4.0.tgz#f9cfe550094c82d4a8d93b6252ff8f37349a8123" dependencies: is-redirect "^1.0.0" lodash "^4.17.4" @@ -5399,9 +5399,9 @@ promise-toolbox@^0.8.0: dependencies: make-error "^1.2.3" -promise-toolbox@^0.9.1, promise-toolbox@^0.9.2, promise-toolbox@^0.9.4: - version "0.9.4" - resolved "https://registry.yarnpkg.com/promise-toolbox/-/promise-toolbox-0.9.4.tgz#672fe37557eea05093f770e7e1fca64e39ae9238" +promise-toolbox@^0.9.1, promise-toolbox@^0.9.5: + version "0.9.5" + resolved "https://registry.yarnpkg.com/promise-toolbox/-/promise-toolbox-0.9.5.tgz#ca33e53714cfde924a9bd3d2d23c53b21cb75acc" dependencies: make-error "^1.2.3" @@ -7071,23 +7071,23 @@ xdg-basedir@^2.0.0: dependencies: os-homedir "^1.0.0" -xen-api@^0.13.1: - version "0.13.1" - resolved "https://registry.yarnpkg.com/xen-api/-/xen-api-0.13.1.tgz#471aedff714460e888189b07d04610bf762971df" +xen-api@^0.13.5: + version "0.13.5" + resolved "https://registry.yarnpkg.com/xen-api/-/xen-api-0.13.5.tgz#4edcf1b9a83c87df2bd94e02e860aaef85796157" dependencies: babel-polyfill "^6.23.0" blocked "^1.2.1" debug "^2.6.8" event-to-promise "^0.8.0" exec-promise "^0.7.0" - http-request-plus "^0.3.0" + http-request-plus "^0.4.0" json-rpc-protocol "^0.11.2" kindof "^2.0.0" lodash "^4.17.4" make-error "^1.3.0" minimist "^1.2.0" ms "^2.0.0" - promise-toolbox "^0.9.4" + promise-toolbox "^0.9.5" pw "0.0.4" xmlrpc "^1.3.2" xo-collection "^0.4.1"