feat(async-each): run async fn for each item in (async) iterable

This commit is contained in:
Julien Fontanet 2021-11-05 12:26:52 +01:00
parent a4bb453401
commit ffe430758e
7 changed files with 337 additions and 0 deletions

View File

@ -0,0 +1 @@
../../scripts/npmignore

View File

@ -0,0 +1,68 @@
<!-- DO NOT EDIT MANUALLY, THIS FILE HAS BEEN GENERATED -->
# @vates/async-each
[![Package Version](https://badgen.net/npm/v/@vates/async-each)](https://npmjs.org/package/@vates/async-each) ![License](https://badgen.net/npm/license/@vates/async-each) [![PackagePhobia](https://badgen.net/bundlephobia/minzip/@vates/async-each)](https://bundlephobia.com/result?p=@vates/async-each) [![Node compatibility](https://badgen.net/npm/node/@vates/async-each)](https://npmjs.org/package/@vates/async-each)
> Run async fn for each item in (async) iterable
## Install
Installation of the [npm package](https://npmjs.org/package/@vates/async-each):
```
> npm install --save @vates/async-each
```
## Usage
### `asyncEach(iterable, iteratee, [opts])`
Executes `iteratee` in order for each value yielded by `iterable`.
Returns a promise wich rejects as soon as a call to `iteratee` throws or a promise returned by it rejects, and which resolves when all promises returned by `iteratee` have resolved.
`iterable` must be an iterable or async iterable.
`iteratee` is called with the same `this` value as `asyncEach`, and with the following arguments:
- `value`: the value yielded by `iterable`
- `index`: the 0-based index for this value
- `iterable`: the iterable itself
`opts` is an object that can contains the following options:
- `concurrency`: a number which indicates the maximum number of parallel call to `iteratee`, defaults to `1`
- `signal`: an abort signal to stop the iteration
- `stopOnError`: wether to stop iteration of first error, or wait for all calls to finish and throw an `AggregateError`, defaults to `true`
```js
import { asyncEach } from '@vates/async-each'
const contents = []
await asyncEach(
['foo.txt', 'bar.txt', 'baz.txt'],
async function (filename, i) {
contents[i] = await readFile(filename)
},
{
// reads two files at a time
concurrency: 2,
}
)
```
## Contributions
Contributions are _very_ welcomed, either on the documentation or on
the code.
You may:
- report any [issue](https://github.com/vatesfr/xen-orchestra/issues)
you've encountered;
- fork and create a pull request.
## License
[ISC](https://spdx.org/licenses/ISC) © [Vates SAS](https://vates.fr)

View File

@ -0,0 +1,35 @@
### `asyncEach(iterable, iteratee, [opts])`
Executes `iteratee` in order for each value yielded by `iterable`.
Returns a promise wich rejects as soon as a call to `iteratee` throws or a promise returned by it rejects, and which resolves when all promises returned by `iteratee` have resolved.
`iterable` must be an iterable or async iterable.
`iteratee` is called with the same `this` value as `asyncEach`, and with the following arguments:
- `value`: the value yielded by `iterable`
- `index`: the 0-based index for this value
- `iterable`: the iterable itself
`opts` is an object that can contains the following options:
- `concurrency`: a number which indicates the maximum number of parallel call to `iteratee`, defaults to `1`
- `signal`: an abort signal to stop the iteration
- `stopOnError`: wether to stop iteration of first error, or wait for all calls to finish and throw an `AggregateError`, defaults to `true`
```js
import { asyncEach } from '@vates/async-each'
const contents = []
await asyncEach(
['foo.txt', 'bar.txt', 'baz.txt'],
async function (filename, i) {
contents[i] = await readFile(filename)
},
{
// reads two files at a time
concurrency: 2,
}
)
```

View File

@ -0,0 +1,99 @@
'use strict'
const noop = Function.prototype
class AggregateError extends Error {
constructor(errors, message) {
super(message)
this.errors = errors
}
}
exports.asyncEach = function asyncEach(iterable, iteratee, { concurrency = 1, signal, stopOnError = true } = {}) {
return new Promise((resolve, reject) => {
const it = (iterable[Symbol.iterator] || iterable[Symbol.asyncIterator]).call(iterable)
const errors = []
let running = 0
let index = 0
let onAbort
if (signal !== undefined) {
onAbort = () => {
onRejectedWrapper(new Error('asyncEach aborted'))
}
signal.addEventListener('abort', onAbort)
}
const clean = () => {
onFulfilled = onRejected = noop
if (onAbort !== undefined) {
signal.removeEventListener('abort', onAbort)
}
}
resolve = (resolve =>
function resolveAndClean(value) {
resolve(value)
clean()
})(resolve)
reject = (reject =>
function rejectAndClean(reason) {
reject(reason)
clean()
})(reject)
let onFulfilled = value => {
--running
next()
}
const onFulfilledWrapper = value => onFulfilled(value)
let onRejected = stopOnError
? reject
: error => {
--running
errors.push(error)
next()
}
const onRejectedWrapper = reason => onRejected(reason)
let nextIsRunning = false
let next = async () => {
if (nextIsRunning) {
return
}
nextIsRunning = true
if (running < concurrency) {
const cursor = await it.next()
if (cursor.done) {
next = () => {
if (running === 0) {
if (errors.length === 0) {
resolve()
} else {
reject(new AggregateError(errors))
}
}
}
} else {
++running
try {
const result = iteratee.call(this, cursor.value, index++, iterable)
let then
if (result != null && typeof result === 'object' && typeof (then = result.then) === 'function') {
then.call(result, onFulfilledWrapper, onRejectedWrapper)
} else {
onFulfilled(result)
}
} catch (error) {
onRejected(error)
}
}
nextIsRunning = false
return next()
}
nextIsRunning = false
}
next()
})
}

View File

@ -0,0 +1,99 @@
'use strict'
/* eslint-env jest */
const { asyncEach } = require('./')
const randomDelay = (max = 10) =>
new Promise(resolve => {
setTimeout(resolve, Math.floor(Math.random() * max + 1))
})
const rejectionOf = p =>
new Promise((resolve, reject) => {
p.then(reject, resolve)
})
describe('asyncEach', () => {
const thisArg = 'qux'
const values = ['foo', 'bar', 'baz']
Object.entries({
'sync iterable': () => values,
'async iterable': async function* () {
for (const value of values) {
await randomDelay()
yield value
}
},
}).forEach(([what, getIterable]) =>
describe('with ' + what, () => {
let iterable
beforeEach(() => {
iterable = getIterable()
})
it('works', async () => {
const iteratee = jest.fn(async () => {})
await asyncEach.call(thisArg, iterable, iteratee)
expect(iteratee.mock.instances).toEqual(Array.from(values, () => thisArg))
expect(iteratee.mock.calls).toEqual(Array.from(values, (value, index) => [value, index, iterable]))
})
;[1, 2, 4].forEach(concurrency => {
it('respects a concurrency of ' + concurrency, async () => {
let running = 0
await asyncEach(
values,
async () => {
++running
expect(running).toBeLessThanOrEqual(concurrency)
await randomDelay()
--running
},
{ concurrency }
)
})
})
it('stops on first error when stopOnError is true', async () => {
const error = new Error()
const iteratee = jest.fn((_, i) => {
if (i === 1) {
throw error
}
})
expect(await rejectionOf(asyncEach(iterable, iteratee, { stopOnError: true }))).toBe(error)
expect(iteratee).toHaveBeenCalledTimes(2)
})
it('rejects AggregateError when stopOnError is false', async () => {
const errors = []
const iteratee = jest.fn(() => {
const error = new Error()
errors.push(error)
throw error
})
const error = await rejectionOf(asyncEach(iterable, iteratee, { stopOnError: false }))
expect(error.errors).toEqual(errors)
expect(iteratee.mock.calls).toEqual(Array.from(values, (value, index) => [value, index, iterable]))
})
it('can be interrupted with an AbortSignal', async () => {
const ac = new AbortController()
const iteratee = jest.fn((_, i) => {
if (i === 1) {
ac.abort()
}
})
await expect(asyncEach(iterable, iteratee, { signal: ac.signal })).rejects.toThrow('asyncEach aborted')
expect(iteratee).toHaveBeenCalledTimes(2)
})
})
)
})

View File

@ -0,0 +1,34 @@
{
"private": false,
"name": "@vates/async-each",
"homepage": "https://github.com/vatesfr/xen-orchestra/tree/master/@vates/async-each",
"description": "Run async fn for each item in (async) iterable",
"keywords": [
"array",
"async",
"collection",
"each",
"for",
"foreach",
"iterable",
"iterator"
],
"bugs": "https://github.com/vatesfr/xen-orchestra/issues",
"repository": {
"directory": "@vates/async-each",
"type": "git",
"url": "https://github.com/vatesfr/xen-orchestra.git"
},
"author": {
"name": "Vates SAS",
"url": "https://vates.fr"
},
"license": "ISC",
"version": "0.0.0",
"engines": {
"node": ">=8.10"
},
"scripts": {
"postversion": "npm publish --access public"
}
}

View File

@ -30,6 +30,7 @@
> >
> In case of conflict, the highest (lowest in previous list) `$version` wins. > In case of conflict, the highest (lowest in previous list) `$version` wins.
- @vates/async-each minor
- @xen-orchestra/fs minor - @xen-orchestra/fs minor
- vhd-lib minor - vhd-lib minor
- xo-server patch - xo-server patch