Compare commits

...

1 Commits

Author SHA1 Message Date
Julien Fontanet
4c991c1a57 WiP 2022-07-04 17:41:46 +02:00
40 changed files with 510 additions and 388 deletions

View File

@@ -1,4 +1,4 @@
### `asyncEach(iterable, iteratee, [opts])` ### `pEach(iterable, iteratee, [opts])`
Executes `iteratee` in order for each value yielded by `iterable`. Executes `iteratee` in order for each value yielded by `iterable`.
@@ -6,7 +6,7 @@ Returns a promise which rejects as soon as a call to `iteratee` throws or a promi
`iterable` must be an iterable or async iterable. `iterable` must be an iterable or async iterable.
`iteratee` is called with the same `this` value as `asyncEach`, and with the following arguments: `iteratee` is called with the same `this` value as `pEach`, and with the following arguments:
- `value`: the value yielded by `iterable` - `value`: the value yielded by `iterable`
- `index`: the 0-based index for this value - `index`: the 0-based index for this value
@@ -19,10 +19,10 @@ Returns a promise which rejects as soon as a call to `iteratee` throws or a promi
- `stopOnError`: whether to stop iteration on first error, or wait for all calls to finish and throw an `AggregateError`, defaults to `true` - `stopOnError`: whether to stop iteration on first error, or wait for all calls to finish and throw an `AggregateError`, defaults to `true`
```js ```js
import { asyncEach } from '@vates/async-each' import { pEach } from '@vates/async-each'
const contents = [] const contents = []
await asyncEach( await pEach(
['foo.txt', 'bar.txt', 'baz.txt'], ['foo.txt', 'bar.txt', 'baz.txt'],
async function (filename, i) { async function (filename, i) {
contents[i] = await readFile(filename) contents[i] = await readFile(filename)

View File

@@ -16,7 +16,7 @@ Installation of the [npm package](https://npmjs.org/package/@vates/async-each):
## Usage ## Usage
### `asyncEach(iterable, iteratee, [opts])` ### `pEach(iterable, iteratee, [opts])`
Executes `iteratee` in order for each value yielded by `iterable`. Executes `iteratee` in order for each value yielded by `iterable`.
@@ -24,7 +24,7 @@ Returns a promise which rejects as soon as a call to `iteratee` throws or a promi
`iterable` must be an iterable or async iterable. `iterable` must be an iterable or async iterable.
`iteratee` is called with the same `this` value as `asyncEach`, and with the following arguments: `iteratee` is called with the same `this` value as `pEach`, and with the following arguments:
- `value`: the value yielded by `iterable` - `value`: the value yielded by `iterable`
- `index`: the 0-based index for this value - `index`: the 0-based index for this value
@@ -37,10 +37,10 @@ Returns a promise which rejects as soon as a call to `iteratee` throws or a promi
- `stopOnError`: whether to stop iteration on first error, or wait for all calls to finish and throw an `AggregateError`, defaults to `true` - `stopOnError`: whether to stop iteration on first error, or wait for all calls to finish and throw an `AggregateError`, defaults to `true`
```js ```js
import { asyncEach } from '@vates/async-each' import { pEach } from '@vates/async-each'
const contents = [] const contents = []
await asyncEach( await pEach(
['foo.txt', 'bar.txt', 'baz.txt'], ['foo.txt', 'bar.txt', 'baz.txt'],
async function (filename, i) { async function (filename, i) {
contents[i] = await readFile(filename) contents[i] = await readFile(filename)

View File

@@ -15,7 +15,7 @@ class AggregateError extends Error {
* @param {(item: Item, index: number, iterable: Iterable<Item>) => Promise<void>} iteratee * @param {(item: Item, index: number, iterable: Iterable<Item>) => Promise<void>} iteratee
* @returns {Promise<void>} * @returns {Promise<void>}
*/ */
exports.asyncEach = function asyncEach(iterable, iteratee, { concurrency = 1, signal, stopOnError = true } = {}) { exports.pEach = function pEach(iterable, iteratee, { concurrency = 10, signal, stopOnError = true } = {}) {
if (concurrency === 0) { if (concurrency === 0) {
concurrency = Infinity concurrency = Infinity
} }
@@ -28,7 +28,7 @@ exports.asyncEach = function asyncEach(iterable, iteratee, { concurrency = 1, si
let onAbort let onAbort
if (signal !== undefined) { if (signal !== undefined) {
onAbort = () => { onAbort = () => {
onRejectedWrapper(new Error('asyncEach aborted')) onRejectedWrapper(new Error('pEach aborted'))
} }
signal.addEventListener('abort', onAbort) signal.addEventListener('abort', onAbort)
} }
@@ -51,11 +51,10 @@ exports.asyncEach = function asyncEach(iterable, iteratee, { concurrency = 1, si
clean() clean()
})(reject) })(reject)
let onFulfilled = value => { let onFulfilled = () => {
--running --running
next() next()
} }
const onFulfilledWrapper = value => onFulfilled(value)
let onRejected = stopOnError let onRejected = stopOnError
? reject ? reject
@@ -66,6 +65,14 @@ exports.asyncEach = function asyncEach(iterable, iteratee, { concurrency = 1, si
} }
const onRejectedWrapper = reason => onRejected(reason) const onRejectedWrapper = reason => onRejected(reason)
const run = async (value, index) => {
try {
onFulfilled(await iteratee.call(this, value, index++))
} catch (error) {
onRejected(error)
}
}
let nextIsRunning = false let nextIsRunning = false
let next = async () => { let next = async () => {
if (nextIsRunning) { if (nextIsRunning) {
@@ -86,17 +93,7 @@ exports.asyncEach = function asyncEach(iterable, iteratee, { concurrency = 1, si
} }
} else { } else {
++running ++running
try { run(cursor.value, index++, iterable)
const result = iteratee.call(this, cursor.value, index++, iterable)
let then
if (result != null && typeof result === 'object' && typeof (then = result.then) === 'function') {
then.call(result, onFulfilledWrapper, onRejectedWrapper)
} else {
onFulfilled(result)
}
} catch (error) {
onRejected(error)
}
} }
nextIsRunning = false nextIsRunning = false
return next() return next()

View File

@@ -2,7 +2,7 @@
/* eslint-env jest */ /* eslint-env jest */
const { asyncEach } = require('./') const { pEach } = require('./')
const randomDelay = (max = 10) => const randomDelay = (max = 10) =>
new Promise(resolve => { new Promise(resolve => {
@@ -14,7 +14,7 @@ const rejectionOf = p =>
p.then(reject, resolve) p.then(reject, resolve)
}) })
describe('asyncEach', () => { describe('pEach', () => {
const thisArg = 'qux' const thisArg = 'qux'
const values = ['foo', 'bar', 'baz'] const values = ['foo', 'bar', 'baz']
@@ -36,7 +36,7 @@ describe('asyncEach', () => {
it('works', async () => { it('works', async () => {
const iteratee = jest.fn(async () => {}) const iteratee = jest.fn(async () => {})
await asyncEach.call(thisArg, iterable, iteratee) await pEach.call(thisArg, iterable, iteratee)
expect(iteratee.mock.instances).toEqual(Array.from(values, () => thisArg)) expect(iteratee.mock.instances).toEqual(Array.from(values, () => thisArg))
expect(iteratee.mock.calls).toEqual(Array.from(values, (value, index) => [value, index, iterable])) expect(iteratee.mock.calls).toEqual(Array.from(values, (value, index) => [value, index, iterable]))
@@ -45,7 +45,7 @@ describe('asyncEach', () => {
it('respects a concurrency of ' + concurrency, async () => { it('respects a concurrency of ' + concurrency, async () => {
let running = 0 let running = 0
await asyncEach( await pEach(
values, values,
async () => { async () => {
++running ++running
@@ -66,7 +66,7 @@ describe('asyncEach', () => {
} }
}) })
expect(await rejectionOf(asyncEach(iterable, iteratee, { stopOnError: true }))).toBe(error) expect(await rejectionOf(pEach(iterable, iteratee, { stopOnError: true }))).toBe(error)
expect(iteratee).toHaveBeenCalledTimes(2) expect(iteratee).toHaveBeenCalledTimes(2)
}) })
@@ -78,7 +78,7 @@ describe('asyncEach', () => {
throw error throw error
}) })
const error = await rejectionOf(asyncEach(iterable, iteratee, { stopOnError: false })) const error = await rejectionOf(pEach(iterable, iteratee, { stopOnError: false }))
expect(error.errors).toEqual(errors) expect(error.errors).toEqual(errors)
expect(iteratee.mock.calls).toEqual(Array.from(values, (value, index) => [value, index, iterable])) expect(iteratee.mock.calls).toEqual(Array.from(values, (value, index) => [value, index, iterable]))
}) })
@@ -91,7 +91,7 @@ describe('asyncEach', () => {
} }
}) })
await expect(asyncEach(iterable, iteratee, { signal: ac.signal })).rejects.toThrow('asyncEach aborted') await expect(pEach(iterable, iteratee, { signal: ac.signal })).rejects.toThrow('pEach aborted')
expect(iteratee).toHaveBeenCalledTimes(2) expect(iteratee).toHaveBeenCalledTimes(2)
}) })
}) })

View File

@@ -23,7 +23,7 @@ module.exports = async function main(args) {
}, },
}) })
await asyncMap(_, async vmDir => { await pEach(_, async vmDir => {
vmDir = resolve(vmDir) vmDir = resolve(vmDir)
try { try {
await adapter.cleanVm(vmDir, { fixMetadata: fix, remove, merge, onLog: (...args) => console.warn(...args) }) await adapter.cleanVm(vmDir, { fixMetadata: fix, remove, merge, onLog: (...args) => console.warn(...args) })

View File

@@ -11,8 +11,8 @@ module.exports = async function createSymlinkIndex([backupDir, fieldPath]) {
const indexDir = join(backupDir, 'indexes', filenamify(fieldPath)) const indexDir = join(backupDir, 'indexes', filenamify(fieldPath))
await mktree(indexDir) await mktree(indexDir)
await asyncMap(await readdir2(backupDir), async vmDir => await pEach(await readdir2(backupDir), async vmDir =>
asyncMap( pEach(
(await readdir2(vmDir)).filter(_ => _.endsWith('.json')), (await readdir2(vmDir)).filter(_ => _.endsWith('.json')),
async json => { async json => {
const metadata = JSON.parse(await readFile(json)) const metadata = JSON.parse(await readFile(json))

View File

@@ -159,7 +159,7 @@ exports.Backup = class Backup {
const promises = [] const promises = []
if (pools.length !== 0 && settings.retentionPoolMetadata !== 0) { if (pools.length !== 0 && settings.retentionPoolMetadata !== 0) {
promises.push( promises.push(
asyncMap(pools, async pool => pEach(pools, async pool =>
runTask( runTask(
{ {
name: `Starting metadata backup for the pool (${pool.$id}). (${job.id})`, name: `Starting metadata backup for the pool (${pool.$id}). (${job.id})`,
@@ -245,7 +245,7 @@ exports.Backup = class Backup {
}) })
) )
), ),
() => settings.healthCheckSr !== undefined ? this._getRecord('SR', settings.healthCheckSr) : undefined, () => (settings.healthCheckSr !== undefined ? this._getRecord('SR', settings.healthCheckSr) : undefined),
async (srs, remoteAdapters, healthCheckSr) => { async (srs, remoteAdapters, healthCheckSr) => {
// remove adapters that failed (already handled) // remove adapters that failed (already handled)
remoteAdapters = remoteAdapters.filter(_ => _ !== undefined) remoteAdapters = remoteAdapters.filter(_ => _ !== undefined)
@@ -284,7 +284,10 @@ exports.Backup = class Backup {
) )
) )
const { concurrency } = settings const { concurrency } = settings
await asyncMapSettled(vmIds, concurrency === 0 ? handleVm : limitConcurrency(concurrency)(handleVm)) await pEach(vmIds, handleVm, {
concurrency,
stopOnError: false,
})
} }
) )
} }

View File

@@ -1,6 +1,8 @@
'use strict' 'use strict'
const { asyncMap } = require('@xen-orchestra/async-map') const { pEach } = require('@xen-orchestra/async-map')
const noop = Function.protoype
exports.DurablePartition = class DurablePartition { exports.DurablePartition = class DurablePartition {
// private resource API is used exceptionally to be able to separate resource creation and release // private resource API is used exceptionally to be able to separate resource creation and release
@@ -8,10 +10,10 @@ exports.DurablePartition = class DurablePartition {
flushAll() { flushAll() {
const partitionDisposers = this.#partitionDisposers const partitionDisposers = this.#partitionDisposers
return asyncMap(Object.keys(partitionDisposers), path => { return pEach(Object.keys(partitionDisposers), path => {
const disposers = partitionDisposers[path] const disposers = partitionDisposers[path]
delete partitionDisposers[path] delete partitionDisposers[path]
return asyncMap(disposers, d => d(path).catch(noop => {})) return pEach(disposers, d => d(path).catch(noop))
}) })
} }

View File

@@ -49,9 +49,7 @@ const RE_VHDI = /^vhdi(\d+)$/
async function addDirectory(files, realPath, metadataPath) { async function addDirectory(files, realPath, metadataPath) {
const stats = await lstat(realPath) const stats = await lstat(realPath)
if (stats.isDirectory()) { if (stats.isDirectory()) {
await asyncMap(await readdir(realPath), file => await pEach(await readdir(realPath), file => addDirectory(files, realPath + '/' + file, metadataPath + '/' + file))
addDirectory(files, realPath + '/' + file, metadataPath + '/' + file)
)
} else if (stats.isFile()) { } else if (stats.isFile()) {
files.push({ files.push({
realPath, realPath,
@@ -182,7 +180,7 @@ class RemoteAdapter {
const path = yield this.getPartition(diskId, partitionId) const path = yield this.getPartition(diskId, partitionId)
const files = [] const files = []
await asyncMap(paths, file => await pEach(paths, file =>
addDirectory(files, resolveSubpath(path, file), normalize('./' + file).replace(/\/+$/, '')) addDirectory(files, resolveSubpath(path, file), normalize('./' + file).replace(/\/+$/, ''))
) )
@@ -228,7 +226,9 @@ class RemoteAdapter {
const handler = this._handler const handler = this._handler
// this will delete the json, unused VHDs will be detected by `cleanVm` // this will delete the json, unused VHDs will be detected by `cleanVm`
await asyncMapSettled(backups, ({ _filename }) => handler.unlink(_filename)) await pEach(backups, ({ _filename }) => handler.unlink(_filename), {
stopOnError: false,
})
} }
async deleteMetadataBackup(backupId) { async deleteMetadataBackup(backupId) {
@@ -248,13 +248,16 @@ class RemoteAdapter {
let list = await handler.list(dir) let list = await handler.list(dir)
list.sort() list.sort()
list = list.filter(timestamp => /^\d{8}T\d{6}Z$/.test(timestamp)).slice(0, -retention) list = list.filter(timestamp => /^\d{8}T\d{6}Z$/.test(timestamp)).slice(0, -retention)
await asyncMapSettled(list, timestamp => handler.rmtree(`${dir}/${timestamp}`)) await pEach(list, timestamp => handler.rmtree(`${dir}/${timestamp}`), { stopOnError: false })
} }
async deleteFullVmBackups(backups) { async deleteFullVmBackups(backups) {
const handler = this._handler const handler = this._handler
await asyncMapSettled(backups, ({ _filename, xva }) => await pEach(
Promise.all([handler.unlink(_filename), handler.unlink(resolveRelativeFromFile(_filename, xva))]) backups,
({ _filename, xva }) =>
Promise.all([handler.unlink(_filename), handler.unlink(resolveRelativeFromFile(_filename, xva))]),
{ stopOnError: false }
) )
} }
@@ -263,7 +266,7 @@ class RemoteAdapter {
} }
async deleteVmBackups(files) { async deleteVmBackups(files) {
const metadatas = await asyncMap(files, file => this.readVmBackupMetadata(file)) const metadatas = await pEach(files, file => this.readVmBackupMetadata(file))
const { delta, full, ...others } = groupBy(metadatas, 'mode') const { delta, full, ...others } = groupBy(metadatas, 'mode')
const unsupportedModes = Object.keys(others) const unsupportedModes = Object.keys(others)
@@ -283,7 +286,7 @@ class RemoteAdapter {
} }
const dedupedVmUuid = new Set(metadatas.map(_ => _.vm.uuid)) const dedupedVmUuid = new Set(metadatas.map(_ => _.vm.uuid))
await asyncMap(dedupedVmUuid, vmUuid => this.invalidateVmBackupListCache(vmUuid)) await pEach(dedupedVmUuid, vmUuid => this.invalidateVmBackupListCache(vmUuid))
} }
#getCompressionType() { #getCompressionType() {
@@ -362,7 +365,7 @@ class RemoteAdapter {
const handler = this._handler const handler = this._handler
const backups = { __proto__: null } const backups = { __proto__: null }
await asyncMap(await handler.list(BACKUP_DIR), async entry => { await pEach(await handler.list(BACKUP_DIR), async entry => {
// ignore hidden and lock files // ignore hidden and lock files
if (entry[0] !== '.' && !entry.endsWith('.lock')) { if (entry[0] !== '.' && !entry.endsWith('.lock')) {
const vmBackups = await this.listVmBackups(entry) const vmBackups = await this.listVmBackups(entry)
@@ -380,7 +383,7 @@ class RemoteAdapter {
path = resolveSubpath(rootPath, path) path = resolveSubpath(rootPath, path)
const entriesMap = {} const entriesMap = {}
await asyncMap(await readdir(path), async name => { await pEach(await readdir(path), async name => {
try { try {
const stats = await lstat(`${path}/${name}`) const stats = await lstat(`${path}/${name}`)
if (stats.isDirectory()) { if (stats.isDirectory()) {
@@ -413,10 +416,13 @@ class RemoteAdapter {
} }
const results = [] const results = []
await asyncMapSettled(partitions, partition => await pEach(
partition.type === LVM_PARTITION_TYPE partitions,
? this._listLvmLogicalVolumes(devicePath, partition, results) partition =>
: results.push(partition) partition.type === LVM_PARTITION_TYPE
? this._listLvmLogicalVolumes(devicePath, partition, results)
: results.push(partition),
{ stopOnError: false }
) )
return results return results
}) })
@@ -427,10 +433,10 @@ class RemoteAdapter {
const safeReaddir = createSafeReaddir(handler, 'listPoolMetadataBackups') const safeReaddir = createSafeReaddir(handler, 'listPoolMetadataBackups')
const backupsByPool = {} const backupsByPool = {}
await asyncMap(await safeReaddir(DIR_XO_POOL_METADATA_BACKUPS, { prependDir: true }), async scheduleDir => await pEach(await safeReaddir(DIR_XO_POOL_METADATA_BACKUPS, { prependDir: true }), async scheduleDir =>
asyncMap(await safeReaddir(scheduleDir), async poolId => { pEach(await safeReaddir(scheduleDir), async poolId => {
const backups = backupsByPool[poolId] ?? (backupsByPool[poolId] = []) const backups = backupsByPool[poolId] ?? (backupsByPool[poolId] = [])
return asyncMap(await safeReaddir(`${scheduleDir}/${poolId}`, { prependDir: true }), async backupDir => { return pEach(await safeReaddir(`${scheduleDir}/${poolId}`, { prependDir: true }), async backupDir => {
try { try {
backups.push({ backups.push({
id: backupDir, id: backupDir,
@@ -471,7 +477,7 @@ class RemoteAdapter {
filter: isMetadataFile, filter: isMetadataFile,
prependDir: true, prependDir: true,
}) })
await asyncMap(files, async file => { await pEach(files, async file => {
try { try {
const metadata = await this.readVmBackupMetadata(file) const metadata = await this.readVmBackupMetadata(file)
// inject an id usable by importVmBackupNg() // inject an id usable by importVmBackupNg()
@@ -555,8 +561,8 @@ class RemoteAdapter {
const safeReaddir = createSafeReaddir(handler, 'listXoMetadataBackups') const safeReaddir = createSafeReaddir(handler, 'listXoMetadataBackups')
const backups = [] const backups = []
await asyncMap(await safeReaddir(DIR_XO_CONFIG_BACKUPS, { prependDir: true }), async scheduleDir => await pEach(await safeReaddir(DIR_XO_CONFIG_BACKUPS, { prependDir: true }), async scheduleDir =>
asyncMap(await safeReaddir(scheduleDir, { prependDir: true }), async backupDir => { pEach(await safeReaddir(scheduleDir, { prependDir: true }), async backupDir => {
try { try {
backups.push({ backups.push({
id: backupDir, id: backupDir,
@@ -635,9 +641,13 @@ class RemoteAdapter {
const vdis = ignoredVdis === undefined ? metadata.vdis : pickBy(metadata.vdis, vdi => !ignoredVdis.has(vdi.uuid)) const vdis = ignoredVdis === undefined ? metadata.vdis : pickBy(metadata.vdis, vdi => !ignoredVdis.has(vdi.uuid))
const streams = {} const streams = {}
await asyncMapSettled(Object.keys(vdis), async ref => { await pEach(
streams[`${ref}.vhd`] = await this._createSyntheticStream(handler, join(dir, vhds[ref])) Object.keys(vdis),
}) async ref => {
streams[`${ref}.vhd`] = await this._createSyntheticStream(handler, join(dir, vhds[ref]))
},
{ stopOnError: false }
)
return { return {
streams, streams,

View File

@@ -1,6 +1,6 @@
'use strict' 'use strict'
const { asyncMap } = require('@xen-orchestra/async-map') const { pEach } = require('@vates/async-each')
const { DIR_XO_POOL_METADATA_BACKUPS } = require('./RemoteAdapter.js') const { DIR_XO_POOL_METADATA_BACKUPS } = require('./RemoteAdapter.js')
const { forkStreamUnpipe } = require('./_forkStreamUnpipe.js') const { forkStreamUnpipe } = require('./_forkStreamUnpipe.js')
@@ -52,7 +52,7 @@ exports.PoolMetadataBackup = class PoolMetadataBackup {
) )
const metaDataFileName = `${dir}/metadata.json` const metaDataFileName = `${dir}/metadata.json`
await asyncMap( await pEach(
Object.entries(this._remoteAdapters), Object.entries(this._remoteAdapters),
([remoteId, adapter]) => ([remoteId, adapter]) =>
Task.run( Task.run(

View File

@@ -146,7 +146,7 @@ class VmBackup {
} }
const errors = [] const errors = []
await (parallel ? asyncMap : asyncEach)(writers, async function (writer) { await (parallel ? pEach : asyncEach)(writers, async function (writer) {
try { try {
await fn(writer) await fn(writer)
} catch (error) { } catch (error) {
@@ -331,13 +331,13 @@ class VmBackup {
const snapshotsPerSchedule = groupBy(this._jobSnapshots, _ => _.other_config['xo:backup:schedule']) const snapshotsPerSchedule = groupBy(this._jobSnapshots, _ => _.other_config['xo:backup:schedule'])
const xapi = this._xapi const xapi = this._xapi
await asyncMap(Object.entries(snapshotsPerSchedule), ([scheduleId, snapshots]) => { await pEach(Object.entries(snapshotsPerSchedule), ([scheduleId, snapshots]) => {
const settings = { const settings = {
...baseSettings, ...baseSettings,
...allSettings[scheduleId], ...allSettings[scheduleId],
...allSettings[this.vm.uuid], ...allSettings[this.vm.uuid],
} }
return asyncMap(getOldEntries(settings.snapshotRetention, snapshots), ({ $ref }) => { return pEach(getOldEntries(settings.snapshotRetention, snapshots), ({ $ref }) => {
if ($ref !== baseVmRef) { if ($ref !== baseVmRef) {
return xapi.VM_destroy($ref) return xapi.VM_destroy($ref)
} }
@@ -367,7 +367,7 @@ class VmBackup {
baseVm = await xapi.getRecord('VM', baseVm.$ref) baseVm = await xapi.getRecord('VM', baseVm.$ref)
const baseUuidToSrcVdi = new Map() const baseUuidToSrcVdi = new Map()
await asyncMap(await baseVm.$getDisks(), async baseRef => { await pEach(await baseVm.$getDisks(), async baseRef => {
const [baseUuid, snapshotOf] = await Promise.all([ const [baseUuid, snapshotOf] = await Promise.all([
xapi.getField('VDI', baseRef, 'uuid'), xapi.getField('VDI', baseRef, 'uuid'),
xapi.getField('VDI', baseRef, 'snapshot_of'), xapi.getField('VDI', baseRef, 'snapshot_of'),

View File

@@ -38,7 +38,7 @@ exports.XoMetadataBackup = class XoMetadataBackup {
) )
const metaDataFileName = `${dir}/metadata.json` const metaDataFileName = `${dir}/metadata.json`
await asyncMap( await pEach(
Object.entries(this._remoteAdapters), Object.entries(this._remoteAdapters),
([remoteId, adapter]) => ([remoteId, adapter]) =>
Task.run( Task.run(

View File

@@ -4,11 +4,12 @@ const assert = require('assert')
const sum = require('lodash/sum') const sum = require('lodash/sum')
const { asyncMap } = require('@xen-orchestra/async-map') const { asyncMap } = require('@xen-orchestra/async-map')
const { Constants, mergeVhd, openVhd, VhdAbstract, VhdFile } = require('vhd-lib') const { Constants, mergeVhd, openVhd, VhdAbstract, VhdFile } = require('vhd-lib')
const { isVhdAlias, resolveVhdAlias } = require('vhd-lib/aliases')
const { dirname, resolve } = require('path') const { dirname, resolve } = require('path')
const { DISK_TYPES } = Constants const { DISK_TYPES } = Constants
const { isMetadataFile, isVhdFile, isXvaFile, isXvaSumFile } = require('./_backupType.js') const { isMetadataFile, isVhdFile, isXvaFile, isXvaSumFile } = require('./_backupType.js')
const { isVhdAlias, resolveVhdAlias } = require('vhd-lib/aliases')
const { limitConcurrency } = require('limit-concurrency-decorator') const { limitConcurrency } = require('limit-concurrency-decorator')
const { pEach } = require('@vates/async-each')
const { Task } = require('./Task.js') const { Task } = require('./Task.js')
const { Disposable } = require('promise-toolbox') const { Disposable } = require('promise-toolbox')
@@ -85,13 +86,13 @@ const listVhds = async (handler, vmDir) => {
const aliases = {} const aliases = {}
const interruptedVhds = new Map() const interruptedVhds = new Map()
await asyncMap( await pEach(
await handler.list(`${vmDir}/vdis`, { await handler.list(`${vmDir}/vdis`, {
ignoreMissing: true, ignoreMissing: true,
prependDir: true, prependDir: true,
}), }),
async jobDir => async jobDir =>
asyncMap( pEach(
await handler.list(jobDir, { await handler.list(jobDir, {
prependDir: true, prependDir: true,
}), }),
@@ -193,7 +194,7 @@ exports.cleanVm = async function cleanVm(
const { vhds, interruptedVhds, aliases } = await listVhds(handler, vmDir) const { vhds, interruptedVhds, aliases } = await listVhds(handler, vmDir)
// remove broken VHDs // remove broken VHDs
await asyncMap(vhds, async path => { await pEach(vhds, async path => {
try { try {
await Disposable.use(openVhd(handler, path, { checkSecondFooter: !interruptedVhds.has(path) }), vhd => { await Disposable.use(openVhd(handler, path, { checkSecondFooter: !interruptedVhds.has(path) }), vhd => {
if (vhd.footer.diskType === DISK_TYPES.DIFFERENCING) { if (vhd.footer.diskType === DISK_TYPES.DIFFERENCING) {
@@ -238,7 +239,7 @@ exports.cleanVm = async function cleanVm(
// check if alias are correct // check if alias are correct
// check if all vhd in data subfolder have a corresponding alias // check if all vhd in data subfolder have a corresponding alias
await asyncMap(Object.keys(aliases), async dir => { await pEach(Object.keys(aliases), async dir => {
await checkAliases(aliases[dir], `${dir}/data`, { handler, logInfo, logWarn, remove }) await checkAliases(aliases[dir], `${dir}/data`, { handler, logInfo, logWarn, remove })
}) })
@@ -296,7 +297,7 @@ exports.cleanVm = async function cleanVm(
} }
}) })
await asyncMap(xvas, async path => { await pEach(xvas, async path => {
// check is not good enough to delete the file, the best we can do is report // check is not good enough to delete the file, the best we can do is report
// it // it
if (!(await this.isValidXva(path))) { if (!(await this.isValidXva(path))) {
@@ -309,7 +310,7 @@ exports.cleanVm = async function cleanVm(
// compile the list of unused XVAs and VHDs, and remove backup metadata which // compile the list of unused XVAs and VHDs, and remove backup metadata which
// reference a missing XVA/VHD // reference a missing XVA/VHD
await asyncMap(jsons, async json => { await pEach(jsons, async json => {
let metadata let metadata
try { try {
metadata = JSON.parse(await handler.readFile(json)) metadata = JSON.parse(await handler.readFile(json))
@@ -416,7 +417,7 @@ exports.cleanVm = async function cleanVm(
const metadataWithMergedVhd = {} const metadataWithMergedVhd = {}
const doMerge = async () => { const doMerge = async () => {
await asyncMap(toMerge, async chain => { await pEach(toMerge, async chain => {
const merged = await limitedMergeVhdChain(chain, { handler, logInfo, logWarn, remove, merge }) const merged = await limitedMergeVhdChain(chain, { handler, logInfo, logWarn, remove, merge })
if (merged !== undefined) { if (merged !== undefined) {
const metadataPath = vhdsToJSons[chain[0]] // all the chain should have the same metada file const metadataPath = vhdsToJSons[chain[0]] // all the chain should have the same metada file
@@ -428,14 +429,14 @@ exports.cleanVm = async function cleanVm(
await Promise.all([ await Promise.all([
...unusedVhdsDeletion, ...unusedVhdsDeletion,
toMerge.length !== 0 && (merge ? Task.run({ name: 'merge' }, doMerge) : doMerge()), toMerge.length !== 0 && (merge ? Task.run({ name: 'merge' }, doMerge) : doMerge()),
asyncMap(unusedXvas, path => { pEach(unusedXvas, path => {
logWarn('unused XVA', { path }) logWarn('unused XVA', { path })
if (remove) { if (remove) {
logInfo('deleting unused XVA', { path }) logInfo('deleting unused XVA', { path })
return handler.unlink(path) return handler.unlink(path)
} }
}), }),
asyncMap(xvaSums, path => { pEach(xvaSums, path => {
// no need to handle checksums for XVAs deleted by the script, they will be handled by `unlink()` // no need to handle checksums for XVAs deleted by the script, they will be handled by `unlink()`
if (!xvas.has(path.slice(0, -'.checksum'.length))) { if (!xvas.has(path.slice(0, -'.checksum'.length))) {
logInfo('unused XVA checksum', { path }) logInfo('unused XVA checksum', { path })
@@ -450,7 +451,7 @@ exports.cleanVm = async function cleanVm(
// update size for delta metadata with merged VHD // update size for delta metadata with merged VHD
// check for the other that the size is the same as the real file size // check for the other that the size is the same as the real file size
await asyncMap(jsons, async metadataPath => { await pEach(jsons, async metadataPath => {
const metadata = JSON.parse(await handler.readFile(metadataPath)) const metadata = JSON.parse(await handler.readFile(metadataPath))
let fileSystemSize let fileSystemSize

View File

@@ -5,10 +5,10 @@ const find = require('lodash/find.js')
const groupBy = require('lodash/groupBy.js') const groupBy = require('lodash/groupBy.js')
const ignoreErrors = require('promise-toolbox/ignoreErrors') const ignoreErrors = require('promise-toolbox/ignoreErrors')
const omit = require('lodash/omit.js') const omit = require('lodash/omit.js')
const { asyncMap } = require('@xen-orchestra/async-map')
const { CancelToken } = require('promise-toolbox') const { CancelToken } = require('promise-toolbox')
const { createVhdStreamWithLength } = require('vhd-lib') const { createVhdStreamWithLength } = require('vhd-lib')
const { defer } = require('golike-defer') const { defer } = require('golike-defer')
const { pEach } = require('@vates/async-each')
const { cancelableMap } = require('./_cancelableMap.js') const { cancelableMap } = require('./_cancelableMap.js')
const { Task } = require('./Task.js') const { Task } = require('./Task.js')
@@ -237,13 +237,13 @@ exports.importDeltaVm = defer(async function importDeltaVm(
$defer.onFailure.call(xapi, 'VM_destroy', vmRef) $defer.onFailure.call(xapi, 'VM_destroy', vmRef)
// 2. Delete all VBDs which may have been created by the import. // 2. Delete all VBDs which may have been created by the import.
await asyncMap(await xapi.getField('VM', vmRef, 'VBDs'), ref => ignoreErrors.call(xapi.call('VBD.destroy', ref))) await pEach(await xapi.getField('VM', vmRef, 'VBDs'), ref => ignoreErrors.call(xapi.call('VBD.destroy', ref)))
// 3. Create VDIs & VBDs. // 3. Create VDIs & VBDs.
const vbdRecords = deltaVm.vbds const vbdRecords = deltaVm.vbds
const vbds = groupBy(vbdRecords, 'VDI') const vbds = groupBy(vbdRecords, 'VDI')
const newVdis = {} const newVdis = {}
await asyncMap(Object.keys(vdiRecords), async vdiRef => { await pEach(Object.keys(vdiRecords), async vdiRef => {
const vdi = vdiRecords[vdiRef] const vdi = vdiRecords[vdiRef]
let newVdi let newVdi
@@ -279,7 +279,7 @@ exports.importDeltaVm = defer(async function importDeltaVm(
const vdiVbds = vbds[vdiRef] const vdiVbds = vbds[vdiRef]
if (vdiVbds !== undefined) { if (vdiVbds !== undefined) {
await asyncMap(Object.values(vdiVbds), vbd => await pEach(Object.values(vdiVbds), vbd =>
xapi.VBD_create({ xapi.VBD_create({
...vbd, ...vbd,
VDI: newVdi.$ref, VDI: newVdi.$ref,
@@ -323,7 +323,7 @@ exports.importDeltaVm = defer(async function importDeltaVm(
}), }),
// Create VIFs. // Create VIFs.
asyncMap(Object.values(deltaVm.vifs), vif => { pEach(Object.values(deltaVm.vifs), vif => {
let network = vif.$network$uuid && xapi.getObjectByUuid(vif.$network$uuid, undefined) let network = vif.$network$uuid && xapi.getObjectByUuid(vif.$network$uuid, undefined)
if (network === undefined) { if (network === undefined) {

View File

@@ -33,7 +33,7 @@ exports.DeltaBackupWriter = class DeltaBackupWriter extends MixinBackupWriter(Ab
const backupDir = getVmBackupDir(backup.vm.uuid) const backupDir = getVmBackupDir(backup.vm.uuid)
const vdisDir = `${backupDir}/vdis/${backup.job.id}` const vdisDir = `${backupDir}/vdis/${backup.job.id}`
await asyncMap(baseUuidToSrcVdi, async ([baseUuid, srcVdi]) => { await pEach(baseUuidToSrcVdi, async ([baseUuid, srcVdi]) => {
let found = false let found = false
try { try {
const vhds = await handler.list(`${vdisDir}/${srcVdi.uuid}`, { const vhds = await handler.list(`${vdisDir}/${srcVdi.uuid}`, {
@@ -41,7 +41,7 @@ exports.DeltaBackupWriter = class DeltaBackupWriter extends MixinBackupWriter(Ab
prependDir: true, prependDir: true,
}) })
const packedBaseUuid = packUuid(baseUuid) const packedBaseUuid = packUuid(baseUuid)
await asyncMap(vhds, async path => { await pEach(vhds, async path => {
try { try {
await checkVhdChain(handler, path) await checkVhdChain(handler, path)
// Warning, this should not be written as found = found || await adapter.isMergeableParent(packedBaseUuid, path) // Warning, this should not be written as found = found || await adapter.isMergeableParent(packedBaseUuid, path)

View File

@@ -25,7 +25,7 @@ exports.DeltaReplicationWriter = class DeltaReplicationWriter extends MixinRepli
const xapi = replicatedVm.$xapi const xapi = replicatedVm.$xapi
const replicatedVdis = new Set( const replicatedVdis = new Set(
await asyncMap(await replicatedVm.$getDisks(), async vdiRef => { await pEach(await replicatedVm.$getDisks(), async vdiRef => {
const otherConfig = await xapi.getField('VDI', vdiRef, 'other_config') const otherConfig = await xapi.getField('VDI', vdiRef, 'other_config')
return otherConfig[TAG_COPY_SRC] return otherConfig[TAG_COPY_SRC]
}) })
@@ -60,7 +60,10 @@ exports.DeltaReplicationWriter = class DeltaReplicationWriter extends MixinRepli
const { scheduleId, vm } = this._backup const { scheduleId, vm } = this._backup
// delete previous interrupted copies // delete previous interrupted copies
ignoreErrors.call(asyncMapSettled(listReplicatedVms(xapi, scheduleId, undefined, vm.uuid), vm => vm.$destroy)) ignoreErrors.call(
pEach(listReplicatedVms(xapi, scheduleId, undefined, vm.uuid), vm => vm.$destroy),
{ stopOnError: false }
)
this._oldEntries = getOldEntries(settings.copyRetention - 1, listReplicatedVms(xapi, scheduleId, srUuid, vm.uuid)) this._oldEntries = getOldEntries(settings.copyRetention - 1, listReplicatedVms(xapi, scheduleId, srUuid, vm.uuid))
@@ -76,7 +79,7 @@ exports.DeltaReplicationWriter = class DeltaReplicationWriter extends MixinRepli
} }
async _deleteOldEntries() { async _deleteOldEntries() {
return asyncMapSettled(this._oldEntries, vm => vm.$destroy()) return pEach(this._oldEntries, vm => vm.$destroy(), { stopOnError: false })
} }
async _transfer({ timestamp, deltaExport, sizeContainers }) { async _transfer({ timestamp, deltaExport, sizeContainers }) {
@@ -108,7 +111,7 @@ exports.DeltaReplicationWriter = class DeltaReplicationWriter extends MixinRepli
targetVm.ha_restart_priority !== '' && targetVm.ha_restart_priority !== '' &&
Promise.all([targetVm.set_ha_restart_priority(''), targetVm.add_tags('HA disabled')]), Promise.all([targetVm.set_ha_restart_priority(''), targetVm.add_tags('HA disabled')]),
targetVm.set_name_label(`${vm.name_label} - ${job.name} - (${formatFilenameDate(timestamp)})`), targetVm.set_name_label(`${vm.name_label} - ${job.name} - (${formatFilenameDate(timestamp)})`),
asyncMap(['start', 'start_on'], op => pEach(['start', 'start_on'], op =>
targetVm.update_blocked_operations( targetVm.update_blocked_operations(
op, op,
'Start operation for this vm is blocked, clone it if you want to use it.' 'Start operation for this vm is blocked, clone it if you want to use it.'

View File

@@ -40,12 +40,14 @@ exports.FullReplicationWriter = class FullReplicationWriter extends MixinReplica
// delete previous interrupted copies // delete previous interrupted copies
ignoreErrors.call( ignoreErrors.call(
asyncMapSettled(listReplicatedVms(xapi, scheduleId, undefined, vm.uuid), vm => xapi.VM_destroy(vm.$ref)) pEach(listReplicatedVms(xapi, scheduleId, undefined, vm.uuid), vm => xapi.VM_destroy(vm.$ref), {
stopOnError: false,
})
) )
const oldVms = getOldEntries(settings.copyRetention - 1, listReplicatedVms(xapi, scheduleId, srUuid, vm.uuid)) const oldVms = getOldEntries(settings.copyRetention - 1, listReplicatedVms(xapi, scheduleId, srUuid, vm.uuid))
const deleteOldBackups = () => asyncMapSettled(oldVms, vm => xapi.VM_destroy(vm.$ref)) const deleteOldBackups = () => pEach(oldVms, vm => xapi.VM_destroy(vm.$ref), { stopOnError: false })
const { deleteFirst } = settings const { deleteFirst } = settings
if (deleteFirst) { if (deleteFirst) {
await deleteOldBackups() await deleteOldBackups()
@@ -66,7 +68,7 @@ exports.FullReplicationWriter = class FullReplicationWriter extends MixinReplica
const targetVm = await xapi.getRecord('VM', targetVmRef) const targetVm = await xapi.getRecord('VM', targetVmRef)
await Promise.all([ await Promise.all([
asyncMap(['start', 'start_on'], op => pEach(['start', 'start_on'], op =>
targetVm.update_blocked_operations( targetVm.update_blocked_operations(
op, op,
'Start operation for this vm is blocked, clone it if you want to use it.' 'Start operation for this vm is blocked, clone it if you want to use it.'

View File

@@ -554,15 +554,18 @@ export default class RemoteHandlerAbstract {
} }
const files = await this._list(dir) const files = await this._list(dir)
await asyncMapSettled(files, file => await pEach(files, file =>
this._unlink(`${dir}/${file}`).catch(error => { this._unlink(`${dir}/${file}`).catch(
// Unlink dir behavior is not consistent across platforms error => {
// https://github.com/nodejs/node-v0.x-archive/issues/5791 // Unlink dir behavior is not consistent across platforms
if (error.code === 'EISDIR' || error.code === 'EPERM') { // https://github.com/nodejs/node-v0.x-archive/issues/5791
return this._rmtree(`${dir}/${file}`) if (error.code === 'EISDIR' || error.code === 'EPERM') {
} return this._rmtree(`${dir}/${file}`)
throw error }
}) throw error
},
{ stopOnError: false }
)
) )
return this._rmtree(dir) return this._rmtree(dir)
} }

View File

@@ -28,7 +28,7 @@ import createBufferFromStream from './_createBufferFromStream.js'
import guessAwsRegion from './_guessAwsRegion.js' import guessAwsRegion from './_guessAwsRegion.js'
import RemoteHandlerAbstract from './abstract' import RemoteHandlerAbstract from './abstract'
import { basename, join, split } from './_path' import { basename, join, split } from './_path'
import { asyncEach } from '@vates/async-each' import { pEach } from '@vates/async-each'
// endpoints https://docs.aws.amazon.com/general/latest/gr/s3.html // endpoints https://docs.aws.amazon.com/general/latest/gr/s3.html
@@ -369,7 +369,7 @@ export default class S3Handler extends RemoteHandlerAbstract {
) )
NextContinuationToken = result.IsTruncated ? result.NextContinuationToken : undefined NextContinuationToken = result.IsTruncated ? result.NextContinuationToken : undefined
await asyncEach( await pEach(
result.Contents ?? [], result.Contents ?? [],
async ({ Key }) => { async ({ Key }) => {
// _unlink will add the prefix, but Key contains everything // _unlink will add the prefix, but Key contains everything

View File

@@ -250,7 +250,7 @@ export default class Backups {
listPoolMetadataBackups: [ listPoolMetadataBackups: [
async ({ remotes }) => { async ({ remotes }) => {
const backupsByRemote = {} const backupsByRemote = {}
await asyncMap(Object.entries(remotes), async ([remoteId, remote]) => { await pEach(Object.entries(remotes), async ([remoteId, remote]) => {
try { try {
await Disposable.use(this.getAdapter(remote), async adapter => { await Disposable.use(this.getAdapter(remote), async adapter => {
backupsByRemote[remoteId] = await adapter.listPoolMetadataBackups() backupsByRemote[remoteId] = await adapter.listPoolMetadataBackups()
@@ -274,7 +274,7 @@ export default class Backups {
listVmBackups: [ listVmBackups: [
async ({ remotes }) => { async ({ remotes }) => {
const backups = {} const backups = {}
await asyncMap(Object.keys(remotes), async remoteId => { await pEach(Object.keys(remotes), async remoteId => {
try { try {
await Disposable.use(this.getAdapter(remotes[remoteId]), async adapter => { await Disposable.use(this.getAdapter(remotes[remoteId]), async adapter => {
backups[remoteId] = formatVmBackups(await adapter.listAllVmBackups()) backups[remoteId] = formatVmBackups(await adapter.listAllVmBackups())
@@ -304,7 +304,7 @@ export default class Backups {
listXoMetadataBackups: [ listXoMetadataBackups: [
async ({ remotes }) => { async ({ remotes }) => {
const backupsByRemote = {} const backupsByRemote = {}
await asyncMap(Object.entries(remotes), async ([remoteId, remote]) => { await pEach(Object.entries(remotes), async ([remoteId, remote]) => {
try { try {
await Disposable.use(this.getAdapter(remote), async adapter => { await Disposable.use(this.getAdapter(remote), async adapter => {
backupsByRemote[remoteId] = await adapter.listXoMetadataBackups() backupsByRemote[remoteId] = await adapter.listXoMetadataBackups()

View File

@@ -68,12 +68,12 @@ class Sr {
} }
} }
} }
await asyncMap(await this.getField('SR', ref, 'VDIs'), async ref => { await pEach(await this.getField('SR', ref, 'VDIs'), async ref => {
await asyncMap(await this.getField('VDI', ref, 'VBDs'), handleVbd) await pEach(await this.getField('VDI', ref, 'VBDs'), handleVbd)
}) })
{ {
const runningVmUuids = await asyncMap(runningVms.keys(), ref => this.getField('VM', ref, 'uuid')) const runningVmUuids = await pEach(runningVms.keys(), ref => this.getField('VM', ref, 'uuid'))
const set = new Set(vmsToShutdown) const set = new Set(vmsToShutdown)
for (const vmUuid of runningVmUuids) { for (const vmUuid of runningVmUuids) {
@@ -89,29 +89,37 @@ class Sr {
state.shutdownVms = {} state.shutdownVms = {}
await asyncMapSettled(runningVms, async ([ref, isPaused]) => { await pEach(
state.shutdownVms[await this.getField('VM', ref, 'uuid')] = isPaused runningVms,
async ([ref, isPaused]) => {
state.shutdownVms[await this.getField('VM', ref, 'uuid')] = isPaused
try { try {
await this.callAsync('VM.clean_shutdown', ref) await this.callAsync('VM.clean_shutdown', ref)
} catch (error) { } catch (error) {
warn('SR_enableMaintenanceMode, VM clean shutdown', { error }) warn('SR_enableMaintenanceMode, VM clean shutdown', { error })
await this.callAsync('VM.hard_shutdown', ref) await this.callAsync('VM.hard_shutdown', ref)
} }
$defer.onFailure.call(this, 'callAsync', 'VM.start', ref, isPaused, true) $defer.onFailure.call(this, 'callAsync', 'VM.start', ref, isPaused, true)
}) },
{ stopOnError: false }
)
state.unpluggedPbds = [] state.unpluggedPbds = []
await asyncMapSettled(await this.getField('SR', ref, 'PBDs'), async ref => { await pEach(
if (await this.getField('PBD', ref, 'currently_attached')) { await this.getField('SR', ref, 'PBDs'),
state.unpluggedPbds.push(await this.getField('PBD', ref, 'uuid')) async ref => {
if (await this.getField('PBD', ref, 'currently_attached')) {
state.unpluggedPbds.push(await this.getField('PBD', ref, 'uuid'))
await this.callAsync('PBD.unplug', ref) await this.callAsync('PBD.unplug', ref)
$defer.onFailure.call(this, 'callAsync', 'PBD.plug', ref) $defer.onFailure.call(this, 'callAsync', 'PBD.plug', ref)
} }
}) },
{ stopOnError: false }
)
await this.setFieldEntry('SR', ref, 'other_config', OC_MAINTENANCE, JSON.stringify(state)) await this.setFieldEntry('SR', ref, 'other_config', OC_MAINTENANCE, JSON.stringify(state))
} }
@@ -125,7 +133,7 @@ class Sr {
const errors = [] const errors = []
await asyncMap(state.unpluggedPbds, async uuid => { await pEach(state.unpluggedPbds, async uuid => {
try { try {
await this.callAsync('PBD.plug', await this.call('PBD.get_by_uuid', uuid)) await this.callAsync('PBD.plug', await this.call('PBD.get_by_uuid', uuid))
} catch (error) { } catch (error) {
@@ -133,7 +141,7 @@ class Sr {
} }
}) })
await asyncMap(Object.entries(state.shutdownVms), async ([uuid, isPaused]) => { await pEach(Object.entries(state.shutdownVms), async ([uuid, isPaused]) => {
try { try {
await this.callAsync('VM.start', await this.call('VM.get_by_uuid', uuid), isPaused, true) await this.callAsync('VM.start', await this.call('VM.get_by_uuid', uuid), isPaused, true)
} catch (error) { } catch (error) {

View File

@@ -48,7 +48,7 @@ const cleanBiosStrings = biosStrings => {
async function listNobakVbds(xapi, vbdRefs) { async function listNobakVbds(xapi, vbdRefs) {
const vbds = [] const vbds = []
await asyncMap(vbdRefs, async vbdRef => { await pEach(vbdRefs, async vbdRef => {
const vbd = await xapi.getRecord('VBD', vbdRef) const vbd = await xapi.getRecord('VBD', vbdRef)
if ( if (
vbd.type === 'Disk' && vbd.type === 'Disk' &&
@@ -152,7 +152,7 @@ class Vm {
if (ignoreNobakVdis) { if (ignoreNobakVdis) {
if (vm.power_state === 'Halted') { if (vm.power_state === 'Halted') {
await asyncMap(await listNobakVbds(this, vm.VBDs), async vbd => { await pEach(await listNobakVbds(this, vm.VBDs), async vbd => {
await this.VBD_destroy(vbd.$ref) await this.VBD_destroy(vbd.$ref)
$defer.call(this, 'VBD_create', vbd) $defer.call(this, 'VBD_create', vbd)
}) })
@@ -181,9 +181,7 @@ class Vm {
) )
if (destroyNobakVdis) { if (destroyNobakVdis) {
await asyncMap(await listNobakVbds(this, await this.getField('VM', ref, 'VBDs')), vbd => await pEach(await listNobakVbds(this, await this.getField('VM', ref, 'VBDs')), vbd => this.VDI_destroy(vbd.VDI))
this.VDI_destroy(vbd.VDI)
)
} }
return ref return ref
@@ -385,7 +383,7 @@ class Vm {
await this.call('VM.destroy', vmRef) await this.call('VM.destroy', vmRef)
await Promise.all([ await Promise.all([
asyncMap(vm.snapshots, snapshotRef => pEach(vm.snapshots, snapshotRef =>
this.VM_destroy(snapshotRef).catch(error => { this.VM_destroy(snapshotRef).catch(error => {
warn('VM_destroy: failed to destroy snapshot', { warn('VM_destroy: failed to destroy snapshot', {
error, error,
@@ -395,7 +393,7 @@ class Vm {
}) })
), ),
deleteDisks && deleteDisks &&
asyncMap(disks, async vdiRef => { pEach(disks, async vdiRef => {
try { try {
// Dont destroy if attached to other (non control domain) VMs // Dont destroy if attached to other (non control domain) VMs
for (const vbdRef of await this.getField('VDI', vdiRef, 'VBDs')) { for (const vbdRef of await this.getField('VDI', vdiRef, 'VBDs')) {
@@ -551,7 +549,7 @@ class Vm {
// vm.VUSBs can be undefined (e.g. on XS 7.0.0) // vm.VUSBs can be undefined (e.g. on XS 7.0.0)
const vusbs = vm.VUSBs const vusbs = vm.VUSBs
if (vusbs !== undefined) { if (vusbs !== undefined) {
await asyncMap(vusbs, async ref => { await pEach(vusbs, async ref => {
const vusb = await this.getRecord('VUSB', ref) const vusb = await this.getRecord('VUSB', ref)
await vusb.$call('destroy') await vusb.$call('destroy')
$defer.call(this, 'call', 'VUSB.create', vusb.VM, vusb.USB_group, vusb.other_config) $defer.call(this, 'call', 'VUSB.create', vusb.VM, vusb.USB_group, vusb.other_config)
@@ -562,7 +560,7 @@ class Vm {
let destroyNobakVdis = false let destroyNobakVdis = false
if (ignoreNobakVdis) { if (ignoreNobakVdis) {
if (isHalted) { if (isHalted) {
await asyncMap(await listNobakVbds(this, vm.VBDs), async vbd => { await pEach(await listNobakVbds(this, vm.VBDs), async vbd => {
await this.VBD_destroy(vbd.$ref) await this.VBD_destroy(vbd.$ref)
$defer.call(this, 'VBD_create', vbd) $defer.call(this, 'VBD_create', vbd)
}) })
@@ -659,9 +657,7 @@ class Vm {
) )
if (destroyNobakVdis) { if (destroyNobakVdis) {
await asyncMap(await listNobakVbds(this, await this.getField('VM', ref, 'VBDs')), vbd => await pEach(await listNobakVbds(this, await this.getField('VM', ref, 'VBDs')), vbd => this.VDI_destroy(vbd.VDI))
this.VDI_destroy(vbd.VDI)
)
} }
return ref return ref

View File

@@ -64,7 +64,7 @@ const VhdSynthetic = class VhdSynthetic extends VhdAbstract {
} }
async readBlockAllocationTable() { async readBlockAllocationTable() {
await asyncMap(this.#vhds, vhd => vhd.readBlockAllocationTable()) await pEach(this.#vhds, vhd => vhd.readBlockAllocationTable())
} }
containsBlock(blockId) { containsBlock(blockId) {
@@ -74,7 +74,7 @@ const VhdSynthetic = class VhdSynthetic extends VhdAbstract {
async readHeaderAndFooter() { async readHeaderAndFooter() {
const vhds = this.#vhds const vhds = this.#vhds
await asyncMap(vhds, vhd => vhd.readHeaderAndFooter()) await pEach(vhds, vhd => vhd.readHeaderAndFooter())
for (let i = 0, n = vhds.length - 1; i < n; ++i) { for (let i = 0, n = vhds.length - 1; i < n; ++i) {
const child = vhds[i] const child = vhds[i]

View File

@@ -4,13 +4,13 @@ const { createLogger } = require('@xen-orchestra/log')
const { parseVhdStream } = require('./parseVhdStream.js') const { parseVhdStream } = require('./parseVhdStream.js')
const { VhdDirectory } = require('./Vhd/VhdDirectory.js') const { VhdDirectory } = require('./Vhd/VhdDirectory.js')
const { Disposable } = require('promise-toolbox') const { Disposable } = require('promise-toolbox')
const { asyncEach } = require('@vates/async-each') const { pEach } = require('@vates/async-each')
const { warn } = createLogger('vhd-lib:createVhdDirectoryFromStream') const { warn } = createLogger('vhd-lib:createVhdDirectoryFromStream')
const buildVhd = Disposable.wrap(async function* (handler, path, inputStream, { concurrency, compression }) { const buildVhd = Disposable.wrap(async function* (handler, path, inputStream, { concurrency, compression }) {
const vhd = yield VhdDirectory.create(handler, path, { compression }) const vhd = yield VhdDirectory.create(handler, path, { compression })
await asyncEach( await pEach(
parseVhdStream(inputStream), parseVhdStream(inputStream),
async function (item) { async function (item) {
switch (item.type) { switch (item.type) {

View File

@@ -12,7 +12,7 @@ const { openVhd } = require('./openVhd')
const { basename, dirname } = require('path') const { basename, dirname } = require('path')
const { DISK_TYPES } = require('./_constants') const { DISK_TYPES } = require('./_constants')
const { Disposable } = require('promise-toolbox') const { Disposable } = require('promise-toolbox')
const { asyncEach } = require('@vates/async-each') const { pEach } = require('@vates/async-each')
const { VhdAbstract } = require('./Vhd/VhdAbstract') const { VhdAbstract } = require('./Vhd/VhdAbstract')
const { VhdDirectory } = require('./Vhd/VhdDirectory') const { VhdDirectory } = require('./Vhd/VhdDirectory')
const { VhdSynthetic } = require('./Vhd/VhdSynthetic') const { VhdSynthetic } = require('./Vhd/VhdSynthetic')
@@ -63,7 +63,7 @@ function cleanupVhds(handler, parent, children, { logInfo = noop, remove = false
return Promise.all([ return Promise.all([
VhdAbstract.rename(handler, parent, mergeTargetChild), VhdAbstract.rename(handler, parent, mergeTargetChild),
asyncMap(children, child => { pEach(children, child => {
logInfo(`the VHD child is already merged`, { child }) logInfo(`the VHD child is already merged`, { child })
if (remove) { if (remove) {
logInfo(`deleting merged VHD child`, { child }) logInfo(`deleting merged VHD child`, { child })
@@ -167,7 +167,7 @@ module.exports.mergeVhd = limitConcurrency(2)(async function merge(
let counter = 0 let counter = 0
const mergeStateWriter = makeThrottledWriter(parentHandler, mergeStatePath, 10e3) const mergeStateWriter = makeThrottledWriter(parentHandler, mergeStatePath, 10e3)
await asyncEach( await pEach(
toMerge, toMerge,
async blockId => { async blockId => {
merging.add(blockId) merging.add(blockId)

View File

@@ -152,7 +152,7 @@ export async function installPatches({ pool, patches, hosts }) {
await hosts.shift().$restartAgent() await hosts.shift().$restartAgent()
} }
await asyncMap(hosts, host => host.$restartAgent()) await pEach(hosts, host => host.$restartAgent())
} }
installPatches.params = { installPatches.params = {

View File

@@ -67,7 +67,7 @@ export async function destroy({ sr }) {
const config = xapi.xo.getData(sr, 'xosan_config') const config = xapi.xo.getData(sr, 'xosan_config')
// we simply forget because the hosted disks are being destroyed with the VMs // we simply forget because the hosted disks are being destroyed with the VMs
await xapi.forgetSr(sr._xapiId) await xapi.forgetSr(sr._xapiId)
await asyncMapSettled(config.nodes, node => this.getXapiObject(node.vm.id).$destroy()) await pEach(config.nodes, node => this.getXapiObject(node.vm.id).$destroy(), { stopOnError: false })
await xapi.deleteNetwork(config.network) await xapi.deleteNetwork(config.network)
if (sr.SR_type === 'xosan') { if (sr.SR_type === 'xosan') {
await this.unbindXosanLicense({ srId: sr.id }) await this.unbindXosanLicense({ srId: sr.id })

View File

@@ -374,13 +374,17 @@ const delete_ = defer(async function (
$defer.onFailure(() => this.setVmResourceSet(vm._xapiId, resourceSet, true)::ignoreErrors()) $defer.onFailure(() => this.setVmResourceSet(vm._xapiId, resourceSet, true)::ignoreErrors())
} }
await asyncMapSettled(vm.snapshots, async id => { await pEach(
const { resourceSet } = this.getObject(id) vm.snapshots,
if (resourceSet !== undefined) { async id => {
await this.setVmResourceSet(id, null) const { resourceSet } = this.getObject(id)
$defer.onFailure(() => this.setVmResourceSet(id, resourceSet, true)) if (resourceSet !== undefined) {
} await this.setVmResourceSet(id, null)
}) $defer.onFailure(() => this.setVmResourceSet(id, resourceSet, true))
}
},
{ stopOnError: false }
)
return xapi.VM_destroy(vm._xapiRef, { deleteDisks, force, forceDeleteDefaultTemplate }) return xapi.VM_destroy(vm._xapiRef, { deleteDisks, force, forceDeleteDefaultTemplate })
}) })

View File

@@ -401,19 +401,25 @@ const createNetworkAndInsertHosts = defer(async function ($defer, xapi, pif, vla
pif, pif,
address: networkPrefix + hostIpLastNumber++, address: networkPrefix + hostIpLastNumber++,
})) }))
await asyncMapSettled(addresses, addressAndPif => reconfigurePifIP(xapi, addressAndPif.pif, addressAndPif.address)) await pEach(addresses, addressAndPif => reconfigurePifIP(xapi, addressAndPif.pif, addressAndPif.address), {
stopOnError: false,
})
const master = xapi.pool.$master const master = xapi.pool.$master
const otherAddresses = addresses.filter(addr => addr.pif.$host !== master) const otherAddresses = addresses.filter(addr => addr.pif.$host !== master)
await asyncMapSettled(otherAddresses, async address => { await pEach(
const result = await callPlugin(xapi, master, 'run_ping', { otherAddresses,
address: address.address, async address => {
}) const result = await callPlugin(xapi, master, 'run_ping', {
if (result.exit !== 0) { address: address.address,
throw invalidParameters( })
`Could not ping ${master.name_label}->${address.pif.$host.name_label} (${address.address}) \n${result.stdout}` if (result.exit !== 0) {
) throw invalidParameters(
} `Could not ping ${master.name_label}->${address.pif.$host.name_label} (${address.address}) \n${result.stdout}`
}) )
}
},
{ stopOnError: false }
)
return xosanNetwork return xosanNetwork
}) })
@@ -441,10 +447,14 @@ async function getOrCreateSshKey(xapi) {
} }
const _probePoolAndWaitForPresence = defer(async function ($defer, glusterEndpoint, addresses) { const _probePoolAndWaitForPresence = defer(async function ($defer, glusterEndpoint, addresses) {
await asyncMapSettled(addresses, async address => { await pEach(
await glusterCmd(glusterEndpoint, 'peer probe ' + address) addresses,
$defer.onFailure(() => glusterCmd(glusterEndpoint, 'peer detach ' + address, true)) async address => {
}) await glusterCmd(glusterEndpoint, 'peer probe ' + address)
$defer.onFailure(() => glusterCmd(glusterEndpoint, 'peer detach ' + address, true))
},
{ stopOnError: false }
)
function shouldRetry(peers) { function shouldRetry(peers) {
for (const peer of peers) { for (const peer of peers) {
@@ -1129,11 +1139,11 @@ export const removeBricks = defer(async function ($defer, { xosansr, bricks }) {
const dict = _getIPToVMDict(xapi, xosansr.id) const dict = _getIPToVMDict(xapi, xosansr.id)
const brickVMs = map(bricks, b => dict[b]) const brickVMs = map(bricks, b => dict[b])
await glusterCmd(glusterEndpoint, `volume remove-brick xosan ${bricks.join(' ')} force`) await glusterCmd(glusterEndpoint, `volume remove-brick xosan ${bricks.join(' ')} force`)
await asyncMapSettled(ips, ip => glusterCmd(glusterEndpoint, 'peer detach ' + ip, true)) await pEach(ips, ip => glusterCmd(glusterEndpoint, 'peer detach ' + ip, true), { stopOnError: false })
remove(data.nodes, node => ips.includes(node.vm.ip)) remove(data.nodes, node => ips.includes(node.vm.ip))
await xapi.xo.setData(xosansr.id, 'xosan_config', data) await xapi.xo.setData(xosansr.id, 'xosan_config', data)
await xapi.callAsync('SR.scan', xapi.getObject(xosansr._xapiId).$ref) await xapi.callAsync('SR.scan', xapi.getObject(xosansr._xapiId).$ref)
await asyncMapSettled(brickVMs, vm => xapi.VM_destroy(vm.vm.$ref, true)) await pEach(brickVMs, vm => xapi.VM_destroy(vm.vm.$ref, true), { stopOnError: false })
} finally { } finally {
delete CURRENT_POOL_OPERATIONS[xapi.pool.$id] delete CURRENT_POOL_OPERATIONS[xapi.pool.$id]
} }

View File

@@ -81,22 +81,31 @@ export default class Redis extends Collection {
await redis.sadd(`${prefix}::indexes`, indexes) await redis.sadd(`${prefix}::indexes`, indexes)
await asyncMapSettled(indexes, index => await pEach(
redis.keys(`${prefix}_${index}:*`).then(keys => keys.length !== 0 && redis.del(keys)) indexes,
index => redis.keys(`${prefix}_${index}:*`).then(keys => keys.length !== 0 && redis.del(keys)),
{ stopOnError: false }
) )
const idsIndex = `${prefix}_ids` const idsIndex = `${prefix}_ids`
await asyncMapSettled(redis.smembers(idsIndex), id => await pEach(
redis.hgetall(`${prefix}:${id}`).then(values => redis.smembers(idsIndex),
values == null id =>
? redis.srem(idsIndex, id) // entry no longer exists redis.hgetall(`${prefix}:${id}`).then(values =>
: asyncMapSettled(indexes, index => { values == null
const value = values[index] ? redis.srem(idsIndex, id) // entry no longer exists
if (value !== undefined) { : pEach(
return redis.sadd(`${prefix}_${index}:${String(value).toLowerCase()}`, id) indexes,
} index => {
}) const value = values[index]
) if (value !== undefined) {
return redis.sadd(`${prefix}_${index}:${String(value).toLowerCase()}`, id)
}
},
{ stopOnError: false }
)
),
{ stopOnError: false }
) )
} }
@@ -146,12 +155,16 @@ export default class Redis extends Collection {
// remove the previous values from indexes // remove the previous values from indexes
if (indexes.length !== 0) { if (indexes.length !== 0) {
const previous = await redis.hgetall(`${prefix}:${id}`) const previous = await redis.hgetall(`${prefix}:${id}`)
await asyncMapSettled(indexes, index => { await pEach(
const value = previous[index] indexes,
if (value !== undefined) { index => {
return redis.srem(`${prefix}_${index}:${String(value).toLowerCase()}`, id) const value = previous[index]
} if (value !== undefined) {
}) return redis.srem(`${prefix}_${index}:${String(value).toLowerCase()}`, id)
}
},
{ stopOnError: false }
)
} }
} }
@@ -230,17 +243,24 @@ export default class Redis extends Collection {
if (indexes.length !== 0) { if (indexes.length !== 0) {
promise = Promise.all([ promise = Promise.all([
promise, promise,
asyncMapSettled(ids, id => pEach(
redis.hgetall(`${prefix}:${id}`).then( ids,
values => id =>
values != null && redis.hgetall(`${prefix}:${id}`).then(
asyncMapSettled(indexes, index => { values =>
const value = values[index] values != null &&
if (value !== undefined) { pEach(
return redis.srem(`${prefix}_${index}:${String(value).toLowerCase()}`, id) indexes,
} index => {
}) const value = values[index]
) if (value !== undefined) {
return redis.srem(`${prefix}_${index}:${String(value).toLowerCase()}`, id)
}
},
{ stopOnError: false }
)
),
{ stopOnError: false }
), ),
]) ])
} }

View File

@@ -369,7 +369,7 @@ async function registerPluginsInPath(path, prefix) {
throw error throw error
}) })
await asyncMap(files, name => { await pEach(files, name => {
if (name.startsWith(prefix)) { if (name.startsWith(prefix)) {
return registerPluginWrapper.call(this, `${path}/${name}`, name.slice(prefix.length)) return registerPluginWrapper.call(this, `${path}/${name}`, name.slice(prefix.length))
} }

View File

@@ -42,6 +42,7 @@ import {
parseDateTime, parseDateTime,
prepareXapiParam, prepareXapiParam,
} from './utils.mjs' } from './utils.mjs'
import { pEach } from '@vates/async-each'
const log = createLogger('xo:xapi') const log = createLogger('xo:xapi')
@@ -171,7 +172,7 @@ export default class Xapi extends XapiBase {
const host = this.getObject(hostId) const host = this.getObject(hostId)
const vms = host.$resident_VMs const vms = host.$resident_VMs
log.debug(`Emergency shutdown: ${host.name_label}`) log.debug(`Emergency shutdown: ${host.name_label}`)
await asyncMap(vms, vm => { await pEach(vms, vm => {
if (!vm.is_control_domain) { if (!vm.is_control_domain) {
return ignoreErrors.call(this.callAsync('VM.suspend', vm.$ref)) return ignoreErrors.call(this.callAsync('VM.suspend', vm.$ref))
} }
@@ -261,11 +262,15 @@ export default class Xapi extends XapiBase {
// from host to another. It only works when a shared SR is present // from host to another. It only works when a shared SR is present
// in the host. For this reason we chose to show a warning instead. // in the host. For this reason we chose to show a warning instead.
const pluggedPbds = host.$PBDs.filter(pbd => pbd.currently_attached) const pluggedPbds = host.$PBDs.filter(pbd => pbd.currently_attached)
await asyncMapSettled(pluggedPbds, async pbd => { await pEach(
const ref = pbd.$ref pluggedPbds,
await this.unplugPbd(ref) async pbd => {
$defer(() => this.plugPbd(ref)) const ref = pbd.$ref
}) await this.unplugPbd(ref)
$defer(() => this.plugPbd(ref))
},
{ stopOnError: false }
)
return host.update_other_config( return host.update_other_config(
multipathing multipathing
@@ -702,7 +707,9 @@ export default class Xapi extends XapiBase {
$defer.onFailure(() => this.VM_destroy(vm.$ref)) $defer.onFailure(() => this.VM_destroy(vm.$ref))
// Disable start and change the VM name label during import. // Disable start and change the VM name label during import.
await Promise.all([ await Promise.all([
asyncMapSettled(['start', 'start_on'], op => vm.update_blocked_operations(op, 'OVA import in progress...')), pEach(['start', 'start_on'], op => vm.update_blocked_operations(op, 'OVA import in progress...'), {
stopOnError: false,
}),
vm.set_name_label(`[Importing...] ${nameLabel}`), vm.set_name_label(`[Importing...] ${nameLabel}`),
]) ])
@@ -879,7 +886,7 @@ export default class Xapi extends XapiBase {
} }
throw new AggregateError( throw new AggregateError(
await asyncMap(await this.call('host.get_all'), async hostRef => { await pEach(await this.call('host.get_all'), async hostRef => {
const hostNameLabel = await this.call('host.get_name_label', hostRef) const hostNameLabel = await this.call('host.get_name_label', hostRef)
try { try {
await this.call('VM.assert_can_boot_here', vmRef, hostRef) await this.call('VM.assert_can_boot_here', vmRef, hostRef)
@@ -1008,13 +1015,17 @@ export default class Xapi extends XapiBase {
throw error throw error
} }
const newVdi = await this.barrier(await this.callAsync('VDI.copy', vdi.$ref, sr.$ref).then(extractOpaqueRef)) const newVdi = await this.barrier(await this.callAsync('VDI.copy', vdi.$ref, sr.$ref).then(extractOpaqueRef))
await asyncMapSettled(vdi.$VBDs, async vbd => { await pEach(
await this.call('VBD.destroy', vbd.$ref) vdi.$VBDs,
await this.VBD_create({ async vbd => {
...vbd, await this.call('VBD.destroy', vbd.$ref)
VDI: newVdi.$ref, await this.VBD_create({
}) ...vbd,
}) VDI: newVdi.$ref,
})
},
{ stopOnError: false }
)
await vdi.$destroy() await vdi.$destroy()
return newVdi return newVdi
@@ -1195,7 +1206,7 @@ export default class Xapi extends XapiBase {
}) })
}) })
await asyncMapSettled(pifsByHost, pifs => this.call('Bond.create', network.$ref, pifs, '', bondMode)) await pEach(pifsByHost, pifs => this.call('Bond.create', network.$ref, pifs, '', bondMode), { stopOnError: false })
return network return network
} }

View File

@@ -194,32 +194,40 @@ export default class BackupNg {
const remotes = {} const remotes = {}
const xapis = {} const xapis = {}
await waitAll([ await waitAll([
asyncMapSettled(remoteIds, async id => { pEach(
const remote = await app.getRemoteWithCredentials(id) remoteIds,
if (remote.proxy !== proxyId) { async id => {
throw new Error( const remote = await app.getRemoteWithCredentials(id)
proxyId === undefined if (remote.proxy !== proxyId) {
? 'The remote must not be linked to a proxy' throw new Error(
: `The remote ${remote.name} must be linked to the proxy ${proxyId}` proxyId === undefined
) ? 'The remote must not be linked to a proxy'
} : `The remote ${remote.name} must be linked to the proxy ${proxyId}`
)
}
remotes[id] = remote remotes[id] = remote
}), },
asyncMapSettled([...servers], async id => { { stopOnError: false }
const { allowUnauthorized, password, username } = await app.getXenServer(id) ),
pEach(
servers,
async id => {
const { allowUnauthorized, password, username } = await app.getXenServer(id)
const xapi = app.getAllXapis()[id] const xapi = app.getAllXapis()[id]
xapis[id] = { xapis[id] = {
allowUnauthorized, allowUnauthorized,
credentials: { credentials: {
username, username,
password, password,
}, },
url: await xapi.getHostBackupUrl(xapi.pool.$master), url: await xapi.getHostBackupUrl(xapi.pool.$master),
} }
}), },
{ stopOnError: false }
),
]) ])
const params = { const params = {
@@ -295,12 +303,16 @@ export default class BackupNg {
if (schedules !== undefined) { if (schedules !== undefined) {
const { id, settings } = job const { id, settings } = job
const tmpIds = Object.keys(schedules) const tmpIds = Object.keys(schedules)
await asyncMapSettled(tmpIds, async tmpId => { await pEach(
const schedule = schedules[tmpId] tmpIds,
schedule.jobId = id async tmpId => {
settings[(await app.createSchedule(schedule)).id] = settings[tmpId] const schedule = schedules[tmpId]
delete settings[tmpId] schedule.jobId = id
}) settings[(await app.createSchedule(schedule)).id] = settings[tmpId]
delete settings[tmpId]
},
{ stopOnError: false }
)
await app.updateJob({ id, settings }) await app.updateJob({ id, settings })
} }
@@ -355,11 +367,15 @@ export default class BackupNg {
const [schedules] = await Promise.all([app.getAllSchedules(), app.getJob(id, 'backup')]) const [schedules] = await Promise.all([app.getAllSchedules(), app.getJob(id, 'backup')])
await Promise.all([ await Promise.all([
app.removeJob(id), app.removeJob(id),
asyncMapSettled(schedules, schedule => { pEach(
if (schedule.id === id) { schedules,
app.deleteSchedule(schedule.id) schedule => {
} if (schedule.id === id) {
}), app.deleteSchedule(schedule.id)
}
},
{ stopOnError: false }
),
]) ])
} }
@@ -370,23 +386,27 @@ export default class BackupNg {
async deleteVmBackupsNg(ids) { async deleteVmBackupsNg(ids) {
const app = this._app const app = this._app
const backupsByRemote = groupBy(ids.map(parseVmBackupId), 'remoteId') const backupsByRemote = groupBy(ids.map(parseVmBackupId), 'remoteId')
await asyncMapSettled(Object.entries(backupsByRemote), async ([remoteId, backups]) => { await pEach(
const filenames = backups.map(_ => _.metadataFilename) Object.entries(backupsByRemote),
const remote = await app.getRemoteWithCredentials(remoteId) async ([remoteId, backups]) => {
if (remote.proxy !== undefined) { const filenames = backups.map(_ => _.metadataFilename)
await app.callProxyMethod(remote.proxy, 'backup.deleteVmBackups', { const remote = await app.getRemoteWithCredentials(remoteId)
filenames, if (remote.proxy !== undefined) {
remote: { await app.callProxyMethod(remote.proxy, 'backup.deleteVmBackups', {
url: remote.url, filenames,
options: remote.options, remote: {
}, url: remote.url,
}) options: remote.options,
} else { },
await Disposable.use(app.getBackupsRemoteAdapter(remote), adapter => adapter.deleteVmBackups(filenames)) })
} } else {
await Disposable.use(app.getBackupsRemoteAdapter(remote), adapter => adapter.deleteVmBackups(filenames))
}
this._listVmBackupsOnRemote(REMOVE_CACHE_ENTRY, remoteId) this._listVmBackupsOnRemote(REMOVE_CACHE_ENTRY, remoteId)
}) },
{ stopOnError: false }
)
} }
async importVmBackupNg(id, srId, settings) { async importVmBackupNg(id, srId, settings) {

View File

@@ -58,49 +58,53 @@ export default async function executeJobCall({ app, job, logger, runJobId, sched
timezone: schedule !== undefined ? schedule.timezone : undefined, timezone: schedule !== undefined ? schedule.timezone : undefined,
} }
await asyncMapSettled(paramsFlatVector, params => { await pEach(
const runCallId = logger.notice(`Starting ${job.method} call. (${job.id})`, { paramsFlatVector,
event: 'jobCall.start', params => {
runJobId, const runCallId = logger.notice(`Starting ${job.method} call. (${job.id})`, {
method: job.method, event: 'jobCall.start',
params, runJobId,
}) method: job.method,
params,
})
const call = (execStatus.calls[runCallId] = { const call = (execStatus.calls[runCallId] = {
method: job.method, method: job.method,
params, params,
start: Date.now(), start: Date.now(),
}) })
let promise = app.callApiMethod(connection, job.method, Object.assign({}, params)) let promise = app.callApiMethod(connection, job.method, Object.assign({}, params))
if (job.timeout) { if (job.timeout) {
promise = promise::timeout(job.timeout) promise = promise::timeout(job.timeout)
}
return promise.then(
value => {
logger.notice(`Call ${job.method} (${runCallId}) is a success. (${job.id})`, {
event: 'jobCall.end',
runJobId,
runCallId,
returnedValue: value,
})
call.returnedValue = value
call.end = Date.now()
},
reason => {
logger.notice(`Call ${job.method} (${runCallId}) has failed. (${job.id})`, {
event: 'jobCall.end',
runJobId,
runCallId,
error: serializeError(reason),
})
call.error = reason
call.end = Date.now()
} }
)
}) return promise.then(
value => {
logger.notice(`Call ${job.method} (${runCallId}) is a success. (${job.id})`, {
event: 'jobCall.end',
runJobId,
runCallId,
returnedValue: value,
})
call.returnedValue = value
call.end = Date.now()
},
reason => {
logger.notice(`Call ${job.method} (${runCallId}) has failed. (${job.id})`, {
event: 'jobCall.end',
runJobId,
runCallId,
error: serializeError(reason),
})
call.error = reason
call.end = Date.now()
}
)
},
{ stopOnError: false }
)
execStatus.end = Date.now() execStatus.end = Date.now()

View File

@@ -96,23 +96,27 @@ export default class Jobs {
) )
}) })
// it sends a report for the interrupted backup jobs // it sends a report for the interrupted backup jobs
app.on('plugins:registered', () => app.on('plugins:registered', async () =>
asyncMapSettled(this._jobs.get(), job => { pEach(
// only the interrupted backup jobs have the runId property await this._jobs.get(),
if (job.runId === undefined) { job => {
return // only the interrupted backup jobs have the runId property
} if (job.runId === undefined) {
return
app.emit(
'job:terminated',
// This cast can be removed after merging the PR: https://github.com/vatesfr/xen-orchestra/pull/3209
String(job.runId),
{
type: job.type,
} }
)
return this.updateJob({ id: job.id, runId: null }) app.emit(
}) 'job:terminated',
// This cast can be removed after merging the PR: https://github.com/vatesfr/xen-orchestra/pull/3209
String(job.runId),
{
type: job.type,
}
)
return this.updateJob({ id: job.id, runId: null })
},
{ stopOnError: false }
)
) )
} }

View File

@@ -69,25 +69,33 @@ export default class metadataBackup {
const remotes = {} const remotes = {}
const xapis = {} const xapis = {}
await waitAll([ await waitAll([
asyncMapSettled(remoteIds, async id => { pEach(
const remote = await app.getRemoteWithCredentials(id) remoteIds,
if (remote.proxy !== proxyId) { async id => {
throw new Error(`The remote ${remote.name} must be linked to the proxy ${proxyId}`) const remote = await app.getRemoteWithCredentials(id)
} if (remote.proxy !== proxyId) {
throw new Error(`The remote ${remote.name} must be linked to the proxy ${proxyId}`)
}
remotes[id] = remote remotes[id] = remote
}), },
asyncMapSettled([...servers], async id => { { stopOnError: false }
const { allowUnauthorized, host, password, username } = await app.getXenServer(id) ),
xapis[id] = { pEach(
allowUnauthorized, servers,
credentials: { async id => {
username, const { allowUnauthorized, host, password, username } = await app.getXenServer(id)
password, xapis[id] = {
}, allowUnauthorized,
url: host, credentials: {
} username,
}), password,
},
url: host,
}
},
{ stopOnError: false }
),
]) ])
const params = { const params = {
@@ -157,14 +165,18 @@ export default class metadataBackup {
}) })
const { id: jobId, settings } = job const { id: jobId, settings } = job
await asyncMapSettled(schedules, async (schedule, tmpId) => { await pEach(
const { id: scheduleId } = await app.createSchedule({ schedules,
...schedule, async (schedule, tmpId) => {
jobId, const { id: scheduleId } = await app.createSchedule({
}) ...schedule,
settings[scheduleId] = settings[tmpId] jobId,
delete settings[tmpId] })
}) settings[scheduleId] = settings[tmpId]
delete settings[tmpId]
},
{ stopOnError: false }
)
await app.updateJob({ id: jobId, settings }) await app.updateJob({ id: jobId, settings })
return job return job
@@ -180,11 +192,15 @@ export default class metadataBackup {
await Promise.all([ await Promise.all([
app.removeJob(id), app.removeJob(id),
asyncMapSettled(schedules, schedule => { pEach(
if (schedule.id === id) { schedules,
return app.deleteSchedule(id) schedule => {
} if (schedule.id === id) {
}), return app.deleteSchedule(id)
}
},
{ stopOnError: false }
),
]) ])
} }

View File

@@ -120,27 +120,31 @@ export default class {
async getAllRemotesInfo() { async getAllRemotesInfo() {
const remotesInfo = this._remotesInfo const remotesInfo = this._remotesInfo
await asyncMapSettled(this._remotes.get(), async remote => { await pEach(
if (!remote.enabled) { this._remotes.get(),
return async remote => {
} if (!remote.enabled) {
return
}
const promise = const promise =
remote.proxy !== undefined remote.proxy !== undefined
? this._app.callProxyMethod(remote.proxy, 'remote.getInfo', { ? this._app.callProxyMethod(remote.proxy, 'remote.getInfo', {
remote, remote,
}) })
: this.getRemoteHandler(remote.id).then(handler => handler.getInfo()) : this.getRemoteHandler(remote.id).then(handler => handler.getInfo())
try { try {
await timeout.call( await timeout.call(
promise.then(info => { promise.then(info => {
remotesInfo[remote.id] = info remotesInfo[remote.id] = info
}), }),
5e3 5e3
) )
} catch (_) {} } catch (_) {}
}) },
{ stopOnError: false }
)
return remotesInfo return remotesInfo
} }

View File

@@ -426,6 +426,6 @@ export default class {
} }
const { subjects } = await this.getResourceSet(resourceSetId) const { subjects } = await this.getResourceSet(resourceSetId)
await asyncMapSettled(subjects, subject => this._app.addAcl(subject, vmId, 'admin')) await pEach(subjects, subject => this._app.addAcl(subject, vmId, 'admin'), { stopOnError: false })
} }
} }

View File

@@ -53,10 +53,14 @@ export default class Scheduling {
'schedules', 'schedules',
() => db.get(), () => db.get(),
schedules => schedules =>
asyncMapSettled(schedules, async schedule => { pEach(
await db.update(normalize(schedule)) schedules,
this._start(schedule.id) async schedule => {
}), await db.update(normalize(schedule))
this._start(schedule.id)
},
{ stopOnError: false }
),
['jobs'] ['jobs']
) )

View File

@@ -1,7 +1,7 @@
import assert from 'assert' import assert from 'assert'
import findKey from 'lodash/findKey.js' import findKey from 'lodash/findKey.js'
import pick from 'lodash/pick.js' import pick from 'lodash/pick.js'
import { asyncEach } from '@vates/async-each' import { pEach } from '@vates/async-each'
import { BaseError } from 'make-error' import { BaseError } from 'make-error'
import { createLogger } from '@xen-orchestra/log' import { createLogger } from '@xen-orchestra/log'
import { fibonacci } from 'iterable-backoff' import { fibonacci } from 'iterable-backoff'
@@ -391,7 +391,7 @@ export default class {
const { pool } = xapi const { pool } = xapi
try { try {
await asyncEach(Object.entries(pool.other_config), ([key, value]) => { await pEach(Object.entries(pool.other_config), ([key, value]) => {
if (key.startsWith(poolMarkingPrefix)) { if (key.startsWith(poolMarkingPrefix)) {
const { lastConnected } = JSON.parse(value) const { lastConnected } = JSON.parse(value)
if (now - lastConnected > poolMarkingMaxAge) { if (now - lastConnected > poolMarkingMaxAge) {