feat(async-map): new implementations

These implementations are no longer compatible with plain objects but support iterables.

The previous implementation is still available as `@xen-orchestra/async-map/legacy`.
This commit is contained in:
Julien Fontanet 2021-03-01 16:34:50 +01:00
parent 08857a6198
commit 20377e9c56
34 changed files with 254 additions and 83 deletions

View File

@ -1,7 +1,6 @@
const { asyncMap } = require('@xen-orchestra/async-map')
const { createLogger } = require('@xen-orchestra/log/dist')
const asyncMap = (arrayLike, mapFn, thisArg) => Promise.all(Array.from(arrayLike, mapFn, thisArg))
const { warn } = createLogger('vates:disposable:debounceResource')
exports.createDebounceResource = () => {

View File

@ -4,7 +4,7 @@
[![Package Version](https://badgen.net/npm/v/@xen-orchestra/async-map)](https://npmjs.org/package/@xen-orchestra/async-map) ![License](https://badgen.net/npm/license/@xen-orchestra/async-map) [![PackagePhobia](https://badgen.net/bundlephobia/minzip/@xen-orchestra/async-map)](https://bundlephobia.com/result?p=@xen-orchestra/async-map) [![Node compatibility](https://badgen.net/npm/node/@xen-orchestra/async-map)](https://npmjs.org/package/@xen-orchestra/async-map)
> Similar to Promise.all + lodash.map but wait for all promises to be settled
> Promise.all + map for all iterables
## Install
@ -16,10 +16,61 @@ Installation of the [npm package](https://npmjs.org/package/@xen-orchestra/async
## Usage
```js
import asyncMap from '@xen-orchestra/async-map'
### `asyncMap(iterable, iteratee, thisArg = iterable)`
const array = await asyncMap(collection, iteratee)
Similar to `Promise.all + Array#map` for all iterables: calls `iteratee` for each item in `iterable`, and returns a promise of an array containing the awaited result of each calls to `iteratee`.
It rejects as soon as te first call to `iteratee` rejects.
```js
import { asyncMap } from '@xen-orchestra/async-map'
const array = await asyncMap(iterable, iteratee, thisArg)
```
It can be used with any iterables (`Array`, `Map`, etc.):
```js
const map = new Map()
map.set('foo', 42)
map.set('bar', 3.14)
const array = await asyncMap(map, async function ([key, value]) {
// TODO: do async computation
//
// the map can be accessed via `this`
})
```
#### Use with plain objects
Plain objects are not iterable, but you can use `Object.keys`, `Object.values` or `Object.entries` to help:
```js
const object = {
foo: 42,
bar: 3.14,
}
const array = await asyncMap(
Object.entries(object),
async function ([key, value]) {
// TODO: do async computation
//
// the object can be accessed via `this` because it's been passed as third arg
},
object
)
```
### `asyncMapSettled(iterable, iteratee, thisArg = iterable)`
Similar to `asyncMap` but waits for all promises to settle before rejecting.
```js
import { asyncMapSettled } from '@xen-orchestra/async-map'
const array = await asyncMapSettled(iterable, iteratee, thisArg)
```
## Contributions

View File

@ -1,5 +1,56 @@
```js
import asyncMap from '@xen-orchestra/async-map'
### `asyncMap(iterable, iteratee, thisArg = iterable)`
const array = await asyncMap(collection, iteratee)
Similar to `Promise.all + Array#map` for all iterables: calls `iteratee` for each item in `iterable`, and returns a promise of an array containing the awaited result of each calls to `iteratee`.
It rejects as soon as te first call to `iteratee` rejects.
```js
import { asyncMap } from '@xen-orchestra/async-map'
const array = await asyncMap(iterable, iteratee, thisArg)
```
It can be used with any iterables (`Array`, `Map`, etc.):
```js
const map = new Map()
map.set('foo', 42)
map.set('bar', 3.14)
const array = await asyncMap(map, async function ([key, value]) {
// TODO: do async computation
//
// the map can be accessed via `this`
})
```
#### Use with plain objects
Plain objects are not iterable, but you can use `Object.keys`, `Object.values` or `Object.entries` to help:
```js
const object = {
foo: 42,
bar: 3.14,
}
const array = await asyncMap(
Object.entries(object),
async function ([key, value]) {
// TODO: do async computation
//
// the object can be accessed via `this` because it's been passed as third arg
},
object
)
```
### `asyncMapSettled(iterable, iteratee, thisArg = iterable)`
Similar to `asyncMap` but waits for all promises to settle before rejecting.
```js
import { asyncMapSettled } from '@xen-orchestra/async-map'
const array = await asyncMapSettled(iterable, iteratee, thisArg)
```

View File

@ -1,41 +1,65 @@
// type MaybePromise<T> = Promise<T> | T
//
// declare export function asyncMap<T1, T2>(
// collection: MaybePromise<T1[]>,
// (T1, number) => MaybePromise<T2>
// ): Promise<T2[]>
// declare export function asyncMap<K, V1, V2>(
// collection: MaybePromise<{ [K]: V1 }>,
// (V1, K) => MaybePromise<V2>
// ): Promise<V2[]>
const wrapCall = require('promise-toolbox/wrapCall')
const map = require('lodash/map')
/**
* Similar to Promise.all + Array#map but supports all iterables and does not trigger ESLint array-callback-return
*
* WARNING: Does not handle plain objects
*
* @template Item,This
* @param {Iterable<Item>} arrayLike
* @param {(this: This, item: Item) => (Item | PromiseLike<Item>)} mapFn
* @param {This} [thisArg]
* @returns {Promise<Item[]>}
*/
exports.asyncMap = function asyncMap(iterable, mapFn, thisArg = iterable) {
return Promise.all(Array.from(iterable, mapFn, thisArg))
}
// Similar to map() + Promise.all() but wait for all promises to
// settle before rejecting (with the first error)
module.exports = function asyncMap(collection, iteratee) {
let then
if (collection != null && typeof (then = collection.then) === 'function') {
return then.call(collection, collection => asyncMap(collection, iteratee))
}
let errorContainer
const onError = error => {
if (errorContainer === undefined) {
errorContainer = { error }
/**
* Like `asyncMap` but wait for all promises to settle before rejecting
*
* @template Item,This
* @param {Iterable<Item>} iterable
* @param {(this: This, item: Item) => (Item | PromiseLike<Item>)} mapFn
* @param {This} [thisArg]
* @returns {Promise<Item[]>}
*/
exports.asyncMapSettled = function asyncMapSettled(iterable, mapFn, thisArg = iterable) {
return new Promise((resolve, reject) => {
const onError = e => {
if (result !== undefined) {
error = e
result = undefined
}
if (--n === 0) {
this.reject(error)
}
}
}
return Promise.all(
map(collection, (item, key, collection) =>
new Promise(resolve => {
resolve(iteratee(item, key, collection))
}).catch(onError)
)
).then(values => {
if (errorContainer !== undefined) {
throw errorContainer.error
const onValue = (i, value) => {
const hasError = result !== undefined
if (!hasError) {
result[i] = value
}
if (--n === 0) {
if (hasError) {
reject(error)
} else {
resolve(result)
}
}
}
return values
let n = 0
for (const item of iterable) {
const i = n++
wrapCall(mapFn, item, thisArg).then(value => onValue(i, value), onError)
}
if (n === 0) {
return resolve([])
}
let error
let result = new Array(n)
})
}

View File

@ -0,0 +1,45 @@
// type MaybePromise<T> = Promise<T> | T
//
// declare export function asyncMap<T1, T2>(
// collection: MaybePromise<T1[]>,
// (T1, number) => MaybePromise<T2>
// ): Promise<T2[]>
// declare export function asyncMap<K, V1, V2>(
// collection: MaybePromise<{ [K]: V1 }>,
// (V1, K) => MaybePromise<V2>
// ): Promise<V2[]>
const map = require('lodash/map')
/**
* Similar to map() + Promise.all() but wait for all promises to settle before
* rejecting (with the first error)
*
* @deprecated Don't support iterables, please use new implementations
*/
module.exports = function asyncMapLegacy(collection, iteratee) {
let then
if (collection != null && typeof (then = collection.then) === 'function') {
return then.call(collection, collection => asyncMapLegacy(collection, iteratee))
}
let errorContainer
const onError = error => {
if (errorContainer === undefined) {
errorContainer = { error }
}
}
return Promise.all(
map(collection, (item, key, collection) =>
new Promise(resolve => {
resolve(iteratee(item, key, collection))
}).catch(onError)
)
).then(values => {
if (errorContainer !== undefined) {
throw errorContainer.error
}
return values
})
}

View File

@ -3,8 +3,15 @@
"name": "@xen-orchestra/async-map",
"version": "0.0.0",
"license": "ISC",
"description": "Similar to Promise.all + lodash.map but wait for all promises to be settled",
"keywords": [],
"description": "Promise.all + map for all iterables",
"keywords": [
"array",
"async",
"iterable",
"map",
"settled",
"typescript"
],
"homepage": "https://github.com/vatesfr/xen-orchestra/tree/master/@xen-orchestra/async-map",
"bugs": "https://github.com/vatesfr/xen-orchestra/issues",
"repository": {

View File

@ -1,11 +1,10 @@
const asyncMapSettled = require('@xen-orchestra/async-map')
const { asyncMap, asyncMapSettled } = require('@xen-orchestra/async-map')
const Disposable = require('promise-toolbox/Disposable')
const ignoreErrors = require('promise-toolbox/ignoreErrors')
const limitConcurrency = require('limit-concurrency-decorator').default
const using = require('promise-toolbox/using')
const { compileTemplate } = require('@xen-orchestra/template')
const { asyncMap } = require('./asyncMap')
const { extractIdsFromSimplePattern } = require('./_extractIdsFromSimplePattern')
const { PoolMetadataBackup } = require('./_PoolMetadataBackup')
const { Task } = require('./Task')

View File

@ -1,4 +1,4 @@
const { asyncMap } = require('./asyncMap')
const { asyncMap } = require('@xen-orchestra/async-map')
exports.DurablePartition = class DurablePartition {
// private resource API is used exceptionally to be able to separate resource creation and release

View File

@ -1,4 +1,4 @@
const asyncMapSettled = require('@xen-orchestra/async-map')
const { asyncMap, asyncMapSettled } = require('@xen-orchestra/async-map')
const Disposable = require('promise-toolbox/Disposable')
const fromCallback = require('promise-toolbox/fromCallback')
const fromEvent = require('promise-toolbox/fromEvent')
@ -13,7 +13,6 @@ const { execFile } = require('child_process')
const { readdir, stat } = require('fs-extra')
const { ZipFile } = require('yazl')
const { asyncMap } = require('./asyncMap')
const { BACKUP_DIR } = require('./_getVmBackupDir')
const { getTmpDir } = require('./_getTmpDir')
const { listPartitions, LVM_PARTITION_TYPE } = require('./_listPartitions')

View File

@ -1,8 +1,7 @@
const asyncMapSettled = require('@xen-orchestra/async-map')
const { asyncMap, asyncMapSettled } = require('@xen-orchestra/async-map')
const ignoreErrors = require('promise-toolbox/ignoreErrors')
const { formatDateTime } = require('@xen-orchestra/xapi')
const { asyncMap } = require('./asyncMap')
const { formatFilenameDate } = require('./_filenameDate')
const { getOldEntries } = require('./_getOldEntries')
const { importDeltaVm, TAG_COPY_SRC } = require('./_deltaVm')

View File

@ -2,11 +2,11 @@ const assert = require('assert')
const map = require('lodash/map')
const mapValues = require('lodash/mapValues')
const ignoreErrors = require('promise-toolbox/ignoreErrors')
const { asyncMap } = require('@xen-orchestra/async-map')
const { chainVhd, checkVhdChain, default: Vhd } = require('vhd-lib')
const { createLogger } = require('@xen-orchestra/log')
const { dirname } = require('path')
const { asyncMap } = require('./asyncMap')
const { checkVhd } = require('./_checkVhd')
const { formatFilenameDate } = require('./_filenameDate')
const { getOldEntries } = require('./_getOldEntries')

View File

@ -1,4 +1,5 @@
const { asyncMap } = require('./asyncMap')
const { asyncMap } = require('@xen-orchestra/async-map')
const { DIR_XO_POOL_METADATA_BACKUPS } = require('./RemoteAdapter')
const { forkStreamUnpipe } = require('./_forkStreamUnpipe')
const { formatFilenameDate } = require('./_filenameDate')

View File

@ -2,10 +2,10 @@ const findLast = require('lodash/findLast')
const ignoreErrors = require('promise-toolbox/ignoreErrors')
const keyBy = require('lodash/keyBy')
const mapValues = require('lodash/mapValues')
const { asyncMap } = require('@xen-orchestra/async-map')
const { createLogger } = require('@xen-orchestra/log')
const { formatDateTime } = require('@xen-orchestra/xapi')
const { asyncMap } = require('./asyncMap')
const { ContinuousReplicationWriter } = require('./_ContinuousReplicationWriter')
const { DeltaBackupWriter } = require('./_DeltaBackupWriter')
const { DisasterRecoveryWriter } = require('./_DisasterRecoveryWriter')

View File

@ -1,4 +1,5 @@
const { asyncMap } = require('./asyncMap')
const { asyncMap } = require('@xen-orchestra/async-map')
const { DIR_XO_CONFIG_BACKUPS } = require('./RemoteAdapter')
const { formatFilenameDate } = require('./_filenameDate')
const { Task } = require('./Task')

View File

@ -4,10 +4,10 @@ const find = require('lodash/find')
const groupBy = require('lodash/groupBy')
const ignoreErrors = require('promise-toolbox/ignoreErrors')
const omit = require('lodash/omit')
const { asyncMap } = require('@xen-orchestra/async-map')
const { CancelToken } = require('promise-toolbox')
const { createVhdStreamWithLength } = require('vhd-lib')
const { asyncMap } = require('./asyncMap')
const { cancelableMap } = require('./_cancelableMap')
const TAG_BASE_DELTA = 'xo:base_delta'

View File

@ -1,6 +0,0 @@
// Similar to Promise.all + Array#map but supports all iterables and does not trigger ESLint array-callback-return
//
// WARNING: Does not handle plain objects
exports.asyncMap = function asyncMap(arrayLike, mapFn, thisArg) {
return Promise.all(Array.from(arrayLike, mapFn, thisArg))
}

View File

@ -3,7 +3,7 @@
// $FlowFixMe
import getStream from 'get-stream'
import asyncMapSettled from '@xen-orchestra/async-map'
import asyncMapSettled from '@xen-orchestra/async-map/legacy'
import limit from 'limit-concurrency-decorator'
import path, { basename } from 'path'
import synchronized from 'decorator-synchronized'

View File

@ -2,7 +2,7 @@ import defer from 'golike-defer'
import Disposable from 'promise-toolbox/Disposable'
import fromCallback from 'promise-toolbox/fromCallback'
import using from 'promise-toolbox/using'
import { asyncMap } from '@xen-orchestra/backups/asyncMap'
import { asyncMap } from '@xen-orchestra/async-map'
import { Backup } from '@xen-orchestra/backups/Backup'
import { compose } from '@vates/compose'
import { createLogger } from '@xen-orchestra/log/dist'

View File

@ -1,10 +1,10 @@
const asyncMap = require('@xen-orchestra/async-map')
const cancelable = require('promise-toolbox/cancelable')
const defer = require('golike-defer').default
const groupBy = require('lodash/groupBy')
const pickBy = require('lodash/pickBy')
const ignoreErrors = require('promise-toolbox/ignoreErrors')
const pRetry = require('promise-toolbox/retry')
const { asyncMap } = require('@xen-orchestra/async-map')
const { createLogger } = require('@xen-orchestra/log')
const { NULL_REF } = require('xen-api')
@ -286,7 +286,7 @@ module.exports = class Vm {
])
// must be done before destroying the VM
const disks = (
await asyncMap(this.getRecords('VBD', vm.VBDs), async vbd => {
await asyncMap(await this.getRecords('VBD', vm.VBDs), async vbd => {
let vdiRef
if (vbd.type === 'Disk' && isValidRef((vdiRef = vbd.VDI))) {
return vdiRef
@ -304,7 +304,7 @@ module.exports = class Vm {
pRetry(
async () => {
// list VMs connected to this VDI
const vmRefs = await asyncMap(this.getField('VDI', vdiRef, 'VBDs'), vbdRef =>
const vmRefs = await asyncMap(await this.getField('VDI', vdiRef, 'VBDs'), vbdRef =>
this.getField('VBD', vbdRef, 'VM')
)
if (vmRefs.every(_ => _ === vmRef)) {

View File

@ -28,4 +28,5 @@
>
> In case of conflict, the highest (lowest in previous list) `$version` wins.
- @xen-orchestra/async-map minor
- xo-server minor

View File

@ -1,4 +1,4 @@
import asyncMapSettled from '@xen-orchestra/async-map'
import asyncMapSettled from '@xen-orchestra/async-map/legacy'
import createLogger from '@xen-orchestra/log'
import Handlebars from 'handlebars'
import humanFormat from 'human-format'

View File

@ -1,4 +1,4 @@
import asyncMapSettled from '@xen-orchestra/async-map'
import asyncMapSettled from '@xen-orchestra/async-map/legacy'
import { some } from 'lodash'
import ensureArray from '../_ensureArray'

View File

@ -1,5 +1,5 @@
import * as multiparty from 'multiparty'
import asyncMapSettled from '@xen-orchestra/async-map'
import asyncMapSettled from '@xen-orchestra/async-map/legacy'
import defer from 'golike-defer'
import getStream from 'get-stream'
import { createLogger } from '@xen-orchestra/log'

View File

@ -1,5 +1,5 @@
import assert from 'assert'
import asyncMapSettled from '@xen-orchestra/async-map'
import asyncMapSettled from '@xen-orchestra/async-map/legacy'
import createLogger from '@xen-orchestra/log'
import defer from 'golike-defer'
import execa from 'execa'

View File

@ -1,4 +1,4 @@
import asyncMapSettled from '@xen-orchestra/async-map'
import asyncMapSettled from '@xen-orchestra/async-map/legacy'
import { createClient as createRedisClient } from 'redis'
import { difference, filter, forEach, isEmpty, keys as getKeys, map } from 'lodash'
import { ignoreErrors, promisifyAll } from 'promise-toolbox'

View File

@ -1,6 +1,6 @@
/* eslint eslint-comments/disable-enable-pair: [error, {allowWholeFile: true}] */
/* eslint-disable camelcase */
import asyncMapSettled from '@xen-orchestra/async-map'
import asyncMapSettled from '@xen-orchestra/async-map/legacy'
import concurrency from 'limit-concurrency-decorator'
import createLogger from '@xen-orchestra/log'
import deferrable from 'golike-defer'

View File

@ -2,7 +2,7 @@
// $FlowFixMe
import type RemoteHandler from '@xen-orchestra/fs'
import asyncMapSettled from '@xen-orchestra/async-map'
import asyncMapSettled from '@xen-orchestra/async-map/legacy'
import createLogger from '@xen-orchestra/log'
import defer from 'golike-defer'
import limitConcurrency from 'limit-concurrency-decorator'

View File

@ -1,4 +1,4 @@
import asyncMapSettled from '@xen-orchestra/async-map'
import asyncMapSettled from '@xen-orchestra/async-map/legacy'
import createLogger from '@xen-orchestra/log'
import deferrable from 'golike-defer'
import execa from 'execa'

View File

@ -1,4 +1,4 @@
import asyncMapSettled from '@xen-orchestra/async-map'
import asyncMapSettled from '@xen-orchestra/async-map/legacy'
import { createPredicate } from 'value-matcher'
import { timeout } from 'promise-toolbox'
import { filter, isEmpty, map, mapValues } from 'lodash'

View File

@ -2,7 +2,7 @@
import type { Pattern } from 'value-matcher'
import asyncMapSettled from '@xen-orchestra/async-map'
import asyncMapSettled from '@xen-orchestra/async-map/legacy'
import createLogger from '@xen-orchestra/log'
import emitAsync from '@xen-orchestra/emit-async'

View File

@ -1,5 +1,5 @@
// @flow
import asyncMapSettled from '@xen-orchestra/async-map'
import asyncMapSettled from '@xen-orchestra/async-map/legacy'
import createLogger from '@xen-orchestra/log'
import { fromEvent, ignoreErrors, timeout, using } from 'promise-toolbox'
import { parseDuration } from '@vates/parse-duration'

View File

@ -1,4 +1,4 @@
import asyncMapSettled from '@xen-orchestra/async-map'
import asyncMapSettled from '@xen-orchestra/async-map/legacy'
import synchronized from 'decorator-synchronized'
import { format, parse } from 'xo-remote-parser'
import { getHandler } from '@xen-orchestra/fs'

View File

@ -1,4 +1,4 @@
import asyncMapSettled from '@xen-orchestra/async-map'
import asyncMapSettled from '@xen-orchestra/async-map/legacy'
import deferrable from 'golike-defer'
import synchronized from 'decorator-synchronized'
import { difference, every, forEach, isObject, keyBy, map as mapToArray, remove, some } from 'lodash'

View File

@ -1,6 +1,6 @@
// @flow
import asyncMapSettled from '@xen-orchestra/async-map'
import asyncMapSettled from '@xen-orchestra/async-map/legacy'
import { createSchedule } from '@xen-orchestra/cron'
import { ignoreErrors } from 'promise-toolbox'
import { keyBy } from 'lodash'