diff --git a/@vates/async-each/.npmignore b/@vates/async-each/.npmignore new file mode 120000 index 000000000..008d1b9b9 --- /dev/null +++ b/@vates/async-each/.npmignore @@ -0,0 +1 @@ +../../scripts/npmignore \ No newline at end of file diff --git a/@vates/async-each/README.md b/@vates/async-each/README.md new file mode 100644 index 000000000..7b56ab151 --- /dev/null +++ b/@vates/async-each/README.md @@ -0,0 +1,68 @@ + + +# @vates/async-each + +[![Package Version](https://badgen.net/npm/v/@vates/async-each)](https://npmjs.org/package/@vates/async-each) ![License](https://badgen.net/npm/license/@vates/async-each) [![PackagePhobia](https://badgen.net/bundlephobia/minzip/@vates/async-each)](https://bundlephobia.com/result?p=@vates/async-each) [![Node compatibility](https://badgen.net/npm/node/@vates/async-each)](https://npmjs.org/package/@vates/async-each) + +> Run async fn for each item in (async) iterable + +## Install + +Installation of the [npm package](https://npmjs.org/package/@vates/async-each): + +``` +> npm install --save @vates/async-each +``` + +## Usage + +### `asyncEach(iterable, iteratee, [opts])` + +Executes `iteratee` in order for each value yielded by `iterable`. + +Returns a promise wich rejects as soon as a call to `iteratee` throws or a promise returned by it rejects, and which resolves when all promises returned by `iteratee` have resolved. + +`iterable` must be an iterable or async iterable. + +`iteratee` is called with the same `this` value as `asyncEach`, and with the following arguments: + +- `value`: the value yielded by `iterable` +- `index`: the 0-based index for this value +- `iterable`: the iterable itself + +`opts` is an object that can contains the following options: + +- `concurrency`: a number which indicates the maximum number of parallel call to `iteratee`, defaults to `1` +- `signal`: an abort signal to stop the iteration +- `stopOnError`: wether to stop iteration of first error, or wait for all calls to finish and throw an `AggregateError`, defaults to `true` + +```js +import { asyncEach } from '@vates/async-each' + +const contents = [] +await asyncEach( + ['foo.txt', 'bar.txt', 'baz.txt'], + async function (filename, i) { + contents[i] = await readFile(filename) + }, + { + // reads two files at a time + concurrency: 2, + } +) +``` + +## Contributions + +Contributions are _very_ welcomed, either on the documentation or on +the code. + +You may: + +- report any [issue](https://github.com/vatesfr/xen-orchestra/issues) + you've encountered; +- fork and create a pull request. + +## License + +[ISC](https://spdx.org/licenses/ISC) © [Vates SAS](https://vates.fr) diff --git a/@vates/async-each/USAGE.md b/@vates/async-each/USAGE.md new file mode 100644 index 000000000..8dabffc45 --- /dev/null +++ b/@vates/async-each/USAGE.md @@ -0,0 +1,35 @@ +### `asyncEach(iterable, iteratee, [opts])` + +Executes `iteratee` in order for each value yielded by `iterable`. + +Returns a promise wich rejects as soon as a call to `iteratee` throws or a promise returned by it rejects, and which resolves when all promises returned by `iteratee` have resolved. + +`iterable` must be an iterable or async iterable. + +`iteratee` is called with the same `this` value as `asyncEach`, and with the following arguments: + +- `value`: the value yielded by `iterable` +- `index`: the 0-based index for this value +- `iterable`: the iterable itself + +`opts` is an object that can contains the following options: + +- `concurrency`: a number which indicates the maximum number of parallel call to `iteratee`, defaults to `1` +- `signal`: an abort signal to stop the iteration +- `stopOnError`: wether to stop iteration of first error, or wait for all calls to finish and throw an `AggregateError`, defaults to `true` + +```js +import { asyncEach } from '@vates/async-each' + +const contents = [] +await asyncEach( + ['foo.txt', 'bar.txt', 'baz.txt'], + async function (filename, i) { + contents[i] = await readFile(filename) + }, + { + // reads two files at a time + concurrency: 2, + } +) +``` diff --git a/@vates/async-each/index.js b/@vates/async-each/index.js new file mode 100644 index 000000000..c95af2d8e --- /dev/null +++ b/@vates/async-each/index.js @@ -0,0 +1,99 @@ +'use strict' + +const noop = Function.prototype + +class AggregateError extends Error { + constructor(errors, message) { + super(message) + this.errors = errors + } +} + +exports.asyncEach = function asyncEach(iterable, iteratee, { concurrency = 1, signal, stopOnError = true } = {}) { + return new Promise((resolve, reject) => { + const it = (iterable[Symbol.iterator] || iterable[Symbol.asyncIterator]).call(iterable) + const errors = [] + let running = 0 + let index = 0 + + let onAbort + if (signal !== undefined) { + onAbort = () => { + onRejectedWrapper(new Error('asyncEach aborted')) + } + signal.addEventListener('abort', onAbort) + } + + const clean = () => { + onFulfilled = onRejected = noop + if (onAbort !== undefined) { + signal.removeEventListener('abort', onAbort) + } + } + + resolve = (resolve => + function resolveAndClean(value) { + resolve(value) + clean() + })(resolve) + reject = (reject => + function rejectAndClean(reason) { + reject(reason) + clean() + })(reject) + + let onFulfilled = value => { + --running + next() + } + const onFulfilledWrapper = value => onFulfilled(value) + + let onRejected = stopOnError + ? reject + : error => { + --running + errors.push(error) + next() + } + const onRejectedWrapper = reason => onRejected(reason) + + let nextIsRunning = false + let next = async () => { + if (nextIsRunning) { + return + } + nextIsRunning = true + if (running < concurrency) { + const cursor = await it.next() + if (cursor.done) { + next = () => { + if (running === 0) { + if (errors.length === 0) { + resolve() + } else { + reject(new AggregateError(errors)) + } + } + } + } else { + ++running + try { + const result = iteratee.call(this, cursor.value, index++, iterable) + let then + if (result != null && typeof result === 'object' && typeof (then = result.then) === 'function') { + then.call(result, onFulfilledWrapper, onRejectedWrapper) + } else { + onFulfilled(result) + } + } catch (error) { + onRejected(error) + } + } + nextIsRunning = false + return next() + } + nextIsRunning = false + } + next() + }) +} diff --git a/@vates/async-each/index.spec.js b/@vates/async-each/index.spec.js new file mode 100644 index 000000000..26f315968 --- /dev/null +++ b/@vates/async-each/index.spec.js @@ -0,0 +1,99 @@ +'use strict' + +/* eslint-env jest */ + +const { asyncEach } = require('./') + +const randomDelay = (max = 10) => + new Promise(resolve => { + setTimeout(resolve, Math.floor(Math.random() * max + 1)) + }) + +const rejectionOf = p => + new Promise((resolve, reject) => { + p.then(reject, resolve) + }) + +describe('asyncEach', () => { + const thisArg = 'qux' + const values = ['foo', 'bar', 'baz'] + + Object.entries({ + 'sync iterable': () => values, + 'async iterable': async function* () { + for (const value of values) { + await randomDelay() + yield value + } + }, + }).forEach(([what, getIterable]) => + describe('with ' + what, () => { + let iterable + beforeEach(() => { + iterable = getIterable() + }) + + it('works', async () => { + const iteratee = jest.fn(async () => {}) + + await asyncEach.call(thisArg, iterable, iteratee) + + expect(iteratee.mock.instances).toEqual(Array.from(values, () => thisArg)) + expect(iteratee.mock.calls).toEqual(Array.from(values, (value, index) => [value, index, iterable])) + }) + ;[1, 2, 4].forEach(concurrency => { + it('respects a concurrency of ' + concurrency, async () => { + let running = 0 + + await asyncEach( + values, + async () => { + ++running + expect(running).toBeLessThanOrEqual(concurrency) + await randomDelay() + --running + }, + { concurrency } + ) + }) + }) + + it('stops on first error when stopOnError is true', async () => { + const error = new Error() + const iteratee = jest.fn((_, i) => { + if (i === 1) { + throw error + } + }) + + expect(await rejectionOf(asyncEach(iterable, iteratee, { stopOnError: true }))).toBe(error) + expect(iteratee).toHaveBeenCalledTimes(2) + }) + + it('rejects AggregateError when stopOnError is false', async () => { + const errors = [] + const iteratee = jest.fn(() => { + const error = new Error() + errors.push(error) + throw error + }) + + const error = await rejectionOf(asyncEach(iterable, iteratee, { stopOnError: false })) + expect(error.errors).toEqual(errors) + expect(iteratee.mock.calls).toEqual(Array.from(values, (value, index) => [value, index, iterable])) + }) + + it('can be interrupted with an AbortSignal', async () => { + const ac = new AbortController() + const iteratee = jest.fn((_, i) => { + if (i === 1) { + ac.abort() + } + }) + + await expect(asyncEach(iterable, iteratee, { signal: ac.signal })).rejects.toThrow('asyncEach aborted') + expect(iteratee).toHaveBeenCalledTimes(2) + }) + }) + ) +}) diff --git a/@vates/async-each/package.json b/@vates/async-each/package.json new file mode 100644 index 000000000..dde20c6cf --- /dev/null +++ b/@vates/async-each/package.json @@ -0,0 +1,34 @@ +{ + "private": false, + "name": "@vates/async-each", + "homepage": "https://github.com/vatesfr/xen-orchestra/tree/master/@vates/async-each", + "description": "Run async fn for each item in (async) iterable", + "keywords": [ + "array", + "async", + "collection", + "each", + "for", + "foreach", + "iterable", + "iterator" + ], + "bugs": "https://github.com/vatesfr/xen-orchestra/issues", + "repository": { + "directory": "@vates/async-each", + "type": "git", + "url": "https://github.com/vatesfr/xen-orchestra.git" + }, + "author": { + "name": "Vates SAS", + "url": "https://vates.fr" + }, + "license": "ISC", + "version": "0.0.0", + "engines": { + "node": ">=8.10" + }, + "scripts": { + "postversion": "npm publish --access public" + } +} diff --git a/CHANGELOG.unreleased.md b/CHANGELOG.unreleased.md index b276d379e..2bd6da915 100644 --- a/CHANGELOG.unreleased.md +++ b/CHANGELOG.unreleased.md @@ -30,6 +30,7 @@ > > In case of conflict, the highest (lowest in previous list) `$version` wins. +- @vates/async-each minor - @xen-orchestra/fs minor - vhd-lib minor - xo-server patch