Compare commits
1 Commits
token-last
...
async-each
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4c991c1a57 |
@@ -1,4 +1,4 @@
|
||||
### `asyncEach(iterable, iteratee, [opts])`
|
||||
### `pEach(iterable, iteratee, [opts])`
|
||||
|
||||
Executes `iteratee` in order for each value yielded by `iterable`.
|
||||
|
||||
@@ -6,7 +6,7 @@ Returns a promise wich rejects as soon as a call to `iteratee` throws or a promi
|
||||
|
||||
`iterable` must be an iterable or async iterable.
|
||||
|
||||
`iteratee` is called with the same `this` value as `asyncEach`, and with the following arguments:
|
||||
`iteratee` is called with the same `this` value as `pEach`, and with the following arguments:
|
||||
|
||||
- `value`: the value yielded by `iterable`
|
||||
- `index`: the 0-based index for this value
|
||||
@@ -19,10 +19,10 @@ Returns a promise wich rejects as soon as a call to `iteratee` throws or a promi
|
||||
- `stopOnError`: wether to stop iteration of first error, or wait for all calls to finish and throw an `AggregateError`, defaults to `true`
|
||||
|
||||
```js
|
||||
import { asyncEach } from '@vates/async-each'
|
||||
import { pEach } from '@vates/async-each'
|
||||
|
||||
const contents = []
|
||||
await asyncEach(
|
||||
await pEach(
|
||||
['foo.txt', 'bar.txt', 'baz.txt'],
|
||||
async function (filename, i) {
|
||||
contents[i] = await readFile(filename)
|
||||
|
||||
@@ -16,7 +16,7 @@ Installation of the [npm package](https://npmjs.org/package/@vates/async-each):
|
||||
|
||||
## Usage
|
||||
|
||||
### `asyncEach(iterable, iteratee, [opts])`
|
||||
### `pEach(iterable, iteratee, [opts])`
|
||||
|
||||
Executes `iteratee` in order for each value yielded by `iterable`.
|
||||
|
||||
@@ -24,7 +24,7 @@ Returns a promise wich rejects as soon as a call to `iteratee` throws or a promi
|
||||
|
||||
`iterable` must be an iterable or async iterable.
|
||||
|
||||
`iteratee` is called with the same `this` value as `asyncEach`, and with the following arguments:
|
||||
`iteratee` is called with the same `this` value as `pEach`, and with the following arguments:
|
||||
|
||||
- `value`: the value yielded by `iterable`
|
||||
- `index`: the 0-based index for this value
|
||||
@@ -37,10 +37,10 @@ Returns a promise wich rejects as soon as a call to `iteratee` throws or a promi
|
||||
- `stopOnError`: wether to stop iteration of first error, or wait for all calls to finish and throw an `AggregateError`, defaults to `true`
|
||||
|
||||
```js
|
||||
import { asyncEach } from '@vates/async-each'
|
||||
import { pEach } from '@vates/async-each'
|
||||
|
||||
const contents = []
|
||||
await asyncEach(
|
||||
await pEach(
|
||||
['foo.txt', 'bar.txt', 'baz.txt'],
|
||||
async function (filename, i) {
|
||||
contents[i] = await readFile(filename)
|
||||
|
||||
@@ -15,7 +15,7 @@ class AggregateError extends Error {
|
||||
* @param {(item: Item, index: number, iterable: Iterable<Item>) => Promise<void>} iteratee
|
||||
* @returns {Promise<void>}
|
||||
*/
|
||||
exports.asyncEach = function asyncEach(iterable, iteratee, { concurrency = 1, signal, stopOnError = true } = {}) {
|
||||
exports.pEach = function pEach(iterable, iteratee, { concurrency = 10, signal, stopOnError = true } = {}) {
|
||||
if (concurrency === 0) {
|
||||
concurrency = Infinity
|
||||
}
|
||||
@@ -28,7 +28,7 @@ exports.asyncEach = function asyncEach(iterable, iteratee, { concurrency = 1, si
|
||||
let onAbort
|
||||
if (signal !== undefined) {
|
||||
onAbort = () => {
|
||||
onRejectedWrapper(new Error('asyncEach aborted'))
|
||||
onRejectedWrapper(new Error('pEach aborted'))
|
||||
}
|
||||
signal.addEventListener('abort', onAbort)
|
||||
}
|
||||
@@ -51,11 +51,10 @@ exports.asyncEach = function asyncEach(iterable, iteratee, { concurrency = 1, si
|
||||
clean()
|
||||
})(reject)
|
||||
|
||||
let onFulfilled = value => {
|
||||
let onFulfilled = () => {
|
||||
--running
|
||||
next()
|
||||
}
|
||||
const onFulfilledWrapper = value => onFulfilled(value)
|
||||
|
||||
let onRejected = stopOnError
|
||||
? reject
|
||||
@@ -66,6 +65,14 @@ exports.asyncEach = function asyncEach(iterable, iteratee, { concurrency = 1, si
|
||||
}
|
||||
const onRejectedWrapper = reason => onRejected(reason)
|
||||
|
||||
const run = async (value, index) => {
|
||||
try {
|
||||
onFulfilled(await iteratee.call(this, value, index++))
|
||||
} catch (error) {
|
||||
onRejected(error)
|
||||
}
|
||||
}
|
||||
|
||||
let nextIsRunning = false
|
||||
let next = async () => {
|
||||
if (nextIsRunning) {
|
||||
@@ -86,17 +93,7 @@ exports.asyncEach = function asyncEach(iterable, iteratee, { concurrency = 1, si
|
||||
}
|
||||
} else {
|
||||
++running
|
||||
try {
|
||||
const result = iteratee.call(this, cursor.value, index++, iterable)
|
||||
let then
|
||||
if (result != null && typeof result === 'object' && typeof (then = result.then) === 'function') {
|
||||
then.call(result, onFulfilledWrapper, onRejectedWrapper)
|
||||
} else {
|
||||
onFulfilled(result)
|
||||
}
|
||||
} catch (error) {
|
||||
onRejected(error)
|
||||
}
|
||||
run(cursor.value, index++, iterable)
|
||||
}
|
||||
nextIsRunning = false
|
||||
return next()
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
/* eslint-env jest */
|
||||
|
||||
const { asyncEach } = require('./')
|
||||
const { pEach } = require('./')
|
||||
|
||||
const randomDelay = (max = 10) =>
|
||||
new Promise(resolve => {
|
||||
@@ -14,7 +14,7 @@ const rejectionOf = p =>
|
||||
p.then(reject, resolve)
|
||||
})
|
||||
|
||||
describe('asyncEach', () => {
|
||||
describe('pEach', () => {
|
||||
const thisArg = 'qux'
|
||||
const values = ['foo', 'bar', 'baz']
|
||||
|
||||
@@ -36,7 +36,7 @@ describe('asyncEach', () => {
|
||||
it('works', async () => {
|
||||
const iteratee = jest.fn(async () => {})
|
||||
|
||||
await asyncEach.call(thisArg, iterable, iteratee)
|
||||
await pEach.call(thisArg, iterable, iteratee)
|
||||
|
||||
expect(iteratee.mock.instances).toEqual(Array.from(values, () => thisArg))
|
||||
expect(iteratee.mock.calls).toEqual(Array.from(values, (value, index) => [value, index, iterable]))
|
||||
@@ -45,7 +45,7 @@ describe('asyncEach', () => {
|
||||
it('respects a concurrency of ' + concurrency, async () => {
|
||||
let running = 0
|
||||
|
||||
await asyncEach(
|
||||
await pEach(
|
||||
values,
|
||||
async () => {
|
||||
++running
|
||||
@@ -66,7 +66,7 @@ describe('asyncEach', () => {
|
||||
}
|
||||
})
|
||||
|
||||
expect(await rejectionOf(asyncEach(iterable, iteratee, { stopOnError: true }))).toBe(error)
|
||||
expect(await rejectionOf(pEach(iterable, iteratee, { stopOnError: true }))).toBe(error)
|
||||
expect(iteratee).toHaveBeenCalledTimes(2)
|
||||
})
|
||||
|
||||
@@ -78,7 +78,7 @@ describe('asyncEach', () => {
|
||||
throw error
|
||||
})
|
||||
|
||||
const error = await rejectionOf(asyncEach(iterable, iteratee, { stopOnError: false }))
|
||||
const error = await rejectionOf(pEach(iterable, iteratee, { stopOnError: false }))
|
||||
expect(error.errors).toEqual(errors)
|
||||
expect(iteratee.mock.calls).toEqual(Array.from(values, (value, index) => [value, index, iterable]))
|
||||
})
|
||||
@@ -91,7 +91,7 @@ describe('asyncEach', () => {
|
||||
}
|
||||
})
|
||||
|
||||
await expect(asyncEach(iterable, iteratee, { signal: ac.signal })).rejects.toThrow('asyncEach aborted')
|
||||
await expect(pEach(iterable, iteratee, { signal: ac.signal })).rejects.toThrow('pEach aborted')
|
||||
expect(iteratee).toHaveBeenCalledTimes(2)
|
||||
})
|
||||
})
|
||||
|
||||
@@ -23,7 +23,7 @@ module.exports = async function main(args) {
|
||||
},
|
||||
})
|
||||
|
||||
await asyncMap(_, async vmDir => {
|
||||
await pEach(_, async vmDir => {
|
||||
vmDir = resolve(vmDir)
|
||||
try {
|
||||
await adapter.cleanVm(vmDir, { fixMetadata: fix, remove, merge, onLog: (...args) => console.warn(...args) })
|
||||
|
||||
@@ -11,8 +11,8 @@ module.exports = async function createSymlinkIndex([backupDir, fieldPath]) {
|
||||
const indexDir = join(backupDir, 'indexes', filenamify(fieldPath))
|
||||
await mktree(indexDir)
|
||||
|
||||
await asyncMap(await readdir2(backupDir), async vmDir =>
|
||||
asyncMap(
|
||||
await pEach(await readdir2(backupDir), async vmDir =>
|
||||
pEach(
|
||||
(await readdir2(vmDir)).filter(_ => _.endsWith('.json')),
|
||||
async json => {
|
||||
const metadata = JSON.parse(await readFile(json))
|
||||
|
||||
@@ -159,7 +159,7 @@ exports.Backup = class Backup {
|
||||
const promises = []
|
||||
if (pools.length !== 0 && settings.retentionPoolMetadata !== 0) {
|
||||
promises.push(
|
||||
asyncMap(pools, async pool =>
|
||||
pEach(pools, async pool =>
|
||||
runTask(
|
||||
{
|
||||
name: `Starting metadata backup for the pool (${pool.$id}). (${job.id})`,
|
||||
@@ -245,7 +245,7 @@ exports.Backup = class Backup {
|
||||
})
|
||||
)
|
||||
),
|
||||
() => settings.healthCheckSr !== undefined ? this._getRecord('SR', settings.healthCheckSr) : undefined,
|
||||
() => (settings.healthCheckSr !== undefined ? this._getRecord('SR', settings.healthCheckSr) : undefined),
|
||||
async (srs, remoteAdapters, healthCheckSr) => {
|
||||
// remove adapters that failed (already handled)
|
||||
remoteAdapters = remoteAdapters.filter(_ => _ !== undefined)
|
||||
@@ -284,7 +284,10 @@ exports.Backup = class Backup {
|
||||
)
|
||||
)
|
||||
const { concurrency } = settings
|
||||
await asyncMapSettled(vmIds, concurrency === 0 ? handleVm : limitConcurrency(concurrency)(handleVm))
|
||||
await pEach(vmIds, handleVm, {
|
||||
concurrency,
|
||||
stopOnError: false,
|
||||
})
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
'use strict'
|
||||
|
||||
const { asyncMap } = require('@xen-orchestra/async-map')
|
||||
const { pEach } = require('@xen-orchestra/async-map')
|
||||
|
||||
const noop = Function.protoype
|
||||
|
||||
exports.DurablePartition = class DurablePartition {
|
||||
// private resource API is used exceptionally to be able to separate resource creation and release
|
||||
@@ -8,10 +10,10 @@ exports.DurablePartition = class DurablePartition {
|
||||
|
||||
flushAll() {
|
||||
const partitionDisposers = this.#partitionDisposers
|
||||
return asyncMap(Object.keys(partitionDisposers), path => {
|
||||
return pEach(Object.keys(partitionDisposers), path => {
|
||||
const disposers = partitionDisposers[path]
|
||||
delete partitionDisposers[path]
|
||||
return asyncMap(disposers, d => d(path).catch(noop => {}))
|
||||
return pEach(disposers, d => d(path).catch(noop))
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -49,9 +49,7 @@ const RE_VHDI = /^vhdi(\d+)$/
|
||||
async function addDirectory(files, realPath, metadataPath) {
|
||||
const stats = await lstat(realPath)
|
||||
if (stats.isDirectory()) {
|
||||
await asyncMap(await readdir(realPath), file =>
|
||||
addDirectory(files, realPath + '/' + file, metadataPath + '/' + file)
|
||||
)
|
||||
await pEach(await readdir(realPath), file => addDirectory(files, realPath + '/' + file, metadataPath + '/' + file))
|
||||
} else if (stats.isFile()) {
|
||||
files.push({
|
||||
realPath,
|
||||
@@ -182,7 +180,7 @@ class RemoteAdapter {
|
||||
const path = yield this.getPartition(diskId, partitionId)
|
||||
|
||||
const files = []
|
||||
await asyncMap(paths, file =>
|
||||
await pEach(paths, file =>
|
||||
addDirectory(files, resolveSubpath(path, file), normalize('./' + file).replace(/\/+$/, ''))
|
||||
)
|
||||
|
||||
@@ -228,7 +226,9 @@ class RemoteAdapter {
|
||||
const handler = this._handler
|
||||
|
||||
// this will delete the json, unused VHDs will be detected by `cleanVm`
|
||||
await asyncMapSettled(backups, ({ _filename }) => handler.unlink(_filename))
|
||||
await pEach(backups, ({ _filename }) => handler.unlink(_filename), {
|
||||
stopOnError: false,
|
||||
})
|
||||
}
|
||||
|
||||
async deleteMetadataBackup(backupId) {
|
||||
@@ -248,13 +248,16 @@ class RemoteAdapter {
|
||||
let list = await handler.list(dir)
|
||||
list.sort()
|
||||
list = list.filter(timestamp => /^\d{8}T\d{6}Z$/.test(timestamp)).slice(0, -retention)
|
||||
await asyncMapSettled(list, timestamp => handler.rmtree(`${dir}/${timestamp}`))
|
||||
await pEach(list, timestamp => handler.rmtree(`${dir}/${timestamp}`), { stopOnError: false })
|
||||
}
|
||||
|
||||
async deleteFullVmBackups(backups) {
|
||||
const handler = this._handler
|
||||
await asyncMapSettled(backups, ({ _filename, xva }) =>
|
||||
Promise.all([handler.unlink(_filename), handler.unlink(resolveRelativeFromFile(_filename, xva))])
|
||||
await pEach(
|
||||
backups,
|
||||
({ _filename, xva }) =>
|
||||
Promise.all([handler.unlink(_filename), handler.unlink(resolveRelativeFromFile(_filename, xva))]),
|
||||
{ stopOnError: false }
|
||||
)
|
||||
}
|
||||
|
||||
@@ -263,7 +266,7 @@ class RemoteAdapter {
|
||||
}
|
||||
|
||||
async deleteVmBackups(files) {
|
||||
const metadatas = await asyncMap(files, file => this.readVmBackupMetadata(file))
|
||||
const metadatas = await pEach(files, file => this.readVmBackupMetadata(file))
|
||||
const { delta, full, ...others } = groupBy(metadatas, 'mode')
|
||||
|
||||
const unsupportedModes = Object.keys(others)
|
||||
@@ -283,7 +286,7 @@ class RemoteAdapter {
|
||||
}
|
||||
|
||||
const dedupedVmUuid = new Set(metadatas.map(_ => _.vm.uuid))
|
||||
await asyncMap(dedupedVmUuid, vmUuid => this.invalidateVmBackupListCache(vmUuid))
|
||||
await pEach(dedupedVmUuid, vmUuid => this.invalidateVmBackupListCache(vmUuid))
|
||||
}
|
||||
|
||||
#getCompressionType() {
|
||||
@@ -362,7 +365,7 @@ class RemoteAdapter {
|
||||
const handler = this._handler
|
||||
|
||||
const backups = { __proto__: null }
|
||||
await asyncMap(await handler.list(BACKUP_DIR), async entry => {
|
||||
await pEach(await handler.list(BACKUP_DIR), async entry => {
|
||||
// ignore hidden and lock files
|
||||
if (entry[0] !== '.' && !entry.endsWith('.lock')) {
|
||||
const vmBackups = await this.listVmBackups(entry)
|
||||
@@ -380,7 +383,7 @@ class RemoteAdapter {
|
||||
path = resolveSubpath(rootPath, path)
|
||||
|
||||
const entriesMap = {}
|
||||
await asyncMap(await readdir(path), async name => {
|
||||
await pEach(await readdir(path), async name => {
|
||||
try {
|
||||
const stats = await lstat(`${path}/${name}`)
|
||||
if (stats.isDirectory()) {
|
||||
@@ -413,10 +416,13 @@ class RemoteAdapter {
|
||||
}
|
||||
|
||||
const results = []
|
||||
await asyncMapSettled(partitions, partition =>
|
||||
partition.type === LVM_PARTITION_TYPE
|
||||
? this._listLvmLogicalVolumes(devicePath, partition, results)
|
||||
: results.push(partition)
|
||||
await pEach(
|
||||
partitions,
|
||||
partition =>
|
||||
partition.type === LVM_PARTITION_TYPE
|
||||
? this._listLvmLogicalVolumes(devicePath, partition, results)
|
||||
: results.push(partition),
|
||||
{ stopOnError: false }
|
||||
)
|
||||
return results
|
||||
})
|
||||
@@ -427,10 +433,10 @@ class RemoteAdapter {
|
||||
const safeReaddir = createSafeReaddir(handler, 'listPoolMetadataBackups')
|
||||
|
||||
const backupsByPool = {}
|
||||
await asyncMap(await safeReaddir(DIR_XO_POOL_METADATA_BACKUPS, { prependDir: true }), async scheduleDir =>
|
||||
asyncMap(await safeReaddir(scheduleDir), async poolId => {
|
||||
await pEach(await safeReaddir(DIR_XO_POOL_METADATA_BACKUPS, { prependDir: true }), async scheduleDir =>
|
||||
pEach(await safeReaddir(scheduleDir), async poolId => {
|
||||
const backups = backupsByPool[poolId] ?? (backupsByPool[poolId] = [])
|
||||
return asyncMap(await safeReaddir(`${scheduleDir}/${poolId}`, { prependDir: true }), async backupDir => {
|
||||
return pEach(await safeReaddir(`${scheduleDir}/${poolId}`, { prependDir: true }), async backupDir => {
|
||||
try {
|
||||
backups.push({
|
||||
id: backupDir,
|
||||
@@ -471,7 +477,7 @@ class RemoteAdapter {
|
||||
filter: isMetadataFile,
|
||||
prependDir: true,
|
||||
})
|
||||
await asyncMap(files, async file => {
|
||||
await pEach(files, async file => {
|
||||
try {
|
||||
const metadata = await this.readVmBackupMetadata(file)
|
||||
// inject an id usable by importVmBackupNg()
|
||||
@@ -555,8 +561,8 @@ class RemoteAdapter {
|
||||
const safeReaddir = createSafeReaddir(handler, 'listXoMetadataBackups')
|
||||
|
||||
const backups = []
|
||||
await asyncMap(await safeReaddir(DIR_XO_CONFIG_BACKUPS, { prependDir: true }), async scheduleDir =>
|
||||
asyncMap(await safeReaddir(scheduleDir, { prependDir: true }), async backupDir => {
|
||||
await pEach(await safeReaddir(DIR_XO_CONFIG_BACKUPS, { prependDir: true }), async scheduleDir =>
|
||||
pEach(await safeReaddir(scheduleDir, { prependDir: true }), async backupDir => {
|
||||
try {
|
||||
backups.push({
|
||||
id: backupDir,
|
||||
@@ -635,9 +641,13 @@ class RemoteAdapter {
|
||||
const vdis = ignoredVdis === undefined ? metadata.vdis : pickBy(metadata.vdis, vdi => !ignoredVdis.has(vdi.uuid))
|
||||
|
||||
const streams = {}
|
||||
await asyncMapSettled(Object.keys(vdis), async ref => {
|
||||
streams[`${ref}.vhd`] = await this._createSyntheticStream(handler, join(dir, vhds[ref]))
|
||||
})
|
||||
await pEach(
|
||||
Object.keys(vdis),
|
||||
async ref => {
|
||||
streams[`${ref}.vhd`] = await this._createSyntheticStream(handler, join(dir, vhds[ref]))
|
||||
},
|
||||
{ stopOnError: false }
|
||||
)
|
||||
|
||||
return {
|
||||
streams,
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
'use strict'
|
||||
|
||||
const { asyncMap } = require('@xen-orchestra/async-map')
|
||||
const { pEach } = require('@vates/async-each')
|
||||
|
||||
const { DIR_XO_POOL_METADATA_BACKUPS } = require('./RemoteAdapter.js')
|
||||
const { forkStreamUnpipe } = require('./_forkStreamUnpipe.js')
|
||||
@@ -52,7 +52,7 @@ exports.PoolMetadataBackup = class PoolMetadataBackup {
|
||||
)
|
||||
const metaDataFileName = `${dir}/metadata.json`
|
||||
|
||||
await asyncMap(
|
||||
await pEach(
|
||||
Object.entries(this._remoteAdapters),
|
||||
([remoteId, adapter]) =>
|
||||
Task.run(
|
||||
|
||||
@@ -146,7 +146,7 @@ class VmBackup {
|
||||
}
|
||||
|
||||
const errors = []
|
||||
await (parallel ? asyncMap : asyncEach)(writers, async function (writer) {
|
||||
await (parallel ? pEach : asyncEach)(writers, async function (writer) {
|
||||
try {
|
||||
await fn(writer)
|
||||
} catch (error) {
|
||||
@@ -331,13 +331,13 @@ class VmBackup {
|
||||
|
||||
const snapshotsPerSchedule = groupBy(this._jobSnapshots, _ => _.other_config['xo:backup:schedule'])
|
||||
const xapi = this._xapi
|
||||
await asyncMap(Object.entries(snapshotsPerSchedule), ([scheduleId, snapshots]) => {
|
||||
await pEach(Object.entries(snapshotsPerSchedule), ([scheduleId, snapshots]) => {
|
||||
const settings = {
|
||||
...baseSettings,
|
||||
...allSettings[scheduleId],
|
||||
...allSettings[this.vm.uuid],
|
||||
}
|
||||
return asyncMap(getOldEntries(settings.snapshotRetention, snapshots), ({ $ref }) => {
|
||||
return pEach(getOldEntries(settings.snapshotRetention, snapshots), ({ $ref }) => {
|
||||
if ($ref !== baseVmRef) {
|
||||
return xapi.VM_destroy($ref)
|
||||
}
|
||||
@@ -367,7 +367,7 @@ class VmBackup {
|
||||
baseVm = await xapi.getRecord('VM', baseVm.$ref)
|
||||
|
||||
const baseUuidToSrcVdi = new Map()
|
||||
await asyncMap(await baseVm.$getDisks(), async baseRef => {
|
||||
await pEach(await baseVm.$getDisks(), async baseRef => {
|
||||
const [baseUuid, snapshotOf] = await Promise.all([
|
||||
xapi.getField('VDI', baseRef, 'uuid'),
|
||||
xapi.getField('VDI', baseRef, 'snapshot_of'),
|
||||
|
||||
@@ -38,7 +38,7 @@ exports.XoMetadataBackup = class XoMetadataBackup {
|
||||
)
|
||||
const metaDataFileName = `${dir}/metadata.json`
|
||||
|
||||
await asyncMap(
|
||||
await pEach(
|
||||
Object.entries(this._remoteAdapters),
|
||||
([remoteId, adapter]) =>
|
||||
Task.run(
|
||||
|
||||
@@ -4,11 +4,12 @@ const assert = require('assert')
|
||||
const sum = require('lodash/sum')
|
||||
const { asyncMap } = require('@xen-orchestra/async-map')
|
||||
const { Constants, mergeVhd, openVhd, VhdAbstract, VhdFile } = require('vhd-lib')
|
||||
const { isVhdAlias, resolveVhdAlias } = require('vhd-lib/aliases')
|
||||
const { dirname, resolve } = require('path')
|
||||
const { DISK_TYPES } = Constants
|
||||
const { isMetadataFile, isVhdFile, isXvaFile, isXvaSumFile } = require('./_backupType.js')
|
||||
const { isVhdAlias, resolveVhdAlias } = require('vhd-lib/aliases')
|
||||
const { limitConcurrency } = require('limit-concurrency-decorator')
|
||||
const { pEach } = require('@vates/async-each')
|
||||
|
||||
const { Task } = require('./Task.js')
|
||||
const { Disposable } = require('promise-toolbox')
|
||||
@@ -85,13 +86,13 @@ const listVhds = async (handler, vmDir) => {
|
||||
const aliases = {}
|
||||
const interruptedVhds = new Map()
|
||||
|
||||
await asyncMap(
|
||||
await pEach(
|
||||
await handler.list(`${vmDir}/vdis`, {
|
||||
ignoreMissing: true,
|
||||
prependDir: true,
|
||||
}),
|
||||
async jobDir =>
|
||||
asyncMap(
|
||||
pEach(
|
||||
await handler.list(jobDir, {
|
||||
prependDir: true,
|
||||
}),
|
||||
@@ -193,7 +194,7 @@ exports.cleanVm = async function cleanVm(
|
||||
const { vhds, interruptedVhds, aliases } = await listVhds(handler, vmDir)
|
||||
|
||||
// remove broken VHDs
|
||||
await asyncMap(vhds, async path => {
|
||||
await pEach(vhds, async path => {
|
||||
try {
|
||||
await Disposable.use(openVhd(handler, path, { checkSecondFooter: !interruptedVhds.has(path) }), vhd => {
|
||||
if (vhd.footer.diskType === DISK_TYPES.DIFFERENCING) {
|
||||
@@ -238,7 +239,7 @@ exports.cleanVm = async function cleanVm(
|
||||
|
||||
// check if alias are correct
|
||||
// check if all vhd in data subfolder have a corresponding alias
|
||||
await asyncMap(Object.keys(aliases), async dir => {
|
||||
await pEach(Object.keys(aliases), async dir => {
|
||||
await checkAliases(aliases[dir], `${dir}/data`, { handler, logInfo, logWarn, remove })
|
||||
})
|
||||
|
||||
@@ -296,7 +297,7 @@ exports.cleanVm = async function cleanVm(
|
||||
}
|
||||
})
|
||||
|
||||
await asyncMap(xvas, async path => {
|
||||
await pEach(xvas, async path => {
|
||||
// check is not good enough to delete the file, the best we can do is report
|
||||
// it
|
||||
if (!(await this.isValidXva(path))) {
|
||||
@@ -309,7 +310,7 @@ exports.cleanVm = async function cleanVm(
|
||||
|
||||
// compile the list of unused XVAs and VHDs, and remove backup metadata which
|
||||
// reference a missing XVA/VHD
|
||||
await asyncMap(jsons, async json => {
|
||||
await pEach(jsons, async json => {
|
||||
let metadata
|
||||
try {
|
||||
metadata = JSON.parse(await handler.readFile(json))
|
||||
@@ -416,7 +417,7 @@ exports.cleanVm = async function cleanVm(
|
||||
|
||||
const metadataWithMergedVhd = {}
|
||||
const doMerge = async () => {
|
||||
await asyncMap(toMerge, async chain => {
|
||||
await pEach(toMerge, async chain => {
|
||||
const merged = await limitedMergeVhdChain(chain, { handler, logInfo, logWarn, remove, merge })
|
||||
if (merged !== undefined) {
|
||||
const metadataPath = vhdsToJSons[chain[0]] // all the chain should have the same metada file
|
||||
@@ -428,14 +429,14 @@ exports.cleanVm = async function cleanVm(
|
||||
await Promise.all([
|
||||
...unusedVhdsDeletion,
|
||||
toMerge.length !== 0 && (merge ? Task.run({ name: 'merge' }, doMerge) : doMerge()),
|
||||
asyncMap(unusedXvas, path => {
|
||||
pEach(unusedXvas, path => {
|
||||
logWarn('unused XVA', { path })
|
||||
if (remove) {
|
||||
logInfo('deleting unused XVA', { path })
|
||||
return handler.unlink(path)
|
||||
}
|
||||
}),
|
||||
asyncMap(xvaSums, path => {
|
||||
pEach(xvaSums, path => {
|
||||
// no need to handle checksums for XVAs deleted by the script, they will be handled by `unlink()`
|
||||
if (!xvas.has(path.slice(0, -'.checksum'.length))) {
|
||||
logInfo('unused XVA checksum', { path })
|
||||
@@ -450,7 +451,7 @@ exports.cleanVm = async function cleanVm(
|
||||
// update size for delta metadata with merged VHD
|
||||
// check for the other that the size is the same as the real file size
|
||||
|
||||
await asyncMap(jsons, async metadataPath => {
|
||||
await pEach(jsons, async metadataPath => {
|
||||
const metadata = JSON.parse(await handler.readFile(metadataPath))
|
||||
|
||||
let fileSystemSize
|
||||
|
||||
@@ -5,10 +5,10 @@ const find = require('lodash/find.js')
|
||||
const groupBy = require('lodash/groupBy.js')
|
||||
const ignoreErrors = require('promise-toolbox/ignoreErrors')
|
||||
const omit = require('lodash/omit.js')
|
||||
const { asyncMap } = require('@xen-orchestra/async-map')
|
||||
const { CancelToken } = require('promise-toolbox')
|
||||
const { createVhdStreamWithLength } = require('vhd-lib')
|
||||
const { defer } = require('golike-defer')
|
||||
const { pEach } = require('@vates/async-each')
|
||||
|
||||
const { cancelableMap } = require('./_cancelableMap.js')
|
||||
const { Task } = require('./Task.js')
|
||||
@@ -237,13 +237,13 @@ exports.importDeltaVm = defer(async function importDeltaVm(
|
||||
$defer.onFailure.call(xapi, 'VM_destroy', vmRef)
|
||||
|
||||
// 2. Delete all VBDs which may have been created by the import.
|
||||
await asyncMap(await xapi.getField('VM', vmRef, 'VBDs'), ref => ignoreErrors.call(xapi.call('VBD.destroy', ref)))
|
||||
await pEach(await xapi.getField('VM', vmRef, 'VBDs'), ref => ignoreErrors.call(xapi.call('VBD.destroy', ref)))
|
||||
|
||||
// 3. Create VDIs & VBDs.
|
||||
const vbdRecords = deltaVm.vbds
|
||||
const vbds = groupBy(vbdRecords, 'VDI')
|
||||
const newVdis = {}
|
||||
await asyncMap(Object.keys(vdiRecords), async vdiRef => {
|
||||
await pEach(Object.keys(vdiRecords), async vdiRef => {
|
||||
const vdi = vdiRecords[vdiRef]
|
||||
let newVdi
|
||||
|
||||
@@ -279,7 +279,7 @@ exports.importDeltaVm = defer(async function importDeltaVm(
|
||||
|
||||
const vdiVbds = vbds[vdiRef]
|
||||
if (vdiVbds !== undefined) {
|
||||
await asyncMap(Object.values(vdiVbds), vbd =>
|
||||
await pEach(Object.values(vdiVbds), vbd =>
|
||||
xapi.VBD_create({
|
||||
...vbd,
|
||||
VDI: newVdi.$ref,
|
||||
@@ -323,7 +323,7 @@ exports.importDeltaVm = defer(async function importDeltaVm(
|
||||
}),
|
||||
|
||||
// Create VIFs.
|
||||
asyncMap(Object.values(deltaVm.vifs), vif => {
|
||||
pEach(Object.values(deltaVm.vifs), vif => {
|
||||
let network = vif.$network$uuid && xapi.getObjectByUuid(vif.$network$uuid, undefined)
|
||||
|
||||
if (network === undefined) {
|
||||
|
||||
@@ -33,7 +33,7 @@ exports.DeltaBackupWriter = class DeltaBackupWriter extends MixinBackupWriter(Ab
|
||||
const backupDir = getVmBackupDir(backup.vm.uuid)
|
||||
const vdisDir = `${backupDir}/vdis/${backup.job.id}`
|
||||
|
||||
await asyncMap(baseUuidToSrcVdi, async ([baseUuid, srcVdi]) => {
|
||||
await pEach(baseUuidToSrcVdi, async ([baseUuid, srcVdi]) => {
|
||||
let found = false
|
||||
try {
|
||||
const vhds = await handler.list(`${vdisDir}/${srcVdi.uuid}`, {
|
||||
@@ -41,7 +41,7 @@ exports.DeltaBackupWriter = class DeltaBackupWriter extends MixinBackupWriter(Ab
|
||||
prependDir: true,
|
||||
})
|
||||
const packedBaseUuid = packUuid(baseUuid)
|
||||
await asyncMap(vhds, async path => {
|
||||
await pEach(vhds, async path => {
|
||||
try {
|
||||
await checkVhdChain(handler, path)
|
||||
// Warning, this should not be written as found = found || await adapter.isMergeableParent(packedBaseUuid, path)
|
||||
|
||||
@@ -25,7 +25,7 @@ exports.DeltaReplicationWriter = class DeltaReplicationWriter extends MixinRepli
|
||||
|
||||
const xapi = replicatedVm.$xapi
|
||||
const replicatedVdis = new Set(
|
||||
await asyncMap(await replicatedVm.$getDisks(), async vdiRef => {
|
||||
await pEach(await replicatedVm.$getDisks(), async vdiRef => {
|
||||
const otherConfig = await xapi.getField('VDI', vdiRef, 'other_config')
|
||||
return otherConfig[TAG_COPY_SRC]
|
||||
})
|
||||
@@ -60,7 +60,10 @@ exports.DeltaReplicationWriter = class DeltaReplicationWriter extends MixinRepli
|
||||
const { scheduleId, vm } = this._backup
|
||||
|
||||
// delete previous interrupted copies
|
||||
ignoreErrors.call(asyncMapSettled(listReplicatedVms(xapi, scheduleId, undefined, vm.uuid), vm => vm.$destroy))
|
||||
ignoreErrors.call(
|
||||
pEach(listReplicatedVms(xapi, scheduleId, undefined, vm.uuid), vm => vm.$destroy),
|
||||
{ stopOnError: false }
|
||||
)
|
||||
|
||||
this._oldEntries = getOldEntries(settings.copyRetention - 1, listReplicatedVms(xapi, scheduleId, srUuid, vm.uuid))
|
||||
|
||||
@@ -76,7 +79,7 @@ exports.DeltaReplicationWriter = class DeltaReplicationWriter extends MixinRepli
|
||||
}
|
||||
|
||||
async _deleteOldEntries() {
|
||||
return asyncMapSettled(this._oldEntries, vm => vm.$destroy())
|
||||
return pEach(this._oldEntries, vm => vm.$destroy(), { stopOnError: false })
|
||||
}
|
||||
|
||||
async _transfer({ timestamp, deltaExport, sizeContainers }) {
|
||||
@@ -108,7 +111,7 @@ exports.DeltaReplicationWriter = class DeltaReplicationWriter extends MixinRepli
|
||||
targetVm.ha_restart_priority !== '' &&
|
||||
Promise.all([targetVm.set_ha_restart_priority(''), targetVm.add_tags('HA disabled')]),
|
||||
targetVm.set_name_label(`${vm.name_label} - ${job.name} - (${formatFilenameDate(timestamp)})`),
|
||||
asyncMap(['start', 'start_on'], op =>
|
||||
pEach(['start', 'start_on'], op =>
|
||||
targetVm.update_blocked_operations(
|
||||
op,
|
||||
'Start operation for this vm is blocked, clone it if you want to use it.'
|
||||
|
||||
@@ -40,12 +40,14 @@ exports.FullReplicationWriter = class FullReplicationWriter extends MixinReplica
|
||||
|
||||
// delete previous interrupted copies
|
||||
ignoreErrors.call(
|
||||
asyncMapSettled(listReplicatedVms(xapi, scheduleId, undefined, vm.uuid), vm => xapi.VM_destroy(vm.$ref))
|
||||
pEach(listReplicatedVms(xapi, scheduleId, undefined, vm.uuid), vm => xapi.VM_destroy(vm.$ref), {
|
||||
stopOnError: false,
|
||||
})
|
||||
)
|
||||
|
||||
const oldVms = getOldEntries(settings.copyRetention - 1, listReplicatedVms(xapi, scheduleId, srUuid, vm.uuid))
|
||||
|
||||
const deleteOldBackups = () => asyncMapSettled(oldVms, vm => xapi.VM_destroy(vm.$ref))
|
||||
const deleteOldBackups = () => pEach(oldVms, vm => xapi.VM_destroy(vm.$ref), { stopOnError: false })
|
||||
const { deleteFirst } = settings
|
||||
if (deleteFirst) {
|
||||
await deleteOldBackups()
|
||||
@@ -66,7 +68,7 @@ exports.FullReplicationWriter = class FullReplicationWriter extends MixinReplica
|
||||
const targetVm = await xapi.getRecord('VM', targetVmRef)
|
||||
|
||||
await Promise.all([
|
||||
asyncMap(['start', 'start_on'], op =>
|
||||
pEach(['start', 'start_on'], op =>
|
||||
targetVm.update_blocked_operations(
|
||||
op,
|
||||
'Start operation for this vm is blocked, clone it if you want to use it.'
|
||||
|
||||
@@ -554,15 +554,18 @@ export default class RemoteHandlerAbstract {
|
||||
}
|
||||
|
||||
const files = await this._list(dir)
|
||||
await asyncMapSettled(files, file =>
|
||||
this._unlink(`${dir}/${file}`).catch(error => {
|
||||
// Unlink dir behavior is not consistent across platforms
|
||||
// https://github.com/nodejs/node-v0.x-archive/issues/5791
|
||||
if (error.code === 'EISDIR' || error.code === 'EPERM') {
|
||||
return this._rmtree(`${dir}/${file}`)
|
||||
}
|
||||
throw error
|
||||
})
|
||||
await pEach(files, file =>
|
||||
this._unlink(`${dir}/${file}`).catch(
|
||||
error => {
|
||||
// Unlink dir behavior is not consistent across platforms
|
||||
// https://github.com/nodejs/node-v0.x-archive/issues/5791
|
||||
if (error.code === 'EISDIR' || error.code === 'EPERM') {
|
||||
return this._rmtree(`${dir}/${file}`)
|
||||
}
|
||||
throw error
|
||||
},
|
||||
{ stopOnError: false }
|
||||
)
|
||||
)
|
||||
return this._rmtree(dir)
|
||||
}
|
||||
|
||||
@@ -28,7 +28,7 @@ import createBufferFromStream from './_createBufferFromStream.js'
|
||||
import guessAwsRegion from './_guessAwsRegion.js'
|
||||
import RemoteHandlerAbstract from './abstract'
|
||||
import { basename, join, split } from './_path'
|
||||
import { asyncEach } from '@vates/async-each'
|
||||
import { pEach } from '@vates/async-each'
|
||||
|
||||
// endpoints https://docs.aws.amazon.com/general/latest/gr/s3.html
|
||||
|
||||
@@ -369,7 +369,7 @@ export default class S3Handler extends RemoteHandlerAbstract {
|
||||
)
|
||||
|
||||
NextContinuationToken = result.IsTruncated ? result.NextContinuationToken : undefined
|
||||
await asyncEach(
|
||||
await pEach(
|
||||
result.Contents ?? [],
|
||||
async ({ Key }) => {
|
||||
// _unlink will add the prefix, but Key contains everything
|
||||
|
||||
@@ -250,7 +250,7 @@ export default class Backups {
|
||||
listPoolMetadataBackups: [
|
||||
async ({ remotes }) => {
|
||||
const backupsByRemote = {}
|
||||
await asyncMap(Object.entries(remotes), async ([remoteId, remote]) => {
|
||||
await pEach(Object.entries(remotes), async ([remoteId, remote]) => {
|
||||
try {
|
||||
await Disposable.use(this.getAdapter(remote), async adapter => {
|
||||
backupsByRemote[remoteId] = await adapter.listPoolMetadataBackups()
|
||||
@@ -274,7 +274,7 @@ export default class Backups {
|
||||
listVmBackups: [
|
||||
async ({ remotes }) => {
|
||||
const backups = {}
|
||||
await asyncMap(Object.keys(remotes), async remoteId => {
|
||||
await pEach(Object.keys(remotes), async remoteId => {
|
||||
try {
|
||||
await Disposable.use(this.getAdapter(remotes[remoteId]), async adapter => {
|
||||
backups[remoteId] = formatVmBackups(await adapter.listAllVmBackups())
|
||||
@@ -304,7 +304,7 @@ export default class Backups {
|
||||
listXoMetadataBackups: [
|
||||
async ({ remotes }) => {
|
||||
const backupsByRemote = {}
|
||||
await asyncMap(Object.entries(remotes), async ([remoteId, remote]) => {
|
||||
await pEach(Object.entries(remotes), async ([remoteId, remote]) => {
|
||||
try {
|
||||
await Disposable.use(this.getAdapter(remote), async adapter => {
|
||||
backupsByRemote[remoteId] = await adapter.listXoMetadataBackups()
|
||||
|
||||
@@ -68,12 +68,12 @@ class Sr {
|
||||
}
|
||||
}
|
||||
}
|
||||
await asyncMap(await this.getField('SR', ref, 'VDIs'), async ref => {
|
||||
await asyncMap(await this.getField('VDI', ref, 'VBDs'), handleVbd)
|
||||
await pEach(await this.getField('SR', ref, 'VDIs'), async ref => {
|
||||
await pEach(await this.getField('VDI', ref, 'VBDs'), handleVbd)
|
||||
})
|
||||
|
||||
{
|
||||
const runningVmUuids = await asyncMap(runningVms.keys(), ref => this.getField('VM', ref, 'uuid'))
|
||||
const runningVmUuids = await pEach(runningVms.keys(), ref => this.getField('VM', ref, 'uuid'))
|
||||
|
||||
const set = new Set(vmsToShutdown)
|
||||
for (const vmUuid of runningVmUuids) {
|
||||
@@ -89,29 +89,37 @@ class Sr {
|
||||
|
||||
state.shutdownVms = {}
|
||||
|
||||
await asyncMapSettled(runningVms, async ([ref, isPaused]) => {
|
||||
state.shutdownVms[await this.getField('VM', ref, 'uuid')] = isPaused
|
||||
await pEach(
|
||||
runningVms,
|
||||
async ([ref, isPaused]) => {
|
||||
state.shutdownVms[await this.getField('VM', ref, 'uuid')] = isPaused
|
||||
|
||||
try {
|
||||
await this.callAsync('VM.clean_shutdown', ref)
|
||||
} catch (error) {
|
||||
warn('SR_enableMaintenanceMode, VM clean shutdown', { error })
|
||||
await this.callAsync('VM.hard_shutdown', ref)
|
||||
}
|
||||
try {
|
||||
await this.callAsync('VM.clean_shutdown', ref)
|
||||
} catch (error) {
|
||||
warn('SR_enableMaintenanceMode, VM clean shutdown', { error })
|
||||
await this.callAsync('VM.hard_shutdown', ref)
|
||||
}
|
||||
|
||||
$defer.onFailure.call(this, 'callAsync', 'VM.start', ref, isPaused, true)
|
||||
})
|
||||
$defer.onFailure.call(this, 'callAsync', 'VM.start', ref, isPaused, true)
|
||||
},
|
||||
{ stopOnError: false }
|
||||
)
|
||||
|
||||
state.unpluggedPbds = []
|
||||
await asyncMapSettled(await this.getField('SR', ref, 'PBDs'), async ref => {
|
||||
if (await this.getField('PBD', ref, 'currently_attached')) {
|
||||
state.unpluggedPbds.push(await this.getField('PBD', ref, 'uuid'))
|
||||
await pEach(
|
||||
await this.getField('SR', ref, 'PBDs'),
|
||||
async ref => {
|
||||
if (await this.getField('PBD', ref, 'currently_attached')) {
|
||||
state.unpluggedPbds.push(await this.getField('PBD', ref, 'uuid'))
|
||||
|
||||
await this.callAsync('PBD.unplug', ref)
|
||||
await this.callAsync('PBD.unplug', ref)
|
||||
|
||||
$defer.onFailure.call(this, 'callAsync', 'PBD.plug', ref)
|
||||
}
|
||||
})
|
||||
$defer.onFailure.call(this, 'callAsync', 'PBD.plug', ref)
|
||||
}
|
||||
},
|
||||
{ stopOnError: false }
|
||||
)
|
||||
|
||||
await this.setFieldEntry('SR', ref, 'other_config', OC_MAINTENANCE, JSON.stringify(state))
|
||||
}
|
||||
@@ -125,7 +133,7 @@ class Sr {
|
||||
|
||||
const errors = []
|
||||
|
||||
await asyncMap(state.unpluggedPbds, async uuid => {
|
||||
await pEach(state.unpluggedPbds, async uuid => {
|
||||
try {
|
||||
await this.callAsync('PBD.plug', await this.call('PBD.get_by_uuid', uuid))
|
||||
} catch (error) {
|
||||
@@ -133,7 +141,7 @@ class Sr {
|
||||
}
|
||||
})
|
||||
|
||||
await asyncMap(Object.entries(state.shutdownVms), async ([uuid, isPaused]) => {
|
||||
await pEach(Object.entries(state.shutdownVms), async ([uuid, isPaused]) => {
|
||||
try {
|
||||
await this.callAsync('VM.start', await this.call('VM.get_by_uuid', uuid), isPaused, true)
|
||||
} catch (error) {
|
||||
|
||||
@@ -48,7 +48,7 @@ const cleanBiosStrings = biosStrings => {
|
||||
|
||||
async function listNobakVbds(xapi, vbdRefs) {
|
||||
const vbds = []
|
||||
await asyncMap(vbdRefs, async vbdRef => {
|
||||
await pEach(vbdRefs, async vbdRef => {
|
||||
const vbd = await xapi.getRecord('VBD', vbdRef)
|
||||
if (
|
||||
vbd.type === 'Disk' &&
|
||||
@@ -152,7 +152,7 @@ class Vm {
|
||||
|
||||
if (ignoreNobakVdis) {
|
||||
if (vm.power_state === 'Halted') {
|
||||
await asyncMap(await listNobakVbds(this, vm.VBDs), async vbd => {
|
||||
await pEach(await listNobakVbds(this, vm.VBDs), async vbd => {
|
||||
await this.VBD_destroy(vbd.$ref)
|
||||
$defer.call(this, 'VBD_create', vbd)
|
||||
})
|
||||
@@ -181,9 +181,7 @@ class Vm {
|
||||
)
|
||||
|
||||
if (destroyNobakVdis) {
|
||||
await asyncMap(await listNobakVbds(this, await this.getField('VM', ref, 'VBDs')), vbd =>
|
||||
this.VDI_destroy(vbd.VDI)
|
||||
)
|
||||
await pEach(await listNobakVbds(this, await this.getField('VM', ref, 'VBDs')), vbd => this.VDI_destroy(vbd.VDI))
|
||||
}
|
||||
|
||||
return ref
|
||||
@@ -385,7 +383,7 @@ class Vm {
|
||||
await this.call('VM.destroy', vmRef)
|
||||
|
||||
await Promise.all([
|
||||
asyncMap(vm.snapshots, snapshotRef =>
|
||||
pEach(vm.snapshots, snapshotRef =>
|
||||
this.VM_destroy(snapshotRef).catch(error => {
|
||||
warn('VM_destroy: failed to destroy snapshot', {
|
||||
error,
|
||||
@@ -395,7 +393,7 @@ class Vm {
|
||||
})
|
||||
),
|
||||
deleteDisks &&
|
||||
asyncMap(disks, async vdiRef => {
|
||||
pEach(disks, async vdiRef => {
|
||||
try {
|
||||
// Dont destroy if attached to other (non control domain) VMs
|
||||
for (const vbdRef of await this.getField('VDI', vdiRef, 'VBDs')) {
|
||||
@@ -551,7 +549,7 @@ class Vm {
|
||||
// vm.VUSBs can be undefined (e.g. on XS 7.0.0)
|
||||
const vusbs = vm.VUSBs
|
||||
if (vusbs !== undefined) {
|
||||
await asyncMap(vusbs, async ref => {
|
||||
await pEach(vusbs, async ref => {
|
||||
const vusb = await this.getRecord('VUSB', ref)
|
||||
await vusb.$call('destroy')
|
||||
$defer.call(this, 'call', 'VUSB.create', vusb.VM, vusb.USB_group, vusb.other_config)
|
||||
@@ -562,7 +560,7 @@ class Vm {
|
||||
let destroyNobakVdis = false
|
||||
if (ignoreNobakVdis) {
|
||||
if (isHalted) {
|
||||
await asyncMap(await listNobakVbds(this, vm.VBDs), async vbd => {
|
||||
await pEach(await listNobakVbds(this, vm.VBDs), async vbd => {
|
||||
await this.VBD_destroy(vbd.$ref)
|
||||
$defer.call(this, 'VBD_create', vbd)
|
||||
})
|
||||
@@ -659,9 +657,7 @@ class Vm {
|
||||
)
|
||||
|
||||
if (destroyNobakVdis) {
|
||||
await asyncMap(await listNobakVbds(this, await this.getField('VM', ref, 'VBDs')), vbd =>
|
||||
this.VDI_destroy(vbd.VDI)
|
||||
)
|
||||
await pEach(await listNobakVbds(this, await this.getField('VM', ref, 'VBDs')), vbd => this.VDI_destroy(vbd.VDI))
|
||||
}
|
||||
|
||||
return ref
|
||||
|
||||
@@ -64,7 +64,7 @@ const VhdSynthetic = class VhdSynthetic extends VhdAbstract {
|
||||
}
|
||||
|
||||
async readBlockAllocationTable() {
|
||||
await asyncMap(this.#vhds, vhd => vhd.readBlockAllocationTable())
|
||||
await pEach(this.#vhds, vhd => vhd.readBlockAllocationTable())
|
||||
}
|
||||
|
||||
containsBlock(blockId) {
|
||||
@@ -74,7 +74,7 @@ const VhdSynthetic = class VhdSynthetic extends VhdAbstract {
|
||||
async readHeaderAndFooter() {
|
||||
const vhds = this.#vhds
|
||||
|
||||
await asyncMap(vhds, vhd => vhd.readHeaderAndFooter())
|
||||
await pEach(vhds, vhd => vhd.readHeaderAndFooter())
|
||||
|
||||
for (let i = 0, n = vhds.length - 1; i < n; ++i) {
|
||||
const child = vhds[i]
|
||||
|
||||
@@ -4,13 +4,13 @@ const { createLogger } = require('@xen-orchestra/log')
|
||||
const { parseVhdStream } = require('./parseVhdStream.js')
|
||||
const { VhdDirectory } = require('./Vhd/VhdDirectory.js')
|
||||
const { Disposable } = require('promise-toolbox')
|
||||
const { asyncEach } = require('@vates/async-each')
|
||||
const { pEach } = require('@vates/async-each')
|
||||
|
||||
const { warn } = createLogger('vhd-lib:createVhdDirectoryFromStream')
|
||||
|
||||
const buildVhd = Disposable.wrap(async function* (handler, path, inputStream, { concurrency, compression }) {
|
||||
const vhd = yield VhdDirectory.create(handler, path, { compression })
|
||||
await asyncEach(
|
||||
await pEach(
|
||||
parseVhdStream(inputStream),
|
||||
async function (item) {
|
||||
switch (item.type) {
|
||||
|
||||
@@ -12,7 +12,7 @@ const { openVhd } = require('./openVhd')
|
||||
const { basename, dirname } = require('path')
|
||||
const { DISK_TYPES } = require('./_constants')
|
||||
const { Disposable } = require('promise-toolbox')
|
||||
const { asyncEach } = require('@vates/async-each')
|
||||
const { pEach } = require('@vates/async-each')
|
||||
const { VhdAbstract } = require('./Vhd/VhdAbstract')
|
||||
const { VhdDirectory } = require('./Vhd/VhdDirectory')
|
||||
const { VhdSynthetic } = require('./Vhd/VhdSynthetic')
|
||||
@@ -63,7 +63,7 @@ function cleanupVhds(handler, parent, children, { logInfo = noop, remove = false
|
||||
|
||||
return Promise.all([
|
||||
VhdAbstract.rename(handler, parent, mergeTargetChild),
|
||||
asyncMap(children, child => {
|
||||
pEach(children, child => {
|
||||
logInfo(`the VHD child is already merged`, { child })
|
||||
if (remove) {
|
||||
logInfo(`deleting merged VHD child`, { child })
|
||||
@@ -167,7 +167,7 @@ module.exports.mergeVhd = limitConcurrency(2)(async function merge(
|
||||
let counter = 0
|
||||
|
||||
const mergeStateWriter = makeThrottledWriter(parentHandler, mergeStatePath, 10e3)
|
||||
await asyncEach(
|
||||
await pEach(
|
||||
toMerge,
|
||||
async blockId => {
|
||||
merging.add(blockId)
|
||||
|
||||
@@ -152,7 +152,7 @@ export async function installPatches({ pool, patches, hosts }) {
|
||||
await hosts.shift().$restartAgent()
|
||||
}
|
||||
|
||||
await asyncMap(hosts, host => host.$restartAgent())
|
||||
await pEach(hosts, host => host.$restartAgent())
|
||||
}
|
||||
|
||||
installPatches.params = {
|
||||
|
||||
@@ -67,7 +67,7 @@ export async function destroy({ sr }) {
|
||||
const config = xapi.xo.getData(sr, 'xosan_config')
|
||||
// we simply forget because the hosted disks are being destroyed with the VMs
|
||||
await xapi.forgetSr(sr._xapiId)
|
||||
await asyncMapSettled(config.nodes, node => this.getXapiObject(node.vm.id).$destroy())
|
||||
await pEach(config.nodes, node => this.getXapiObject(node.vm.id).$destroy(), { stopOnError: false })
|
||||
await xapi.deleteNetwork(config.network)
|
||||
if (sr.SR_type === 'xosan') {
|
||||
await this.unbindXosanLicense({ srId: sr.id })
|
||||
|
||||
@@ -374,13 +374,17 @@ const delete_ = defer(async function (
|
||||
$defer.onFailure(() => this.setVmResourceSet(vm._xapiId, resourceSet, true)::ignoreErrors())
|
||||
}
|
||||
|
||||
await asyncMapSettled(vm.snapshots, async id => {
|
||||
const { resourceSet } = this.getObject(id)
|
||||
if (resourceSet !== undefined) {
|
||||
await this.setVmResourceSet(id, null)
|
||||
$defer.onFailure(() => this.setVmResourceSet(id, resourceSet, true))
|
||||
}
|
||||
})
|
||||
await pEach(
|
||||
vm.snapshots,
|
||||
async id => {
|
||||
const { resourceSet } = this.getObject(id)
|
||||
if (resourceSet !== undefined) {
|
||||
await this.setVmResourceSet(id, null)
|
||||
$defer.onFailure(() => this.setVmResourceSet(id, resourceSet, true))
|
||||
}
|
||||
},
|
||||
{ stopOnError: false }
|
||||
)
|
||||
|
||||
return xapi.VM_destroy(vm._xapiRef, { deleteDisks, force, forceDeleteDefaultTemplate })
|
||||
})
|
||||
|
||||
@@ -401,19 +401,25 @@ const createNetworkAndInsertHosts = defer(async function ($defer, xapi, pif, vla
|
||||
pif,
|
||||
address: networkPrefix + hostIpLastNumber++,
|
||||
}))
|
||||
await asyncMapSettled(addresses, addressAndPif => reconfigurePifIP(xapi, addressAndPif.pif, addressAndPif.address))
|
||||
await pEach(addresses, addressAndPif => reconfigurePifIP(xapi, addressAndPif.pif, addressAndPif.address), {
|
||||
stopOnError: false,
|
||||
})
|
||||
const master = xapi.pool.$master
|
||||
const otherAddresses = addresses.filter(addr => addr.pif.$host !== master)
|
||||
await asyncMapSettled(otherAddresses, async address => {
|
||||
const result = await callPlugin(xapi, master, 'run_ping', {
|
||||
address: address.address,
|
||||
})
|
||||
if (result.exit !== 0) {
|
||||
throw invalidParameters(
|
||||
`Could not ping ${master.name_label}->${address.pif.$host.name_label} (${address.address}) \n${result.stdout}`
|
||||
)
|
||||
}
|
||||
})
|
||||
await pEach(
|
||||
otherAddresses,
|
||||
async address => {
|
||||
const result = await callPlugin(xapi, master, 'run_ping', {
|
||||
address: address.address,
|
||||
})
|
||||
if (result.exit !== 0) {
|
||||
throw invalidParameters(
|
||||
`Could not ping ${master.name_label}->${address.pif.$host.name_label} (${address.address}) \n${result.stdout}`
|
||||
)
|
||||
}
|
||||
},
|
||||
{ stopOnError: false }
|
||||
)
|
||||
return xosanNetwork
|
||||
})
|
||||
|
||||
@@ -441,10 +447,14 @@ async function getOrCreateSshKey(xapi) {
|
||||
}
|
||||
|
||||
const _probePoolAndWaitForPresence = defer(async function ($defer, glusterEndpoint, addresses) {
|
||||
await asyncMapSettled(addresses, async address => {
|
||||
await glusterCmd(glusterEndpoint, 'peer probe ' + address)
|
||||
$defer.onFailure(() => glusterCmd(glusterEndpoint, 'peer detach ' + address, true))
|
||||
})
|
||||
await pEach(
|
||||
addresses,
|
||||
async address => {
|
||||
await glusterCmd(glusterEndpoint, 'peer probe ' + address)
|
||||
$defer.onFailure(() => glusterCmd(glusterEndpoint, 'peer detach ' + address, true))
|
||||
},
|
||||
{ stopOnError: false }
|
||||
)
|
||||
|
||||
function shouldRetry(peers) {
|
||||
for (const peer of peers) {
|
||||
@@ -1129,11 +1139,11 @@ export const removeBricks = defer(async function ($defer, { xosansr, bricks }) {
|
||||
const dict = _getIPToVMDict(xapi, xosansr.id)
|
||||
const brickVMs = map(bricks, b => dict[b])
|
||||
await glusterCmd(glusterEndpoint, `volume remove-brick xosan ${bricks.join(' ')} force`)
|
||||
await asyncMapSettled(ips, ip => glusterCmd(glusterEndpoint, 'peer detach ' + ip, true))
|
||||
await pEach(ips, ip => glusterCmd(glusterEndpoint, 'peer detach ' + ip, true), { stopOnError: false })
|
||||
remove(data.nodes, node => ips.includes(node.vm.ip))
|
||||
await xapi.xo.setData(xosansr.id, 'xosan_config', data)
|
||||
await xapi.callAsync('SR.scan', xapi.getObject(xosansr._xapiId).$ref)
|
||||
await asyncMapSettled(brickVMs, vm => xapi.VM_destroy(vm.vm.$ref, true))
|
||||
await pEach(brickVMs, vm => xapi.VM_destroy(vm.vm.$ref, true), { stopOnError: false })
|
||||
} finally {
|
||||
delete CURRENT_POOL_OPERATIONS[xapi.pool.$id]
|
||||
}
|
||||
|
||||
@@ -81,22 +81,31 @@ export default class Redis extends Collection {
|
||||
|
||||
await redis.sadd(`${prefix}::indexes`, indexes)
|
||||
|
||||
await asyncMapSettled(indexes, index =>
|
||||
redis.keys(`${prefix}_${index}:*`).then(keys => keys.length !== 0 && redis.del(keys))
|
||||
await pEach(
|
||||
indexes,
|
||||
index => redis.keys(`${prefix}_${index}:*`).then(keys => keys.length !== 0 && redis.del(keys)),
|
||||
{ stopOnError: false }
|
||||
)
|
||||
|
||||
const idsIndex = `${prefix}_ids`
|
||||
await asyncMapSettled(redis.smembers(idsIndex), id =>
|
||||
redis.hgetall(`${prefix}:${id}`).then(values =>
|
||||
values == null
|
||||
? redis.srem(idsIndex, id) // entry no longer exists
|
||||
: asyncMapSettled(indexes, index => {
|
||||
const value = values[index]
|
||||
if (value !== undefined) {
|
||||
return redis.sadd(`${prefix}_${index}:${String(value).toLowerCase()}`, id)
|
||||
}
|
||||
})
|
||||
)
|
||||
await pEach(
|
||||
redis.smembers(idsIndex),
|
||||
id =>
|
||||
redis.hgetall(`${prefix}:${id}`).then(values =>
|
||||
values == null
|
||||
? redis.srem(idsIndex, id) // entry no longer exists
|
||||
: pEach(
|
||||
indexes,
|
||||
index => {
|
||||
const value = values[index]
|
||||
if (value !== undefined) {
|
||||
return redis.sadd(`${prefix}_${index}:${String(value).toLowerCase()}`, id)
|
||||
}
|
||||
},
|
||||
{ stopOnError: false }
|
||||
)
|
||||
),
|
||||
{ stopOnError: false }
|
||||
)
|
||||
}
|
||||
|
||||
@@ -146,12 +155,16 @@ export default class Redis extends Collection {
|
||||
// remove the previous values from indexes
|
||||
if (indexes.length !== 0) {
|
||||
const previous = await redis.hgetall(`${prefix}:${id}`)
|
||||
await asyncMapSettled(indexes, index => {
|
||||
const value = previous[index]
|
||||
if (value !== undefined) {
|
||||
return redis.srem(`${prefix}_${index}:${String(value).toLowerCase()}`, id)
|
||||
}
|
||||
})
|
||||
await pEach(
|
||||
indexes,
|
||||
index => {
|
||||
const value = previous[index]
|
||||
if (value !== undefined) {
|
||||
return redis.srem(`${prefix}_${index}:${String(value).toLowerCase()}`, id)
|
||||
}
|
||||
},
|
||||
{ stopOnError: false }
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -230,17 +243,24 @@ export default class Redis extends Collection {
|
||||
if (indexes.length !== 0) {
|
||||
promise = Promise.all([
|
||||
promise,
|
||||
asyncMapSettled(ids, id =>
|
||||
redis.hgetall(`${prefix}:${id}`).then(
|
||||
values =>
|
||||
values != null &&
|
||||
asyncMapSettled(indexes, index => {
|
||||
const value = values[index]
|
||||
if (value !== undefined) {
|
||||
return redis.srem(`${prefix}_${index}:${String(value).toLowerCase()}`, id)
|
||||
}
|
||||
})
|
||||
)
|
||||
pEach(
|
||||
ids,
|
||||
id =>
|
||||
redis.hgetall(`${prefix}:${id}`).then(
|
||||
values =>
|
||||
values != null &&
|
||||
pEach(
|
||||
indexes,
|
||||
index => {
|
||||
const value = values[index]
|
||||
if (value !== undefined) {
|
||||
return redis.srem(`${prefix}_${index}:${String(value).toLowerCase()}`, id)
|
||||
}
|
||||
},
|
||||
{ stopOnError: false }
|
||||
)
|
||||
),
|
||||
{ stopOnError: false }
|
||||
),
|
||||
])
|
||||
}
|
||||
|
||||
@@ -369,7 +369,7 @@ async function registerPluginsInPath(path, prefix) {
|
||||
throw error
|
||||
})
|
||||
|
||||
await asyncMap(files, name => {
|
||||
await pEach(files, name => {
|
||||
if (name.startsWith(prefix)) {
|
||||
return registerPluginWrapper.call(this, `${path}/${name}`, name.slice(prefix.length))
|
||||
}
|
||||
|
||||
@@ -42,6 +42,7 @@ import {
|
||||
parseDateTime,
|
||||
prepareXapiParam,
|
||||
} from './utils.mjs'
|
||||
import { pEach } from '@vates/async-each'
|
||||
|
||||
const log = createLogger('xo:xapi')
|
||||
|
||||
@@ -171,7 +172,7 @@ export default class Xapi extends XapiBase {
|
||||
const host = this.getObject(hostId)
|
||||
const vms = host.$resident_VMs
|
||||
log.debug(`Emergency shutdown: ${host.name_label}`)
|
||||
await asyncMap(vms, vm => {
|
||||
await pEach(vms, vm => {
|
||||
if (!vm.is_control_domain) {
|
||||
return ignoreErrors.call(this.callAsync('VM.suspend', vm.$ref))
|
||||
}
|
||||
@@ -261,11 +262,15 @@ export default class Xapi extends XapiBase {
|
||||
// from host to another. It only works when a shared SR is present
|
||||
// in the host. For this reason we chose to show a warning instead.
|
||||
const pluggedPbds = host.$PBDs.filter(pbd => pbd.currently_attached)
|
||||
await asyncMapSettled(pluggedPbds, async pbd => {
|
||||
const ref = pbd.$ref
|
||||
await this.unplugPbd(ref)
|
||||
$defer(() => this.plugPbd(ref))
|
||||
})
|
||||
await pEach(
|
||||
pluggedPbds,
|
||||
async pbd => {
|
||||
const ref = pbd.$ref
|
||||
await this.unplugPbd(ref)
|
||||
$defer(() => this.plugPbd(ref))
|
||||
},
|
||||
{ stopOnError: false }
|
||||
)
|
||||
|
||||
return host.update_other_config(
|
||||
multipathing
|
||||
@@ -702,7 +707,9 @@ export default class Xapi extends XapiBase {
|
||||
$defer.onFailure(() => this.VM_destroy(vm.$ref))
|
||||
// Disable start and change the VM name label during import.
|
||||
await Promise.all([
|
||||
asyncMapSettled(['start', 'start_on'], op => vm.update_blocked_operations(op, 'OVA import in progress...')),
|
||||
pEach(['start', 'start_on'], op => vm.update_blocked_operations(op, 'OVA import in progress...'), {
|
||||
stopOnError: false,
|
||||
}),
|
||||
vm.set_name_label(`[Importing...] ${nameLabel}`),
|
||||
])
|
||||
|
||||
@@ -879,7 +886,7 @@ export default class Xapi extends XapiBase {
|
||||
}
|
||||
|
||||
throw new AggregateError(
|
||||
await asyncMap(await this.call('host.get_all'), async hostRef => {
|
||||
await pEach(await this.call('host.get_all'), async hostRef => {
|
||||
const hostNameLabel = await this.call('host.get_name_label', hostRef)
|
||||
try {
|
||||
await this.call('VM.assert_can_boot_here', vmRef, hostRef)
|
||||
@@ -1008,13 +1015,17 @@ export default class Xapi extends XapiBase {
|
||||
throw error
|
||||
}
|
||||
const newVdi = await this.barrier(await this.callAsync('VDI.copy', vdi.$ref, sr.$ref).then(extractOpaqueRef))
|
||||
await asyncMapSettled(vdi.$VBDs, async vbd => {
|
||||
await this.call('VBD.destroy', vbd.$ref)
|
||||
await this.VBD_create({
|
||||
...vbd,
|
||||
VDI: newVdi.$ref,
|
||||
})
|
||||
})
|
||||
await pEach(
|
||||
vdi.$VBDs,
|
||||
async vbd => {
|
||||
await this.call('VBD.destroy', vbd.$ref)
|
||||
await this.VBD_create({
|
||||
...vbd,
|
||||
VDI: newVdi.$ref,
|
||||
})
|
||||
},
|
||||
{ stopOnError: false }
|
||||
)
|
||||
await vdi.$destroy()
|
||||
|
||||
return newVdi
|
||||
@@ -1195,7 +1206,7 @@ export default class Xapi extends XapiBase {
|
||||
})
|
||||
})
|
||||
|
||||
await asyncMapSettled(pifsByHost, pifs => this.call('Bond.create', network.$ref, pifs, '', bondMode))
|
||||
await pEach(pifsByHost, pifs => this.call('Bond.create', network.$ref, pifs, '', bondMode), { stopOnError: false })
|
||||
|
||||
return network
|
||||
}
|
||||
|
||||
@@ -194,32 +194,40 @@ export default class BackupNg {
    const remotes = {}
    const xapis = {}
    await waitAll([
      asyncMapSettled(remoteIds, async id => {
        const remote = await app.getRemoteWithCredentials(id)
        if (remote.proxy !== proxyId) {
          throw new Error(
            proxyId === undefined
              ? 'The remote must not be linked to a proxy'
              : `The remote ${remote.name} must be linked to the proxy ${proxyId}`
          )
        }
      pEach(
        remoteIds,
        async id => {
          const remote = await app.getRemoteWithCredentials(id)
          if (remote.proxy !== proxyId) {
            throw new Error(
              proxyId === undefined
                ? 'The remote must not be linked to a proxy'
                : `The remote ${remote.name} must be linked to the proxy ${proxyId}`
            )
          }

        remotes[id] = remote
      }),
      asyncMapSettled([...servers], async id => {
        const { allowUnauthorized, password, username } = await app.getXenServer(id)
          remotes[id] = remote
        },
        { stopOnError: false }
      ),
      pEach(
        servers,
        async id => {
          const { allowUnauthorized, password, username } = await app.getXenServer(id)

        const xapi = app.getAllXapis()[id]
          const xapi = app.getAllXapis()[id]

        xapis[id] = {
          allowUnauthorized,
          credentials: {
            username,
            password,
          },
          url: await xapi.getHostBackupUrl(xapi.pool.$master),
        }
      }),
          xapis[id] = {
            allowUnauthorized,
            credentials: {
              username,
              password,
            },
            url: await xapi.getHostBackupUrl(xapi.pool.$master),
          }
        },
        { stopOnError: false }
      ),
    ])

    const params = {
@@ -295,12 +303,16 @@ export default class BackupNg {
    if (schedules !== undefined) {
      const { id, settings } = job
      const tmpIds = Object.keys(schedules)
      await asyncMapSettled(tmpIds, async tmpId => {
        const schedule = schedules[tmpId]
        schedule.jobId = id
        settings[(await app.createSchedule(schedule)).id] = settings[tmpId]
        delete settings[tmpId]
      })
      await pEach(
        tmpIds,
        async tmpId => {
          const schedule = schedules[tmpId]
          schedule.jobId = id
          settings[(await app.createSchedule(schedule)).id] = settings[tmpId]
          delete settings[tmpId]
        },
        { stopOnError: false }
      )
      await app.updateJob({ id, settings })
    }

@@ -355,11 +367,15 @@ export default class BackupNg {
    const [schedules] = await Promise.all([app.getAllSchedules(), app.getJob(id, 'backup')])
    await Promise.all([
      app.removeJob(id),
      asyncMapSettled(schedules, schedule => {
        if (schedule.id === id) {
          app.deleteSchedule(schedule.id)
        }
      }),
      pEach(
        schedules,
        schedule => {
          if (schedule.id === id) {
            app.deleteSchedule(schedule.id)
          }
        },
        { stopOnError: false }
      ),
    ])
  }

@@ -370,23 +386,27 @@ export default class BackupNg {
  async deleteVmBackupsNg(ids) {
    const app = this._app
    const backupsByRemote = groupBy(ids.map(parseVmBackupId), 'remoteId')
    await asyncMapSettled(Object.entries(backupsByRemote), async ([remoteId, backups]) => {
      const filenames = backups.map(_ => _.metadataFilename)
      const remote = await app.getRemoteWithCredentials(remoteId)
      if (remote.proxy !== undefined) {
        await app.callProxyMethod(remote.proxy, 'backup.deleteVmBackups', {
          filenames,
          remote: {
            url: remote.url,
            options: remote.options,
          },
        })
      } else {
        await Disposable.use(app.getBackupsRemoteAdapter(remote), adapter => adapter.deleteVmBackups(filenames))
      }
    await pEach(
      Object.entries(backupsByRemote),
      async ([remoteId, backups]) => {
        const filenames = backups.map(_ => _.metadataFilename)
        const remote = await app.getRemoteWithCredentials(remoteId)
        if (remote.proxy !== undefined) {
          await app.callProxyMethod(remote.proxy, 'backup.deleteVmBackups', {
            filenames,
            remote: {
              url: remote.url,
              options: remote.options,
            },
          })
        } else {
          await Disposable.use(app.getBackupsRemoteAdapter(remote), adapter => adapter.deleteVmBackups(filenames))
        }

      this._listVmBackupsOnRemote(REMOVE_CACHE_ENTRY, remoteId)
    })
        this._listVmBackupsOnRemote(REMOVE_CACHE_ENTRY, remoteId)
      },
      { stopOnError: false }
    )
  }

  async importVmBackupNg(id, srId, settings) {

@@ -58,49 +58,53 @@ export default async function executeJobCall({ app, job, logger, runJobId, sched
    timezone: schedule !== undefined ? schedule.timezone : undefined,
  }

  await asyncMapSettled(paramsFlatVector, params => {
    const runCallId = logger.notice(`Starting ${job.method} call. (${job.id})`, {
      event: 'jobCall.start',
      runJobId,
      method: job.method,
      params,
    })
  await pEach(
    paramsFlatVector,
    params => {
      const runCallId = logger.notice(`Starting ${job.method} call. (${job.id})`, {
        event: 'jobCall.start',
        runJobId,
        method: job.method,
        params,
      })

      const call = (execStatus.calls[runCallId] = {
        method: job.method,
        params,
        start: Date.now(),
      })
      let promise = app.callApiMethod(connection, job.method, Object.assign({}, params))
      if (job.timeout) {
        promise = promise::timeout(job.timeout)
      }

      return promise.then(
        value => {
          logger.notice(`Call ${job.method} (${runCallId}) is a success. (${job.id})`, {
            event: 'jobCall.end',
            runJobId,
            runCallId,
            returnedValue: value,
          })

          call.returnedValue = value
          call.end = Date.now()
        },
        reason => {
          logger.notice(`Call ${job.method} (${runCallId}) has failed. (${job.id})`, {
            event: 'jobCall.end',
            runJobId,
            runCallId,
            error: serializeError(reason),
          })

          call.error = reason
          call.end = Date.now()
    const call = (execStatus.calls[runCallId] = {
      method: job.method,
      params,
      start: Date.now(),
    })
    let promise = app.callApiMethod(connection, job.method, Object.assign({}, params))
    if (job.timeout) {
      promise = promise::timeout(job.timeout)
    }
      )
    })

    return promise.then(
      value => {
        logger.notice(`Call ${job.method} (${runCallId}) is a success. (${job.id})`, {
          event: 'jobCall.end',
          runJobId,
          runCallId,
          returnedValue: value,
        })

        call.returnedValue = value
        call.end = Date.now()
      },
      reason => {
        logger.notice(`Call ${job.method} (${runCallId}) has failed. (${job.id})`, {
          event: 'jobCall.end',
          runJobId,
          runCallId,
          error: serializeError(reason),
        })

        call.error = reason
        call.end = Date.now()
      }
    )
    },
    { stopOnError: false }
  )

  execStatus.end = Date.now()

@@ -96,23 +96,27 @@ export default class Jobs {
        )
      })
    // it sends a report for the interrupted backup jobs
    app.on('plugins:registered', () =>
      asyncMapSettled(this._jobs.get(), job => {
        // only the interrupted backup jobs have the runId property
        if (job.runId === undefined) {
          return
        }

        app.emit(
          'job:terminated',
          // This cast can be removed after merging the PR: https://github.com/vatesfr/xen-orchestra/pull/3209
          String(job.runId),
          {
            type: job.type,
    app.on('plugins:registered', async () =>
      pEach(
        await this._jobs.get(),
        job => {
          // only the interrupted backup jobs have the runId property
          if (job.runId === undefined) {
            return
          }
        )
        return this.updateJob({ id: job.id, runId: null })
      })

          app.emit(
            'job:terminated',
            // This cast can be removed after merging the PR: https://github.com/vatesfr/xen-orchestra/pull/3209
            String(job.runId),
            {
              type: job.type,
            }
          )
          return this.updateJob({ id: job.id, runId: null })
        },
        { stopOnError: false }
      )
    )
  }

@@ -69,25 +69,33 @@ export default class metadataBackup {
    const remotes = {}
    const xapis = {}
    await waitAll([
      asyncMapSettled(remoteIds, async id => {
        const remote = await app.getRemoteWithCredentials(id)
        if (remote.proxy !== proxyId) {
          throw new Error(`The remote ${remote.name} must be linked to the proxy ${proxyId}`)
        }
      pEach(
        remoteIds,
        async id => {
          const remote = await app.getRemoteWithCredentials(id)
          if (remote.proxy !== proxyId) {
            throw new Error(`The remote ${remote.name} must be linked to the proxy ${proxyId}`)
          }

        remotes[id] = remote
      }),
      asyncMapSettled([...servers], async id => {
        const { allowUnauthorized, host, password, username } = await app.getXenServer(id)
        xapis[id] = {
          allowUnauthorized,
          credentials: {
            username,
            password,
          },
          url: host,
        }
      }),
          remotes[id] = remote
        },
        { stopOnError: false }
      ),
      pEach(
        servers,
        async id => {
          const { allowUnauthorized, host, password, username } = await app.getXenServer(id)
          xapis[id] = {
            allowUnauthorized,
            credentials: {
              username,
              password,
            },
            url: host,
          }
        },
        { stopOnError: false }
      ),
    ])

    const params = {
@@ -157,14 +165,18 @@ export default class metadataBackup {
    })

    const { id: jobId, settings } = job
    await asyncMapSettled(schedules, async (schedule, tmpId) => {
      const { id: scheduleId } = await app.createSchedule({
        ...schedule,
        jobId,
      })
      settings[scheduleId] = settings[tmpId]
      delete settings[tmpId]
    })
    await pEach(
      schedules,
      async (schedule, tmpId) => {
        const { id: scheduleId } = await app.createSchedule({
          ...schedule,
          jobId,
        })
        settings[scheduleId] = settings[tmpId]
        delete settings[tmpId]
      },
      { stopOnError: false }
    )
    await app.updateJob({ id: jobId, settings })

    return job
@@ -180,11 +192,15 @@ export default class metadataBackup {

    await Promise.all([
      app.removeJob(id),
      asyncMapSettled(schedules, schedule => {
        if (schedule.id === id) {
          return app.deleteSchedule(id)
        }
      }),
      pEach(
        schedules,
        schedule => {
          if (schedule.id === id) {
            return app.deleteSchedule(id)
          }
        },
        { stopOnError: false }
      ),
    ])
  }

@@ -120,27 +120,31 @@ export default class {

  async getAllRemotesInfo() {
    const remotesInfo = this._remotesInfo
    await asyncMapSettled(this._remotes.get(), async remote => {
      if (!remote.enabled) {
        return
      }
    await pEach(
      this._remotes.get(),
      async remote => {
        if (!remote.enabled) {
          return
        }

      const promise =
        remote.proxy !== undefined
          ? this._app.callProxyMethod(remote.proxy, 'remote.getInfo', {
              remote,
            })
          : this.getRemoteHandler(remote.id).then(handler => handler.getInfo())
        const promise =
          remote.proxy !== undefined
            ? this._app.callProxyMethod(remote.proxy, 'remote.getInfo', {
                remote,
              })
            : this.getRemoteHandler(remote.id).then(handler => handler.getInfo())

      try {
        await timeout.call(
          promise.then(info => {
            remotesInfo[remote.id] = info
          }),
          5e3
        )
      } catch (_) {}
    })
        try {
          await timeout.call(
            promise.then(info => {
              remotesInfo[remote.id] = info
            }),
            5e3
          )
        } catch (_) {}
      },
      { stopOnError: false }
    )
    return remotesInfo
  }

@@ -426,6 +426,6 @@ export default class {
    }

    const { subjects } = await this.getResourceSet(resourceSetId)
    await asyncMapSettled(subjects, subject => this._app.addAcl(subject, vmId, 'admin'))
    await pEach(subjects, subject => this._app.addAcl(subject, vmId, 'admin'), { stopOnError: false })
  }
}

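For one-line migrations such as the `addAcl` change above, the practical difference is how failures surface: with `{ stopOnError: false }`, every subject is still processed before an error is reported. Below is a minimal sketch of a caller, with hypothetical `app`, `subjects` and `vmId` names, assuming the aggregated error exposes the individual failures on an `errors` property the way a native `AggregateError` does; this is an assumption, not code from the repository.

```js
import { pEach } from '@vates/async-each'

async function grantAdminAcls(app, subjects, vmId) {
  try {
    // all addAcl calls run to completion, even if some of them reject
    await pEach(subjects, subject => app.addAcl(subject, vmId, 'admin'), { stopOnError: false })
  } catch (error) {
    // a single error is thrown afterwards; individual failures are expected
    // on error.errors (assumption), otherwise fall back to the error itself
    console.warn('some ACLs could not be added', error.errors ?? error)
  }
}
```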
@@ -53,10 +53,14 @@ export default class Scheduling {
      'schedules',
      () => db.get(),
      schedules =>
        asyncMapSettled(schedules, async schedule => {
          await db.update(normalize(schedule))
          this._start(schedule.id)
        }),
        pEach(
          schedules,
          async schedule => {
            await db.update(normalize(schedule))
            this._start(schedule.id)
          },
          { stopOnError: false }
        ),
      ['jobs']
    )

@@ -1,7 +1,7 @@
import assert from 'assert'
import findKey from 'lodash/findKey.js'
import pick from 'lodash/pick.js'
import { asyncEach } from '@vates/async-each'
import { pEach } from '@vates/async-each'
import { BaseError } from 'make-error'
import { createLogger } from '@xen-orchestra/log'
import { fibonacci } from 'iterable-backoff'
@@ -391,7 +391,7 @@ export default class {
    const { pool } = xapi

    try {
      await asyncEach(Object.entries(pool.other_config), ([key, value]) => {
      await pEach(Object.entries(pool.other_config), ([key, value]) => {
        if (key.startsWith(poolMarkingPrefix)) {
          const { lastConnected } = JSON.parse(value)
          if (now - lastConnected > poolMarkingMaxAge) {