Compare commits

...

1 Commits

Author SHA1 Message Date
Julien Fontanet
4c991c1a57 WiP 2022-07-04 17:41:46 +02:00
40 changed files with 510 additions and 388 deletions

View File

@@ -1,4 +1,4 @@
### `asyncEach(iterable, iteratee, [opts])`
### `pEach(iterable, iteratee, [opts])`
Executes `iteratee` in order for each value yielded by `iterable`.
@@ -6,7 +6,7 @@ Returns a promise wich rejects as soon as a call to `iteratee` throws or a promi
`iterable` must be an iterable or async iterable.
`iteratee` is called with the same `this` value as `asyncEach`, and with the following arguments:
`iteratee` is called with the same `this` value as `pEach`, and with the following arguments:
- `value`: the value yielded by `iterable`
- `index`: the 0-based index for this value
@@ -19,10 +19,10 @@ Returns a promise wich rejects as soon as a call to `iteratee` throws or a promi
- `stopOnError`: wether to stop iteration of first error, or wait for all calls to finish and throw an `AggregateError`, defaults to `true`
```js
import { asyncEach } from '@vates/async-each'
import { pEach } from '@vates/async-each'
const contents = []
await asyncEach(
await pEach(
['foo.txt', 'bar.txt', 'baz.txt'],
async function (filename, i) {
contents[i] = await readFile(filename)

View File

@@ -16,7 +16,7 @@ Installation of the [npm package](https://npmjs.org/package/@vates/async-each):
## Usage
### `asyncEach(iterable, iteratee, [opts])`
### `pEach(iterable, iteratee, [opts])`
Executes `iteratee` in order for each value yielded by `iterable`.
@@ -24,7 +24,7 @@ Returns a promise wich rejects as soon as a call to `iteratee` throws or a promi
`iterable` must be an iterable or async iterable.
`iteratee` is called with the same `this` value as `asyncEach`, and with the following arguments:
`iteratee` is called with the same `this` value as `pEach`, and with the following arguments:
- `value`: the value yielded by `iterable`
- `index`: the 0-based index for this value
@@ -37,10 +37,10 @@ Returns a promise wich rejects as soon as a call to `iteratee` throws or a promi
- `stopOnError`: wether to stop iteration of first error, or wait for all calls to finish and throw an `AggregateError`, defaults to `true`
```js
import { asyncEach } from '@vates/async-each'
import { pEach } from '@vates/async-each'
const contents = []
await asyncEach(
await pEach(
['foo.txt', 'bar.txt', 'baz.txt'],
async function (filename, i) {
contents[i] = await readFile(filename)

View File

@@ -15,7 +15,7 @@ class AggregateError extends Error {
* @param {(item: Item, index: number, iterable: Iterable<Item>) => Promise<void>} iteratee
* @returns {Promise<void>}
*/
exports.asyncEach = function asyncEach(iterable, iteratee, { concurrency = 1, signal, stopOnError = true } = {}) {
exports.pEach = function pEach(iterable, iteratee, { concurrency = 10, signal, stopOnError = true } = {}) {
if (concurrency === 0) {
concurrency = Infinity
}
@@ -28,7 +28,7 @@ exports.asyncEach = function asyncEach(iterable, iteratee, { concurrency = 1, si
let onAbort
if (signal !== undefined) {
onAbort = () => {
onRejectedWrapper(new Error('asyncEach aborted'))
onRejectedWrapper(new Error('pEach aborted'))
}
signal.addEventListener('abort', onAbort)
}
@@ -51,11 +51,10 @@ exports.asyncEach = function asyncEach(iterable, iteratee, { concurrency = 1, si
clean()
})(reject)
let onFulfilled = value => {
let onFulfilled = () => {
--running
next()
}
const onFulfilledWrapper = value => onFulfilled(value)
let onRejected = stopOnError
? reject
@@ -66,6 +65,14 @@ exports.asyncEach = function asyncEach(iterable, iteratee, { concurrency = 1, si
}
const onRejectedWrapper = reason => onRejected(reason)
const run = async (value, index) => {
try {
onFulfilled(await iteratee.call(this, value, index++))
} catch (error) {
onRejected(error)
}
}
let nextIsRunning = false
let next = async () => {
if (nextIsRunning) {
@@ -86,17 +93,7 @@ exports.asyncEach = function asyncEach(iterable, iteratee, { concurrency = 1, si
}
} else {
++running
try {
const result = iteratee.call(this, cursor.value, index++, iterable)
let then
if (result != null && typeof result === 'object' && typeof (then = result.then) === 'function') {
then.call(result, onFulfilledWrapper, onRejectedWrapper)
} else {
onFulfilled(result)
}
} catch (error) {
onRejected(error)
}
run(cursor.value, index++, iterable)
}
nextIsRunning = false
return next()

View File

@@ -2,7 +2,7 @@
/* eslint-env jest */
const { asyncEach } = require('./')
const { pEach } = require('./')
const randomDelay = (max = 10) =>
new Promise(resolve => {
@@ -14,7 +14,7 @@ const rejectionOf = p =>
p.then(reject, resolve)
})
describe('asyncEach', () => {
describe('pEach', () => {
const thisArg = 'qux'
const values = ['foo', 'bar', 'baz']
@@ -36,7 +36,7 @@ describe('asyncEach', () => {
it('works', async () => {
const iteratee = jest.fn(async () => {})
await asyncEach.call(thisArg, iterable, iteratee)
await pEach.call(thisArg, iterable, iteratee)
expect(iteratee.mock.instances).toEqual(Array.from(values, () => thisArg))
expect(iteratee.mock.calls).toEqual(Array.from(values, (value, index) => [value, index, iterable]))
@@ -45,7 +45,7 @@ describe('asyncEach', () => {
it('respects a concurrency of ' + concurrency, async () => {
let running = 0
await asyncEach(
await pEach(
values,
async () => {
++running
@@ -66,7 +66,7 @@ describe('asyncEach', () => {
}
})
expect(await rejectionOf(asyncEach(iterable, iteratee, { stopOnError: true }))).toBe(error)
expect(await rejectionOf(pEach(iterable, iteratee, { stopOnError: true }))).toBe(error)
expect(iteratee).toHaveBeenCalledTimes(2)
})
@@ -78,7 +78,7 @@ describe('asyncEach', () => {
throw error
})
const error = await rejectionOf(asyncEach(iterable, iteratee, { stopOnError: false }))
const error = await rejectionOf(pEach(iterable, iteratee, { stopOnError: false }))
expect(error.errors).toEqual(errors)
expect(iteratee.mock.calls).toEqual(Array.from(values, (value, index) => [value, index, iterable]))
})
@@ -91,7 +91,7 @@ describe('asyncEach', () => {
}
})
await expect(asyncEach(iterable, iteratee, { signal: ac.signal })).rejects.toThrow('asyncEach aborted')
await expect(pEach(iterable, iteratee, { signal: ac.signal })).rejects.toThrow('pEach aborted')
expect(iteratee).toHaveBeenCalledTimes(2)
})
})

View File

@@ -23,7 +23,7 @@ module.exports = async function main(args) {
},
})
await asyncMap(_, async vmDir => {
await pEach(_, async vmDir => {
vmDir = resolve(vmDir)
try {
await adapter.cleanVm(vmDir, { fixMetadata: fix, remove, merge, onLog: (...args) => console.warn(...args) })

View File

@@ -11,8 +11,8 @@ module.exports = async function createSymlinkIndex([backupDir, fieldPath]) {
const indexDir = join(backupDir, 'indexes', filenamify(fieldPath))
await mktree(indexDir)
await asyncMap(await readdir2(backupDir), async vmDir =>
asyncMap(
await pEach(await readdir2(backupDir), async vmDir =>
pEach(
(await readdir2(vmDir)).filter(_ => _.endsWith('.json')),
async json => {
const metadata = JSON.parse(await readFile(json))

View File

@@ -159,7 +159,7 @@ exports.Backup = class Backup {
const promises = []
if (pools.length !== 0 && settings.retentionPoolMetadata !== 0) {
promises.push(
asyncMap(pools, async pool =>
pEach(pools, async pool =>
runTask(
{
name: `Starting metadata backup for the pool (${pool.$id}). (${job.id})`,
@@ -245,7 +245,7 @@ exports.Backup = class Backup {
})
)
),
() => settings.healthCheckSr !== undefined ? this._getRecord('SR', settings.healthCheckSr) : undefined,
() => (settings.healthCheckSr !== undefined ? this._getRecord('SR', settings.healthCheckSr) : undefined),
async (srs, remoteAdapters, healthCheckSr) => {
// remove adapters that failed (already handled)
remoteAdapters = remoteAdapters.filter(_ => _ !== undefined)
@@ -284,7 +284,10 @@ exports.Backup = class Backup {
)
)
const { concurrency } = settings
await asyncMapSettled(vmIds, concurrency === 0 ? handleVm : limitConcurrency(concurrency)(handleVm))
await pEach(vmIds, handleVm, {
concurrency,
stopOnError: false,
})
}
)
}

View File

@@ -1,6 +1,8 @@
'use strict'
const { asyncMap } = require('@xen-orchestra/async-map')
const { pEach } = require('@xen-orchestra/async-map')
const noop = Function.protoype
exports.DurablePartition = class DurablePartition {
// private resource API is used exceptionally to be able to separate resource creation and release
@@ -8,10 +10,10 @@ exports.DurablePartition = class DurablePartition {
flushAll() {
const partitionDisposers = this.#partitionDisposers
return asyncMap(Object.keys(partitionDisposers), path => {
return pEach(Object.keys(partitionDisposers), path => {
const disposers = partitionDisposers[path]
delete partitionDisposers[path]
return asyncMap(disposers, d => d(path).catch(noop => {}))
return pEach(disposers, d => d(path).catch(noop))
})
}

View File

@@ -49,9 +49,7 @@ const RE_VHDI = /^vhdi(\d+)$/
async function addDirectory(files, realPath, metadataPath) {
const stats = await lstat(realPath)
if (stats.isDirectory()) {
await asyncMap(await readdir(realPath), file =>
addDirectory(files, realPath + '/' + file, metadataPath + '/' + file)
)
await pEach(await readdir(realPath), file => addDirectory(files, realPath + '/' + file, metadataPath + '/' + file))
} else if (stats.isFile()) {
files.push({
realPath,
@@ -182,7 +180,7 @@ class RemoteAdapter {
const path = yield this.getPartition(diskId, partitionId)
const files = []
await asyncMap(paths, file =>
await pEach(paths, file =>
addDirectory(files, resolveSubpath(path, file), normalize('./' + file).replace(/\/+$/, ''))
)
@@ -228,7 +226,9 @@ class RemoteAdapter {
const handler = this._handler
// this will delete the json, unused VHDs will be detected by `cleanVm`
await asyncMapSettled(backups, ({ _filename }) => handler.unlink(_filename))
await pEach(backups, ({ _filename }) => handler.unlink(_filename), {
stopOnError: false,
})
}
async deleteMetadataBackup(backupId) {
@@ -248,13 +248,16 @@ class RemoteAdapter {
let list = await handler.list(dir)
list.sort()
list = list.filter(timestamp => /^\d{8}T\d{6}Z$/.test(timestamp)).slice(0, -retention)
await asyncMapSettled(list, timestamp => handler.rmtree(`${dir}/${timestamp}`))
await pEach(list, timestamp => handler.rmtree(`${dir}/${timestamp}`), { stopOnError: false })
}
async deleteFullVmBackups(backups) {
const handler = this._handler
await asyncMapSettled(backups, ({ _filename, xva }) =>
Promise.all([handler.unlink(_filename), handler.unlink(resolveRelativeFromFile(_filename, xva))])
await pEach(
backups,
({ _filename, xva }) =>
Promise.all([handler.unlink(_filename), handler.unlink(resolveRelativeFromFile(_filename, xva))]),
{ stopOnError: false }
)
}
@@ -263,7 +266,7 @@ class RemoteAdapter {
}
async deleteVmBackups(files) {
const metadatas = await asyncMap(files, file => this.readVmBackupMetadata(file))
const metadatas = await pEach(files, file => this.readVmBackupMetadata(file))
const { delta, full, ...others } = groupBy(metadatas, 'mode')
const unsupportedModes = Object.keys(others)
@@ -283,7 +286,7 @@ class RemoteAdapter {
}
const dedupedVmUuid = new Set(metadatas.map(_ => _.vm.uuid))
await asyncMap(dedupedVmUuid, vmUuid => this.invalidateVmBackupListCache(vmUuid))
await pEach(dedupedVmUuid, vmUuid => this.invalidateVmBackupListCache(vmUuid))
}
#getCompressionType() {
@@ -362,7 +365,7 @@ class RemoteAdapter {
const handler = this._handler
const backups = { __proto__: null }
await asyncMap(await handler.list(BACKUP_DIR), async entry => {
await pEach(await handler.list(BACKUP_DIR), async entry => {
// ignore hidden and lock files
if (entry[0] !== '.' && !entry.endsWith('.lock')) {
const vmBackups = await this.listVmBackups(entry)
@@ -380,7 +383,7 @@ class RemoteAdapter {
path = resolveSubpath(rootPath, path)
const entriesMap = {}
await asyncMap(await readdir(path), async name => {
await pEach(await readdir(path), async name => {
try {
const stats = await lstat(`${path}/${name}`)
if (stats.isDirectory()) {
@@ -413,10 +416,13 @@ class RemoteAdapter {
}
const results = []
await asyncMapSettled(partitions, partition =>
partition.type === LVM_PARTITION_TYPE
? this._listLvmLogicalVolumes(devicePath, partition, results)
: results.push(partition)
await pEach(
partitions,
partition =>
partition.type === LVM_PARTITION_TYPE
? this._listLvmLogicalVolumes(devicePath, partition, results)
: results.push(partition),
{ stopOnError: false }
)
return results
})
@@ -427,10 +433,10 @@ class RemoteAdapter {
const safeReaddir = createSafeReaddir(handler, 'listPoolMetadataBackups')
const backupsByPool = {}
await asyncMap(await safeReaddir(DIR_XO_POOL_METADATA_BACKUPS, { prependDir: true }), async scheduleDir =>
asyncMap(await safeReaddir(scheduleDir), async poolId => {
await pEach(await safeReaddir(DIR_XO_POOL_METADATA_BACKUPS, { prependDir: true }), async scheduleDir =>
pEach(await safeReaddir(scheduleDir), async poolId => {
const backups = backupsByPool[poolId] ?? (backupsByPool[poolId] = [])
return asyncMap(await safeReaddir(`${scheduleDir}/${poolId}`, { prependDir: true }), async backupDir => {
return pEach(await safeReaddir(`${scheduleDir}/${poolId}`, { prependDir: true }), async backupDir => {
try {
backups.push({
id: backupDir,
@@ -471,7 +477,7 @@ class RemoteAdapter {
filter: isMetadataFile,
prependDir: true,
})
await asyncMap(files, async file => {
await pEach(files, async file => {
try {
const metadata = await this.readVmBackupMetadata(file)
// inject an id usable by importVmBackupNg()
@@ -555,8 +561,8 @@ class RemoteAdapter {
const safeReaddir = createSafeReaddir(handler, 'listXoMetadataBackups')
const backups = []
await asyncMap(await safeReaddir(DIR_XO_CONFIG_BACKUPS, { prependDir: true }), async scheduleDir =>
asyncMap(await safeReaddir(scheduleDir, { prependDir: true }), async backupDir => {
await pEach(await safeReaddir(DIR_XO_CONFIG_BACKUPS, { prependDir: true }), async scheduleDir =>
pEach(await safeReaddir(scheduleDir, { prependDir: true }), async backupDir => {
try {
backups.push({
id: backupDir,
@@ -635,9 +641,13 @@ class RemoteAdapter {
const vdis = ignoredVdis === undefined ? metadata.vdis : pickBy(metadata.vdis, vdi => !ignoredVdis.has(vdi.uuid))
const streams = {}
await asyncMapSettled(Object.keys(vdis), async ref => {
streams[`${ref}.vhd`] = await this._createSyntheticStream(handler, join(dir, vhds[ref]))
})
await pEach(
Object.keys(vdis),
async ref => {
streams[`${ref}.vhd`] = await this._createSyntheticStream(handler, join(dir, vhds[ref]))
},
{ stopOnError: false }
)
return {
streams,

View File

@@ -1,6 +1,6 @@
'use strict'
const { asyncMap } = require('@xen-orchestra/async-map')
const { pEach } = require('@vates/async-each')
const { DIR_XO_POOL_METADATA_BACKUPS } = require('./RemoteAdapter.js')
const { forkStreamUnpipe } = require('./_forkStreamUnpipe.js')
@@ -52,7 +52,7 @@ exports.PoolMetadataBackup = class PoolMetadataBackup {
)
const metaDataFileName = `${dir}/metadata.json`
await asyncMap(
await pEach(
Object.entries(this._remoteAdapters),
([remoteId, adapter]) =>
Task.run(

View File

@@ -146,7 +146,7 @@ class VmBackup {
}
const errors = []
await (parallel ? asyncMap : asyncEach)(writers, async function (writer) {
await (parallel ? pEach : asyncEach)(writers, async function (writer) {
try {
await fn(writer)
} catch (error) {
@@ -331,13 +331,13 @@ class VmBackup {
const snapshotsPerSchedule = groupBy(this._jobSnapshots, _ => _.other_config['xo:backup:schedule'])
const xapi = this._xapi
await asyncMap(Object.entries(snapshotsPerSchedule), ([scheduleId, snapshots]) => {
await pEach(Object.entries(snapshotsPerSchedule), ([scheduleId, snapshots]) => {
const settings = {
...baseSettings,
...allSettings[scheduleId],
...allSettings[this.vm.uuid],
}
return asyncMap(getOldEntries(settings.snapshotRetention, snapshots), ({ $ref }) => {
return pEach(getOldEntries(settings.snapshotRetention, snapshots), ({ $ref }) => {
if ($ref !== baseVmRef) {
return xapi.VM_destroy($ref)
}
@@ -367,7 +367,7 @@ class VmBackup {
baseVm = await xapi.getRecord('VM', baseVm.$ref)
const baseUuidToSrcVdi = new Map()
await asyncMap(await baseVm.$getDisks(), async baseRef => {
await pEach(await baseVm.$getDisks(), async baseRef => {
const [baseUuid, snapshotOf] = await Promise.all([
xapi.getField('VDI', baseRef, 'uuid'),
xapi.getField('VDI', baseRef, 'snapshot_of'),

View File

@@ -38,7 +38,7 @@ exports.XoMetadataBackup = class XoMetadataBackup {
)
const metaDataFileName = `${dir}/metadata.json`
await asyncMap(
await pEach(
Object.entries(this._remoteAdapters),
([remoteId, adapter]) =>
Task.run(

View File

@@ -4,11 +4,12 @@ const assert = require('assert')
const sum = require('lodash/sum')
const { asyncMap } = require('@xen-orchestra/async-map')
const { Constants, mergeVhd, openVhd, VhdAbstract, VhdFile } = require('vhd-lib')
const { isVhdAlias, resolveVhdAlias } = require('vhd-lib/aliases')
const { dirname, resolve } = require('path')
const { DISK_TYPES } = Constants
const { isMetadataFile, isVhdFile, isXvaFile, isXvaSumFile } = require('./_backupType.js')
const { isVhdAlias, resolveVhdAlias } = require('vhd-lib/aliases')
const { limitConcurrency } = require('limit-concurrency-decorator')
const { pEach } = require('@vates/async-each')
const { Task } = require('./Task.js')
const { Disposable } = require('promise-toolbox')
@@ -85,13 +86,13 @@ const listVhds = async (handler, vmDir) => {
const aliases = {}
const interruptedVhds = new Map()
await asyncMap(
await pEach(
await handler.list(`${vmDir}/vdis`, {
ignoreMissing: true,
prependDir: true,
}),
async jobDir =>
asyncMap(
pEach(
await handler.list(jobDir, {
prependDir: true,
}),
@@ -193,7 +194,7 @@ exports.cleanVm = async function cleanVm(
const { vhds, interruptedVhds, aliases } = await listVhds(handler, vmDir)
// remove broken VHDs
await asyncMap(vhds, async path => {
await pEach(vhds, async path => {
try {
await Disposable.use(openVhd(handler, path, { checkSecondFooter: !interruptedVhds.has(path) }), vhd => {
if (vhd.footer.diskType === DISK_TYPES.DIFFERENCING) {
@@ -238,7 +239,7 @@ exports.cleanVm = async function cleanVm(
// check if alias are correct
// check if all vhd in data subfolder have a corresponding alias
await asyncMap(Object.keys(aliases), async dir => {
await pEach(Object.keys(aliases), async dir => {
await checkAliases(aliases[dir], `${dir}/data`, { handler, logInfo, logWarn, remove })
})
@@ -296,7 +297,7 @@ exports.cleanVm = async function cleanVm(
}
})
await asyncMap(xvas, async path => {
await pEach(xvas, async path => {
// check is not good enough to delete the file, the best we can do is report
// it
if (!(await this.isValidXva(path))) {
@@ -309,7 +310,7 @@ exports.cleanVm = async function cleanVm(
// compile the list of unused XVAs and VHDs, and remove backup metadata which
// reference a missing XVA/VHD
await asyncMap(jsons, async json => {
await pEach(jsons, async json => {
let metadata
try {
metadata = JSON.parse(await handler.readFile(json))
@@ -416,7 +417,7 @@ exports.cleanVm = async function cleanVm(
const metadataWithMergedVhd = {}
const doMerge = async () => {
await asyncMap(toMerge, async chain => {
await pEach(toMerge, async chain => {
const merged = await limitedMergeVhdChain(chain, { handler, logInfo, logWarn, remove, merge })
if (merged !== undefined) {
const metadataPath = vhdsToJSons[chain[0]] // all the chain should have the same metada file
@@ -428,14 +429,14 @@ exports.cleanVm = async function cleanVm(
await Promise.all([
...unusedVhdsDeletion,
toMerge.length !== 0 && (merge ? Task.run({ name: 'merge' }, doMerge) : doMerge()),
asyncMap(unusedXvas, path => {
pEach(unusedXvas, path => {
logWarn('unused XVA', { path })
if (remove) {
logInfo('deleting unused XVA', { path })
return handler.unlink(path)
}
}),
asyncMap(xvaSums, path => {
pEach(xvaSums, path => {
// no need to handle checksums for XVAs deleted by the script, they will be handled by `unlink()`
if (!xvas.has(path.slice(0, -'.checksum'.length))) {
logInfo('unused XVA checksum', { path })
@@ -450,7 +451,7 @@ exports.cleanVm = async function cleanVm(
// update size for delta metadata with merged VHD
// check for the other that the size is the same as the real file size
await asyncMap(jsons, async metadataPath => {
await pEach(jsons, async metadataPath => {
const metadata = JSON.parse(await handler.readFile(metadataPath))
let fileSystemSize

View File

@@ -5,10 +5,10 @@ const find = require('lodash/find.js')
const groupBy = require('lodash/groupBy.js')
const ignoreErrors = require('promise-toolbox/ignoreErrors')
const omit = require('lodash/omit.js')
const { asyncMap } = require('@xen-orchestra/async-map')
const { CancelToken } = require('promise-toolbox')
const { createVhdStreamWithLength } = require('vhd-lib')
const { defer } = require('golike-defer')
const { pEach } = require('@vates/async-each')
const { cancelableMap } = require('./_cancelableMap.js')
const { Task } = require('./Task.js')
@@ -237,13 +237,13 @@ exports.importDeltaVm = defer(async function importDeltaVm(
$defer.onFailure.call(xapi, 'VM_destroy', vmRef)
// 2. Delete all VBDs which may have been created by the import.
await asyncMap(await xapi.getField('VM', vmRef, 'VBDs'), ref => ignoreErrors.call(xapi.call('VBD.destroy', ref)))
await pEach(await xapi.getField('VM', vmRef, 'VBDs'), ref => ignoreErrors.call(xapi.call('VBD.destroy', ref)))
// 3. Create VDIs & VBDs.
const vbdRecords = deltaVm.vbds
const vbds = groupBy(vbdRecords, 'VDI')
const newVdis = {}
await asyncMap(Object.keys(vdiRecords), async vdiRef => {
await pEach(Object.keys(vdiRecords), async vdiRef => {
const vdi = vdiRecords[vdiRef]
let newVdi
@@ -279,7 +279,7 @@ exports.importDeltaVm = defer(async function importDeltaVm(
const vdiVbds = vbds[vdiRef]
if (vdiVbds !== undefined) {
await asyncMap(Object.values(vdiVbds), vbd =>
await pEach(Object.values(vdiVbds), vbd =>
xapi.VBD_create({
...vbd,
VDI: newVdi.$ref,
@@ -323,7 +323,7 @@ exports.importDeltaVm = defer(async function importDeltaVm(
}),
// Create VIFs.
asyncMap(Object.values(deltaVm.vifs), vif => {
pEach(Object.values(deltaVm.vifs), vif => {
let network = vif.$network$uuid && xapi.getObjectByUuid(vif.$network$uuid, undefined)
if (network === undefined) {

View File

@@ -33,7 +33,7 @@ exports.DeltaBackupWriter = class DeltaBackupWriter extends MixinBackupWriter(Ab
const backupDir = getVmBackupDir(backup.vm.uuid)
const vdisDir = `${backupDir}/vdis/${backup.job.id}`
await asyncMap(baseUuidToSrcVdi, async ([baseUuid, srcVdi]) => {
await pEach(baseUuidToSrcVdi, async ([baseUuid, srcVdi]) => {
let found = false
try {
const vhds = await handler.list(`${vdisDir}/${srcVdi.uuid}`, {
@@ -41,7 +41,7 @@ exports.DeltaBackupWriter = class DeltaBackupWriter extends MixinBackupWriter(Ab
prependDir: true,
})
const packedBaseUuid = packUuid(baseUuid)
await asyncMap(vhds, async path => {
await pEach(vhds, async path => {
try {
await checkVhdChain(handler, path)
// Warning, this should not be written as found = found || await adapter.isMergeableParent(packedBaseUuid, path)

View File

@@ -25,7 +25,7 @@ exports.DeltaReplicationWriter = class DeltaReplicationWriter extends MixinRepli
const xapi = replicatedVm.$xapi
const replicatedVdis = new Set(
await asyncMap(await replicatedVm.$getDisks(), async vdiRef => {
await pEach(await replicatedVm.$getDisks(), async vdiRef => {
const otherConfig = await xapi.getField('VDI', vdiRef, 'other_config')
return otherConfig[TAG_COPY_SRC]
})
@@ -60,7 +60,10 @@ exports.DeltaReplicationWriter = class DeltaReplicationWriter extends MixinRepli
const { scheduleId, vm } = this._backup
// delete previous interrupted copies
ignoreErrors.call(asyncMapSettled(listReplicatedVms(xapi, scheduleId, undefined, vm.uuid), vm => vm.$destroy))
ignoreErrors.call(
pEach(listReplicatedVms(xapi, scheduleId, undefined, vm.uuid), vm => vm.$destroy),
{ stopOnError: false }
)
this._oldEntries = getOldEntries(settings.copyRetention - 1, listReplicatedVms(xapi, scheduleId, srUuid, vm.uuid))
@@ -76,7 +79,7 @@ exports.DeltaReplicationWriter = class DeltaReplicationWriter extends MixinRepli
}
async _deleteOldEntries() {
return asyncMapSettled(this._oldEntries, vm => vm.$destroy())
return pEach(this._oldEntries, vm => vm.$destroy(), { stopOnError: false })
}
async _transfer({ timestamp, deltaExport, sizeContainers }) {
@@ -108,7 +111,7 @@ exports.DeltaReplicationWriter = class DeltaReplicationWriter extends MixinRepli
targetVm.ha_restart_priority !== '' &&
Promise.all([targetVm.set_ha_restart_priority(''), targetVm.add_tags('HA disabled')]),
targetVm.set_name_label(`${vm.name_label} - ${job.name} - (${formatFilenameDate(timestamp)})`),
asyncMap(['start', 'start_on'], op =>
pEach(['start', 'start_on'], op =>
targetVm.update_blocked_operations(
op,
'Start operation for this vm is blocked, clone it if you want to use it.'

View File

@@ -40,12 +40,14 @@ exports.FullReplicationWriter = class FullReplicationWriter extends MixinReplica
// delete previous interrupted copies
ignoreErrors.call(
asyncMapSettled(listReplicatedVms(xapi, scheduleId, undefined, vm.uuid), vm => xapi.VM_destroy(vm.$ref))
pEach(listReplicatedVms(xapi, scheduleId, undefined, vm.uuid), vm => xapi.VM_destroy(vm.$ref), {
stopOnError: false,
})
)
const oldVms = getOldEntries(settings.copyRetention - 1, listReplicatedVms(xapi, scheduleId, srUuid, vm.uuid))
const deleteOldBackups = () => asyncMapSettled(oldVms, vm => xapi.VM_destroy(vm.$ref))
const deleteOldBackups = () => pEach(oldVms, vm => xapi.VM_destroy(vm.$ref), { stopOnError: false })
const { deleteFirst } = settings
if (deleteFirst) {
await deleteOldBackups()
@@ -66,7 +68,7 @@ exports.FullReplicationWriter = class FullReplicationWriter extends MixinReplica
const targetVm = await xapi.getRecord('VM', targetVmRef)
await Promise.all([
asyncMap(['start', 'start_on'], op =>
pEach(['start', 'start_on'], op =>
targetVm.update_blocked_operations(
op,
'Start operation for this vm is blocked, clone it if you want to use it.'

View File

@@ -554,15 +554,18 @@ export default class RemoteHandlerAbstract {
}
const files = await this._list(dir)
await asyncMapSettled(files, file =>
this._unlink(`${dir}/${file}`).catch(error => {
// Unlink dir behavior is not consistent across platforms
// https://github.com/nodejs/node-v0.x-archive/issues/5791
if (error.code === 'EISDIR' || error.code === 'EPERM') {
return this._rmtree(`${dir}/${file}`)
}
throw error
})
await pEach(files, file =>
this._unlink(`${dir}/${file}`).catch(
error => {
// Unlink dir behavior is not consistent across platforms
// https://github.com/nodejs/node-v0.x-archive/issues/5791
if (error.code === 'EISDIR' || error.code === 'EPERM') {
return this._rmtree(`${dir}/${file}`)
}
throw error
},
{ stopOnError: false }
)
)
return this._rmtree(dir)
}

View File

@@ -28,7 +28,7 @@ import createBufferFromStream from './_createBufferFromStream.js'
import guessAwsRegion from './_guessAwsRegion.js'
import RemoteHandlerAbstract from './abstract'
import { basename, join, split } from './_path'
import { asyncEach } from '@vates/async-each'
import { pEach } from '@vates/async-each'
// endpoints https://docs.aws.amazon.com/general/latest/gr/s3.html
@@ -369,7 +369,7 @@ export default class S3Handler extends RemoteHandlerAbstract {
)
NextContinuationToken = result.IsTruncated ? result.NextContinuationToken : undefined
await asyncEach(
await pEach(
result.Contents ?? [],
async ({ Key }) => {
// _unlink will add the prefix, but Key contains everything

View File

@@ -250,7 +250,7 @@ export default class Backups {
listPoolMetadataBackups: [
async ({ remotes }) => {
const backupsByRemote = {}
await asyncMap(Object.entries(remotes), async ([remoteId, remote]) => {
await pEach(Object.entries(remotes), async ([remoteId, remote]) => {
try {
await Disposable.use(this.getAdapter(remote), async adapter => {
backupsByRemote[remoteId] = await adapter.listPoolMetadataBackups()
@@ -274,7 +274,7 @@ export default class Backups {
listVmBackups: [
async ({ remotes }) => {
const backups = {}
await asyncMap(Object.keys(remotes), async remoteId => {
await pEach(Object.keys(remotes), async remoteId => {
try {
await Disposable.use(this.getAdapter(remotes[remoteId]), async adapter => {
backups[remoteId] = formatVmBackups(await adapter.listAllVmBackups())
@@ -304,7 +304,7 @@ export default class Backups {
listXoMetadataBackups: [
async ({ remotes }) => {
const backupsByRemote = {}
await asyncMap(Object.entries(remotes), async ([remoteId, remote]) => {
await pEach(Object.entries(remotes), async ([remoteId, remote]) => {
try {
await Disposable.use(this.getAdapter(remote), async adapter => {
backupsByRemote[remoteId] = await adapter.listXoMetadataBackups()

View File

@@ -68,12 +68,12 @@ class Sr {
}
}
}
await asyncMap(await this.getField('SR', ref, 'VDIs'), async ref => {
await asyncMap(await this.getField('VDI', ref, 'VBDs'), handleVbd)
await pEach(await this.getField('SR', ref, 'VDIs'), async ref => {
await pEach(await this.getField('VDI', ref, 'VBDs'), handleVbd)
})
{
const runningVmUuids = await asyncMap(runningVms.keys(), ref => this.getField('VM', ref, 'uuid'))
const runningVmUuids = await pEach(runningVms.keys(), ref => this.getField('VM', ref, 'uuid'))
const set = new Set(vmsToShutdown)
for (const vmUuid of runningVmUuids) {
@@ -89,29 +89,37 @@ class Sr {
state.shutdownVms = {}
await asyncMapSettled(runningVms, async ([ref, isPaused]) => {
state.shutdownVms[await this.getField('VM', ref, 'uuid')] = isPaused
await pEach(
runningVms,
async ([ref, isPaused]) => {
state.shutdownVms[await this.getField('VM', ref, 'uuid')] = isPaused
try {
await this.callAsync('VM.clean_shutdown', ref)
} catch (error) {
warn('SR_enableMaintenanceMode, VM clean shutdown', { error })
await this.callAsync('VM.hard_shutdown', ref)
}
try {
await this.callAsync('VM.clean_shutdown', ref)
} catch (error) {
warn('SR_enableMaintenanceMode, VM clean shutdown', { error })
await this.callAsync('VM.hard_shutdown', ref)
}
$defer.onFailure.call(this, 'callAsync', 'VM.start', ref, isPaused, true)
})
$defer.onFailure.call(this, 'callAsync', 'VM.start', ref, isPaused, true)
},
{ stopOnError: false }
)
state.unpluggedPbds = []
await asyncMapSettled(await this.getField('SR', ref, 'PBDs'), async ref => {
if (await this.getField('PBD', ref, 'currently_attached')) {
state.unpluggedPbds.push(await this.getField('PBD', ref, 'uuid'))
await pEach(
await this.getField('SR', ref, 'PBDs'),
async ref => {
if (await this.getField('PBD', ref, 'currently_attached')) {
state.unpluggedPbds.push(await this.getField('PBD', ref, 'uuid'))
await this.callAsync('PBD.unplug', ref)
await this.callAsync('PBD.unplug', ref)
$defer.onFailure.call(this, 'callAsync', 'PBD.plug', ref)
}
})
$defer.onFailure.call(this, 'callAsync', 'PBD.plug', ref)
}
},
{ stopOnError: false }
)
await this.setFieldEntry('SR', ref, 'other_config', OC_MAINTENANCE, JSON.stringify(state))
}
@@ -125,7 +133,7 @@ class Sr {
const errors = []
await asyncMap(state.unpluggedPbds, async uuid => {
await pEach(state.unpluggedPbds, async uuid => {
try {
await this.callAsync('PBD.plug', await this.call('PBD.get_by_uuid', uuid))
} catch (error) {
@@ -133,7 +141,7 @@ class Sr {
}
})
await asyncMap(Object.entries(state.shutdownVms), async ([uuid, isPaused]) => {
await pEach(Object.entries(state.shutdownVms), async ([uuid, isPaused]) => {
try {
await this.callAsync('VM.start', await this.call('VM.get_by_uuid', uuid), isPaused, true)
} catch (error) {

View File

@@ -48,7 +48,7 @@ const cleanBiosStrings = biosStrings => {
async function listNobakVbds(xapi, vbdRefs) {
const vbds = []
await asyncMap(vbdRefs, async vbdRef => {
await pEach(vbdRefs, async vbdRef => {
const vbd = await xapi.getRecord('VBD', vbdRef)
if (
vbd.type === 'Disk' &&
@@ -152,7 +152,7 @@ class Vm {
if (ignoreNobakVdis) {
if (vm.power_state === 'Halted') {
await asyncMap(await listNobakVbds(this, vm.VBDs), async vbd => {
await pEach(await listNobakVbds(this, vm.VBDs), async vbd => {
await this.VBD_destroy(vbd.$ref)
$defer.call(this, 'VBD_create', vbd)
})
@@ -181,9 +181,7 @@ class Vm {
)
if (destroyNobakVdis) {
await asyncMap(await listNobakVbds(this, await this.getField('VM', ref, 'VBDs')), vbd =>
this.VDI_destroy(vbd.VDI)
)
await pEach(await listNobakVbds(this, await this.getField('VM', ref, 'VBDs')), vbd => this.VDI_destroy(vbd.VDI))
}
return ref
@@ -385,7 +383,7 @@ class Vm {
await this.call('VM.destroy', vmRef)
await Promise.all([
asyncMap(vm.snapshots, snapshotRef =>
pEach(vm.snapshots, snapshotRef =>
this.VM_destroy(snapshotRef).catch(error => {
warn('VM_destroy: failed to destroy snapshot', {
error,
@@ -395,7 +393,7 @@ class Vm {
})
),
deleteDisks &&
asyncMap(disks, async vdiRef => {
pEach(disks, async vdiRef => {
try {
// Dont destroy if attached to other (non control domain) VMs
for (const vbdRef of await this.getField('VDI', vdiRef, 'VBDs')) {
@@ -551,7 +549,7 @@ class Vm {
// vm.VUSBs can be undefined (e.g. on XS 7.0.0)
const vusbs = vm.VUSBs
if (vusbs !== undefined) {
await asyncMap(vusbs, async ref => {
await pEach(vusbs, async ref => {
const vusb = await this.getRecord('VUSB', ref)
await vusb.$call('destroy')
$defer.call(this, 'call', 'VUSB.create', vusb.VM, vusb.USB_group, vusb.other_config)
@@ -562,7 +560,7 @@ class Vm {
let destroyNobakVdis = false
if (ignoreNobakVdis) {
if (isHalted) {
await asyncMap(await listNobakVbds(this, vm.VBDs), async vbd => {
await pEach(await listNobakVbds(this, vm.VBDs), async vbd => {
await this.VBD_destroy(vbd.$ref)
$defer.call(this, 'VBD_create', vbd)
})
@@ -659,9 +657,7 @@ class Vm {
)
if (destroyNobakVdis) {
await asyncMap(await listNobakVbds(this, await this.getField('VM', ref, 'VBDs')), vbd =>
this.VDI_destroy(vbd.VDI)
)
await pEach(await listNobakVbds(this, await this.getField('VM', ref, 'VBDs')), vbd => this.VDI_destroy(vbd.VDI))
}
return ref

View File

@@ -64,7 +64,7 @@ const VhdSynthetic = class VhdSynthetic extends VhdAbstract {
}
async readBlockAllocationTable() {
await asyncMap(this.#vhds, vhd => vhd.readBlockAllocationTable())
await pEach(this.#vhds, vhd => vhd.readBlockAllocationTable())
}
containsBlock(blockId) {
@@ -74,7 +74,7 @@ const VhdSynthetic = class VhdSynthetic extends VhdAbstract {
async readHeaderAndFooter() {
const vhds = this.#vhds
await asyncMap(vhds, vhd => vhd.readHeaderAndFooter())
await pEach(vhds, vhd => vhd.readHeaderAndFooter())
for (let i = 0, n = vhds.length - 1; i < n; ++i) {
const child = vhds[i]

View File

@@ -4,13 +4,13 @@ const { createLogger } = require('@xen-orchestra/log')
const { parseVhdStream } = require('./parseVhdStream.js')
const { VhdDirectory } = require('./Vhd/VhdDirectory.js')
const { Disposable } = require('promise-toolbox')
const { asyncEach } = require('@vates/async-each')
const { pEach } = require('@vates/async-each')
const { warn } = createLogger('vhd-lib:createVhdDirectoryFromStream')
const buildVhd = Disposable.wrap(async function* (handler, path, inputStream, { concurrency, compression }) {
const vhd = yield VhdDirectory.create(handler, path, { compression })
await asyncEach(
await pEach(
parseVhdStream(inputStream),
async function (item) {
switch (item.type) {

View File

@@ -12,7 +12,7 @@ const { openVhd } = require('./openVhd')
const { basename, dirname } = require('path')
const { DISK_TYPES } = require('./_constants')
const { Disposable } = require('promise-toolbox')
const { asyncEach } = require('@vates/async-each')
const { pEach } = require('@vates/async-each')
const { VhdAbstract } = require('./Vhd/VhdAbstract')
const { VhdDirectory } = require('./Vhd/VhdDirectory')
const { VhdSynthetic } = require('./Vhd/VhdSynthetic')
@@ -63,7 +63,7 @@ function cleanupVhds(handler, parent, children, { logInfo = noop, remove = false
return Promise.all([
VhdAbstract.rename(handler, parent, mergeTargetChild),
asyncMap(children, child => {
pEach(children, child => {
logInfo(`the VHD child is already merged`, { child })
if (remove) {
logInfo(`deleting merged VHD child`, { child })
@@ -167,7 +167,7 @@ module.exports.mergeVhd = limitConcurrency(2)(async function merge(
let counter = 0
const mergeStateWriter = makeThrottledWriter(parentHandler, mergeStatePath, 10e3)
await asyncEach(
await pEach(
toMerge,
async blockId => {
merging.add(blockId)

View File

@@ -152,7 +152,7 @@ export async function installPatches({ pool, patches, hosts }) {
await hosts.shift().$restartAgent()
}
await asyncMap(hosts, host => host.$restartAgent())
await pEach(hosts, host => host.$restartAgent())
}
installPatches.params = {

View File

@@ -67,7 +67,7 @@ export async function destroy({ sr }) {
const config = xapi.xo.getData(sr, 'xosan_config')
// we simply forget because the hosted disks are being destroyed with the VMs
await xapi.forgetSr(sr._xapiId)
await asyncMapSettled(config.nodes, node => this.getXapiObject(node.vm.id).$destroy())
await pEach(config.nodes, node => this.getXapiObject(node.vm.id).$destroy(), { stopOnError: false })
await xapi.deleteNetwork(config.network)
if (sr.SR_type === 'xosan') {
await this.unbindXosanLicense({ srId: sr.id })

View File

@@ -374,13 +374,17 @@ const delete_ = defer(async function (
$defer.onFailure(() => this.setVmResourceSet(vm._xapiId, resourceSet, true)::ignoreErrors())
}
await asyncMapSettled(vm.snapshots, async id => {
const { resourceSet } = this.getObject(id)
if (resourceSet !== undefined) {
await this.setVmResourceSet(id, null)
$defer.onFailure(() => this.setVmResourceSet(id, resourceSet, true))
}
})
await pEach(
vm.snapshots,
async id => {
const { resourceSet } = this.getObject(id)
if (resourceSet !== undefined) {
await this.setVmResourceSet(id, null)
$defer.onFailure(() => this.setVmResourceSet(id, resourceSet, true))
}
},
{ stopOnError: false }
)
return xapi.VM_destroy(vm._xapiRef, { deleteDisks, force, forceDeleteDefaultTemplate })
})

View File

@@ -401,19 +401,25 @@ const createNetworkAndInsertHosts = defer(async function ($defer, xapi, pif, vla
pif,
address: networkPrefix + hostIpLastNumber++,
}))
await asyncMapSettled(addresses, addressAndPif => reconfigurePifIP(xapi, addressAndPif.pif, addressAndPif.address))
await pEach(addresses, addressAndPif => reconfigurePifIP(xapi, addressAndPif.pif, addressAndPif.address), {
stopOnError: false,
})
const master = xapi.pool.$master
const otherAddresses = addresses.filter(addr => addr.pif.$host !== master)
await asyncMapSettled(otherAddresses, async address => {
const result = await callPlugin(xapi, master, 'run_ping', {
address: address.address,
})
if (result.exit !== 0) {
throw invalidParameters(
`Could not ping ${master.name_label}->${address.pif.$host.name_label} (${address.address}) \n${result.stdout}`
)
}
})
await pEach(
otherAddresses,
async address => {
const result = await callPlugin(xapi, master, 'run_ping', {
address: address.address,
})
if (result.exit !== 0) {
throw invalidParameters(
`Could not ping ${master.name_label}->${address.pif.$host.name_label} (${address.address}) \n${result.stdout}`
)
}
},
{ stopOnError: false }
)
return xosanNetwork
})
@@ -441,10 +447,14 @@ async function getOrCreateSshKey(xapi) {
}
const _probePoolAndWaitForPresence = defer(async function ($defer, glusterEndpoint, addresses) {
await asyncMapSettled(addresses, async address => {
await glusterCmd(glusterEndpoint, 'peer probe ' + address)
$defer.onFailure(() => glusterCmd(glusterEndpoint, 'peer detach ' + address, true))
})
await pEach(
addresses,
async address => {
await glusterCmd(glusterEndpoint, 'peer probe ' + address)
$defer.onFailure(() => glusterCmd(glusterEndpoint, 'peer detach ' + address, true))
},
{ stopOnError: false }
)
function shouldRetry(peers) {
for (const peer of peers) {
@@ -1129,11 +1139,11 @@ export const removeBricks = defer(async function ($defer, { xosansr, bricks }) {
const dict = _getIPToVMDict(xapi, xosansr.id)
const brickVMs = map(bricks, b => dict[b])
await glusterCmd(glusterEndpoint, `volume remove-brick xosan ${bricks.join(' ')} force`)
await asyncMapSettled(ips, ip => glusterCmd(glusterEndpoint, 'peer detach ' + ip, true))
await pEach(ips, ip => glusterCmd(glusterEndpoint, 'peer detach ' + ip, true), { stopOnError: false })
remove(data.nodes, node => ips.includes(node.vm.ip))
await xapi.xo.setData(xosansr.id, 'xosan_config', data)
await xapi.callAsync('SR.scan', xapi.getObject(xosansr._xapiId).$ref)
await asyncMapSettled(brickVMs, vm => xapi.VM_destroy(vm.vm.$ref, true))
await pEach(brickVMs, vm => xapi.VM_destroy(vm.vm.$ref, true), { stopOnError: false })
} finally {
delete CURRENT_POOL_OPERATIONS[xapi.pool.$id]
}

View File

@@ -81,22 +81,31 @@ export default class Redis extends Collection {
await redis.sadd(`${prefix}::indexes`, indexes)
await asyncMapSettled(indexes, index =>
redis.keys(`${prefix}_${index}:*`).then(keys => keys.length !== 0 && redis.del(keys))
await pEach(
indexes,
index => redis.keys(`${prefix}_${index}:*`).then(keys => keys.length !== 0 && redis.del(keys)),
{ stopOnError: false }
)
const idsIndex = `${prefix}_ids`
await asyncMapSettled(redis.smembers(idsIndex), id =>
redis.hgetall(`${prefix}:${id}`).then(values =>
values == null
? redis.srem(idsIndex, id) // entry no longer exists
: asyncMapSettled(indexes, index => {
const value = values[index]
if (value !== undefined) {
return redis.sadd(`${prefix}_${index}:${String(value).toLowerCase()}`, id)
}
})
)
await pEach(
redis.smembers(idsIndex),
id =>
redis.hgetall(`${prefix}:${id}`).then(values =>
values == null
? redis.srem(idsIndex, id) // entry no longer exists
: pEach(
indexes,
index => {
const value = values[index]
if (value !== undefined) {
return redis.sadd(`${prefix}_${index}:${String(value).toLowerCase()}`, id)
}
},
{ stopOnError: false }
)
),
{ stopOnError: false }
)
}
@@ -146,12 +155,16 @@ export default class Redis extends Collection {
// remove the previous values from indexes
if (indexes.length !== 0) {
const previous = await redis.hgetall(`${prefix}:${id}`)
await asyncMapSettled(indexes, index => {
const value = previous[index]
if (value !== undefined) {
return redis.srem(`${prefix}_${index}:${String(value).toLowerCase()}`, id)
}
})
await pEach(
indexes,
index => {
const value = previous[index]
if (value !== undefined) {
return redis.srem(`${prefix}_${index}:${String(value).toLowerCase()}`, id)
}
},
{ stopOnError: false }
)
}
}
@@ -230,17 +243,24 @@ export default class Redis extends Collection {
if (indexes.length !== 0) {
promise = Promise.all([
promise,
asyncMapSettled(ids, id =>
redis.hgetall(`${prefix}:${id}`).then(
values =>
values != null &&
asyncMapSettled(indexes, index => {
const value = values[index]
if (value !== undefined) {
return redis.srem(`${prefix}_${index}:${String(value).toLowerCase()}`, id)
}
})
)
pEach(
ids,
id =>
redis.hgetall(`${prefix}:${id}`).then(
values =>
values != null &&
pEach(
indexes,
index => {
const value = values[index]
if (value !== undefined) {
return redis.srem(`${prefix}_${index}:${String(value).toLowerCase()}`, id)
}
},
{ stopOnError: false }
)
),
{ stopOnError: false }
),
])
}

View File

@@ -369,7 +369,7 @@ async function registerPluginsInPath(path, prefix) {
throw error
})
await asyncMap(files, name => {
await pEach(files, name => {
if (name.startsWith(prefix)) {
return registerPluginWrapper.call(this, `${path}/${name}`, name.slice(prefix.length))
}

View File

@@ -42,6 +42,7 @@ import {
parseDateTime,
prepareXapiParam,
} from './utils.mjs'
import { pEach } from '@vates/async-each'
const log = createLogger('xo:xapi')
@@ -171,7 +172,7 @@ export default class Xapi extends XapiBase {
const host = this.getObject(hostId)
const vms = host.$resident_VMs
log.debug(`Emergency shutdown: ${host.name_label}`)
await asyncMap(vms, vm => {
await pEach(vms, vm => {
if (!vm.is_control_domain) {
return ignoreErrors.call(this.callAsync('VM.suspend', vm.$ref))
}
@@ -261,11 +262,15 @@ export default class Xapi extends XapiBase {
// from host to another. It only works when a shared SR is present
// in the host. For this reason we chose to show a warning instead.
const pluggedPbds = host.$PBDs.filter(pbd => pbd.currently_attached)
await asyncMapSettled(pluggedPbds, async pbd => {
const ref = pbd.$ref
await this.unplugPbd(ref)
$defer(() => this.plugPbd(ref))
})
await pEach(
pluggedPbds,
async pbd => {
const ref = pbd.$ref
await this.unplugPbd(ref)
$defer(() => this.plugPbd(ref))
},
{ stopOnError: false }
)
return host.update_other_config(
multipathing
@@ -702,7 +707,9 @@ export default class Xapi extends XapiBase {
$defer.onFailure(() => this.VM_destroy(vm.$ref))
// Disable start and change the VM name label during import.
await Promise.all([
asyncMapSettled(['start', 'start_on'], op => vm.update_blocked_operations(op, 'OVA import in progress...')),
pEach(['start', 'start_on'], op => vm.update_blocked_operations(op, 'OVA import in progress...'), {
stopOnError: false,
}),
vm.set_name_label(`[Importing...] ${nameLabel}`),
])
@@ -879,7 +886,7 @@ export default class Xapi extends XapiBase {
}
throw new AggregateError(
await asyncMap(await this.call('host.get_all'), async hostRef => {
await pEach(await this.call('host.get_all'), async hostRef => {
const hostNameLabel = await this.call('host.get_name_label', hostRef)
try {
await this.call('VM.assert_can_boot_here', vmRef, hostRef)
@@ -1008,13 +1015,17 @@ export default class Xapi extends XapiBase {
throw error
}
const newVdi = await this.barrier(await this.callAsync('VDI.copy', vdi.$ref, sr.$ref).then(extractOpaqueRef))
await asyncMapSettled(vdi.$VBDs, async vbd => {
await this.call('VBD.destroy', vbd.$ref)
await this.VBD_create({
...vbd,
VDI: newVdi.$ref,
})
})
await pEach(
vdi.$VBDs,
async vbd => {
await this.call('VBD.destroy', vbd.$ref)
await this.VBD_create({
...vbd,
VDI: newVdi.$ref,
})
},
{ stopOnError: false }
)
await vdi.$destroy()
return newVdi
@@ -1195,7 +1206,7 @@ export default class Xapi extends XapiBase {
})
})
await asyncMapSettled(pifsByHost, pifs => this.call('Bond.create', network.$ref, pifs, '', bondMode))
await pEach(pifsByHost, pifs => this.call('Bond.create', network.$ref, pifs, '', bondMode), { stopOnError: false })
return network
}

View File

@@ -194,32 +194,40 @@ export default class BackupNg {
const remotes = {}
const xapis = {}
await waitAll([
asyncMapSettled(remoteIds, async id => {
const remote = await app.getRemoteWithCredentials(id)
if (remote.proxy !== proxyId) {
throw new Error(
proxyId === undefined
? 'The remote must not be linked to a proxy'
: `The remote ${remote.name} must be linked to the proxy ${proxyId}`
)
}
pEach(
remoteIds,
async id => {
const remote = await app.getRemoteWithCredentials(id)
if (remote.proxy !== proxyId) {
throw new Error(
proxyId === undefined
? 'The remote must not be linked to a proxy'
: `The remote ${remote.name} must be linked to the proxy ${proxyId}`
)
}
remotes[id] = remote
}),
asyncMapSettled([...servers], async id => {
const { allowUnauthorized, password, username } = await app.getXenServer(id)
remotes[id] = remote
},
{ stopOnError: false }
),
pEach(
servers,
async id => {
const { allowUnauthorized, password, username } = await app.getXenServer(id)
const xapi = app.getAllXapis()[id]
const xapi = app.getAllXapis()[id]
xapis[id] = {
allowUnauthorized,
credentials: {
username,
password,
},
url: await xapi.getHostBackupUrl(xapi.pool.$master),
}
}),
xapis[id] = {
allowUnauthorized,
credentials: {
username,
password,
},
url: await xapi.getHostBackupUrl(xapi.pool.$master),
}
},
{ stopOnError: false }
),
])
const params = {
@@ -295,12 +303,16 @@ export default class BackupNg {
if (schedules !== undefined) {
const { id, settings } = job
const tmpIds = Object.keys(schedules)
await asyncMapSettled(tmpIds, async tmpId => {
const schedule = schedules[tmpId]
schedule.jobId = id
settings[(await app.createSchedule(schedule)).id] = settings[tmpId]
delete settings[tmpId]
})
await pEach(
tmpIds,
async tmpId => {
const schedule = schedules[tmpId]
schedule.jobId = id
settings[(await app.createSchedule(schedule)).id] = settings[tmpId]
delete settings[tmpId]
},
{ stopOnError: false }
)
await app.updateJob({ id, settings })
}
@@ -355,11 +367,15 @@ export default class BackupNg {
const [schedules] = await Promise.all([app.getAllSchedules(), app.getJob(id, 'backup')])
await Promise.all([
app.removeJob(id),
asyncMapSettled(schedules, schedule => {
if (schedule.id === id) {
app.deleteSchedule(schedule.id)
}
}),
pEach(
schedules,
schedule => {
if (schedule.id === id) {
app.deleteSchedule(schedule.id)
}
},
{ stopOnError: false }
),
])
}
@@ -370,23 +386,27 @@ export default class BackupNg {
async deleteVmBackupsNg(ids) {
const app = this._app
const backupsByRemote = groupBy(ids.map(parseVmBackupId), 'remoteId')
await asyncMapSettled(Object.entries(backupsByRemote), async ([remoteId, backups]) => {
const filenames = backups.map(_ => _.metadataFilename)
const remote = await app.getRemoteWithCredentials(remoteId)
if (remote.proxy !== undefined) {
await app.callProxyMethod(remote.proxy, 'backup.deleteVmBackups', {
filenames,
remote: {
url: remote.url,
options: remote.options,
},
})
} else {
await Disposable.use(app.getBackupsRemoteAdapter(remote), adapter => adapter.deleteVmBackups(filenames))
}
await pEach(
Object.entries(backupsByRemote),
async ([remoteId, backups]) => {
const filenames = backups.map(_ => _.metadataFilename)
const remote = await app.getRemoteWithCredentials(remoteId)
if (remote.proxy !== undefined) {
await app.callProxyMethod(remote.proxy, 'backup.deleteVmBackups', {
filenames,
remote: {
url: remote.url,
options: remote.options,
},
})
} else {
await Disposable.use(app.getBackupsRemoteAdapter(remote), adapter => adapter.deleteVmBackups(filenames))
}
this._listVmBackupsOnRemote(REMOVE_CACHE_ENTRY, remoteId)
})
this._listVmBackupsOnRemote(REMOVE_CACHE_ENTRY, remoteId)
},
{ stopOnError: false }
)
}
async importVmBackupNg(id, srId, settings) {

View File

@@ -58,49 +58,53 @@ export default async function executeJobCall({ app, job, logger, runJobId, sched
timezone: schedule !== undefined ? schedule.timezone : undefined,
}
await asyncMapSettled(paramsFlatVector, params => {
const runCallId = logger.notice(`Starting ${job.method} call. (${job.id})`, {
event: 'jobCall.start',
runJobId,
method: job.method,
params,
})
await pEach(
paramsFlatVector,
params => {
const runCallId = logger.notice(`Starting ${job.method} call. (${job.id})`, {
event: 'jobCall.start',
runJobId,
method: job.method,
params,
})
const call = (execStatus.calls[runCallId] = {
method: job.method,
params,
start: Date.now(),
})
let promise = app.callApiMethod(connection, job.method, Object.assign({}, params))
if (job.timeout) {
promise = promise::timeout(job.timeout)
}
return promise.then(
value => {
logger.notice(`Call ${job.method} (${runCallId}) is a success. (${job.id})`, {
event: 'jobCall.end',
runJobId,
runCallId,
returnedValue: value,
})
call.returnedValue = value
call.end = Date.now()
},
reason => {
logger.notice(`Call ${job.method} (${runCallId}) has failed. (${job.id})`, {
event: 'jobCall.end',
runJobId,
runCallId,
error: serializeError(reason),
})
call.error = reason
call.end = Date.now()
const call = (execStatus.calls[runCallId] = {
method: job.method,
params,
start: Date.now(),
})
let promise = app.callApiMethod(connection, job.method, Object.assign({}, params))
if (job.timeout) {
promise = promise::timeout(job.timeout)
}
)
})
return promise.then(
value => {
logger.notice(`Call ${job.method} (${runCallId}) is a success. (${job.id})`, {
event: 'jobCall.end',
runJobId,
runCallId,
returnedValue: value,
})
call.returnedValue = value
call.end = Date.now()
},
reason => {
logger.notice(`Call ${job.method} (${runCallId}) has failed. (${job.id})`, {
event: 'jobCall.end',
runJobId,
runCallId,
error: serializeError(reason),
})
call.error = reason
call.end = Date.now()
}
)
},
{ stopOnError: false }
)
execStatus.end = Date.now()

View File

@@ -96,23 +96,27 @@ export default class Jobs {
)
})
// it sends a report for the interrupted backup jobs
app.on('plugins:registered', () =>
asyncMapSettled(this._jobs.get(), job => {
// only the interrupted backup jobs have the runId property
if (job.runId === undefined) {
return
}
app.emit(
'job:terminated',
// This cast can be removed after merging the PR: https://github.com/vatesfr/xen-orchestra/pull/3209
String(job.runId),
{
type: job.type,
app.on('plugins:registered', async () =>
pEach(
await this._jobs.get(),
job => {
// only the interrupted backup jobs have the runId property
if (job.runId === undefined) {
return
}
)
return this.updateJob({ id: job.id, runId: null })
})
app.emit(
'job:terminated',
// This cast can be removed after merging the PR: https://github.com/vatesfr/xen-orchestra/pull/3209
String(job.runId),
{
type: job.type,
}
)
return this.updateJob({ id: job.id, runId: null })
},
{ stopOnError: false }
)
)
}

View File

@@ -69,25 +69,33 @@ export default class metadataBackup {
const remotes = {}
const xapis = {}
await waitAll([
asyncMapSettled(remoteIds, async id => {
const remote = await app.getRemoteWithCredentials(id)
if (remote.proxy !== proxyId) {
throw new Error(`The remote ${remote.name} must be linked to the proxy ${proxyId}`)
}
pEach(
remoteIds,
async id => {
const remote = await app.getRemoteWithCredentials(id)
if (remote.proxy !== proxyId) {
throw new Error(`The remote ${remote.name} must be linked to the proxy ${proxyId}`)
}
remotes[id] = remote
}),
asyncMapSettled([...servers], async id => {
const { allowUnauthorized, host, password, username } = await app.getXenServer(id)
xapis[id] = {
allowUnauthorized,
credentials: {
username,
password,
},
url: host,
}
}),
remotes[id] = remote
},
{ stopOnError: false }
),
pEach(
servers,
async id => {
const { allowUnauthorized, host, password, username } = await app.getXenServer(id)
xapis[id] = {
allowUnauthorized,
credentials: {
username,
password,
},
url: host,
}
},
{ stopOnError: false }
),
])
const params = {
@@ -157,14 +165,18 @@ export default class metadataBackup {
})
const { id: jobId, settings } = job
await asyncMapSettled(schedules, async (schedule, tmpId) => {
const { id: scheduleId } = await app.createSchedule({
...schedule,
jobId,
})
settings[scheduleId] = settings[tmpId]
delete settings[tmpId]
})
await pEach(
schedules,
async (schedule, tmpId) => {
const { id: scheduleId } = await app.createSchedule({
...schedule,
jobId,
})
settings[scheduleId] = settings[tmpId]
delete settings[tmpId]
},
{ stopOnError: false }
)
await app.updateJob({ id: jobId, settings })
return job
@@ -180,11 +192,15 @@ export default class metadataBackup {
await Promise.all([
app.removeJob(id),
asyncMapSettled(schedules, schedule => {
if (schedule.id === id) {
return app.deleteSchedule(id)
}
}),
pEach(
schedules,
schedule => {
if (schedule.id === id) {
return app.deleteSchedule(id)
}
},
{ stopOnError: false }
),
])
}

View File

@@ -120,27 +120,31 @@ export default class {
async getAllRemotesInfo() {
const remotesInfo = this._remotesInfo
await asyncMapSettled(this._remotes.get(), async remote => {
if (!remote.enabled) {
return
}
await pEach(
this._remotes.get(),
async remote => {
if (!remote.enabled) {
return
}
const promise =
remote.proxy !== undefined
? this._app.callProxyMethod(remote.proxy, 'remote.getInfo', {
remote,
})
: this.getRemoteHandler(remote.id).then(handler => handler.getInfo())
const promise =
remote.proxy !== undefined
? this._app.callProxyMethod(remote.proxy, 'remote.getInfo', {
remote,
})
: this.getRemoteHandler(remote.id).then(handler => handler.getInfo())
try {
await timeout.call(
promise.then(info => {
remotesInfo[remote.id] = info
}),
5e3
)
} catch (_) {}
})
try {
await timeout.call(
promise.then(info => {
remotesInfo[remote.id] = info
}),
5e3
)
} catch (_) {}
},
{ stopOnError: false }
)
return remotesInfo
}

View File

@@ -426,6 +426,6 @@ export default class {
}
const { subjects } = await this.getResourceSet(resourceSetId)
await asyncMapSettled(subjects, subject => this._app.addAcl(subject, vmId, 'admin'))
await pEach(subjects, subject => this._app.addAcl(subject, vmId, 'admin'), { stopOnError: false })
}
}

View File

@@ -53,10 +53,14 @@ export default class Scheduling {
'schedules',
() => db.get(),
schedules =>
asyncMapSettled(schedules, async schedule => {
await db.update(normalize(schedule))
this._start(schedule.id)
}),
pEach(
schedules,
async schedule => {
await db.update(normalize(schedule))
this._start(schedule.id)
},
{ stopOnError: false }
),
['jobs']
)

View File

@@ -1,7 +1,7 @@
import assert from 'assert'
import findKey from 'lodash/findKey.js'
import pick from 'lodash/pick.js'
import { asyncEach } from '@vates/async-each'
import { pEach } from '@vates/async-each'
import { BaseError } from 'make-error'
import { createLogger } from '@xen-orchestra/log'
import { fibonacci } from 'iterable-backoff'
@@ -391,7 +391,7 @@ export default class {
const { pool } = xapi
try {
await asyncEach(Object.entries(pool.other_config), ([key, value]) => {
await pEach(Object.entries(pool.other_config), ([key, value]) => {
if (key.startsWith(poolMarkingPrefix)) {
const { lastConnected } = JSON.parse(value)
if (now - lastConnected > poolMarkingMaxAge) {