From 20d5047b55ef8ef7fe87fb6715f1f0963557bd17 Mon Sep 17 00:00:00 2001 From: Julien Fontanet Date: Mon, 26 Mar 2018 16:26:12 +0200 Subject: [PATCH] chore(xo-server/ag2s): use async-iterator-to-stream instead --- packages/xo-server/package.json | 1 + packages/xo-server/src/ag2s.js | 68 -------- packages/xo-server/src/vhd-merge.js | 252 ++++++++++++++-------------- yarn.lock | 9 +- 4 files changed, 134 insertions(+), 196 deletions(-) delete mode 100644 packages/xo-server/src/ag2s.js diff --git a/packages/xo-server/package.json b/packages/xo-server/package.json index 55a18c817..f43596038 100644 --- a/packages/xo-server/package.json +++ b/packages/xo-server/package.json @@ -38,6 +38,7 @@ "ajv": "^6.1.1", "app-conf": "^0.5.0", "archiver": "^2.1.0", + "async-iterator-to-stream": "^1.0.1", "base64url": "^2.0.0", "bind-property-descriptor": "^1.0.0", "blocked": "^1.2.1", diff --git a/packages/xo-server/src/ag2s.js b/packages/xo-server/src/ag2s.js deleted file mode 100644 index 212bb6e05..000000000 --- a/packages/xo-server/src/ag2s.js +++ /dev/null @@ -1,68 +0,0 @@ -import { Readable } from 'stream' - -// return the next value of the iterator but if it is a promise, resolve it and -// reinject it -// -// this enables the use of a simple generator instead of an async generator -// (which are less widely supported) -const next = async (iterator, arg) => { - let cursor = iterator.next(arg) - if (typeof cursor.then === 'function') { - return cursor - } - let value - while ( - !cursor.done && - (value = cursor.value) != null && - typeof value.then === 'function' - ) { - let success = false - try { - value = await value - success = true - } catch (error) { - cursor = iterator.throw(error) - } - if (success) { - cursor = iterator.next(value) - } - } - return cursor -} - -// Create a readable stream from a generator -// -// generator can be async or can yield promises to wait for them -export const createReadable = (generator, options) => { - const readable = new Readable(options) - readable._read = size => { - const iterator = generator(size) - readable._destroy = (error, cb) => { - iterator.throw(error) - cb(error) - } - let running = false - const read = (readable._read = async size => { - if (running) { - return - } - running = true - try { - let cursor - do { - cursor = await next(iterator, size) - if (cursor.done) { - return readable.push(null) - } - } while (readable.push(cursor.value)) - } catch (error) { - readable.emit('error', error) - } finally { - running = false - } - }) - return read(size) - } - - return readable -} diff --git a/packages/xo-server/src/vhd-merge.js b/packages/xo-server/src/vhd-merge.js index 2de83cbf1..8eb532002 100644 --- a/packages/xo-server/src/vhd-merge.js +++ b/packages/xo-server/src/vhd-merge.js @@ -1,6 +1,7 @@ // TODO: remove once completely merged in vhd.js import assert from 'assert' +import asyncIteratorToStream from 'async-iterator-to-stream' import concurrency from 'limit-concurrency-decorator' import fu from '@nraynaud/struct-fu' import isEqual from 'lodash/isEqual' @@ -9,7 +10,6 @@ import { fromEvent } from 'promise-toolbox' import type RemoteHandler from './remote-handlers/abstract' import constantStream from './constant-stream' -import { createReadable } from './ag2s' import { noop, resolveRelativeFromFile, streamToBuffer } from './utils' const VHD_UTIL_DEBUG = 0 @@ -769,138 +769,136 @@ export async function chainVhd ( return false } -export const createReadStream = (handler, path) => - createReadable(function * () { - const fds = [] +export const createReadStream = asyncIteratorToStream(function * (handler, path) { + const fds = [] - try { - const vhds = [] - while (true) { - const fd = yield handler.openFile(path, 'r') - fds.push(fd) - const vhd = new Vhd(handler, fd) - vhds.push(vhd) - yield vhd.readHeaderAndFooter() - yield vhd.readBlockTable() + try { + const vhds = [] + while (true) { + const fd = yield handler.openFile(path, 'r') + fds.push(fd) + const vhd = new Vhd(handler, fd) + vhds.push(vhd) + yield vhd.readHeaderAndFooter() + yield vhd.readBlockTable() - if (vhd.footer.diskType === HARD_DISK_TYPE_DYNAMIC) { + if (vhd.footer.diskType === HARD_DISK_TYPE_DYNAMIC) { + break + } + + path = resolveRelativeFromFile(path, vhd.header.parentUnicodeName) + } + const nVhds = vhds.length + + // this the VHD we want to synthetize + const vhd = vhds[0] + + // data of our synthetic VHD + // TODO: empty parentUuid and parentLocatorEntry-s in header + let header = { + ...vhd.header, + tableOffset: { + high: 0, + low: 512 + 1024, + }, + parentUnicodeName: '', + } + + const bat = Buffer.allocUnsafe( + Math.ceil(4 * header.maxTableEntries / VHD_SECTOR_SIZE) * VHD_SECTOR_SIZE + ) + let footer = { + ...vhd.footer, + diskType: HARD_DISK_TYPE_DYNAMIC, + } + const sectorsPerBlockData = vhd.sectorsPerBlock + const sectorsPerBlock = + sectorsPerBlockData + vhd.bitmapSize / VHD_SECTOR_SIZE + + const nBlocks = Math.ceil( + uint32ToUint64(footer.currentSize) / header.blockSize + ) + + const blocksOwner = new Array(nBlocks) + for ( + let iBlock = 0, + blockOffset = Math.ceil((512 + 1024 + bat.length) / VHD_SECTOR_SIZE); + iBlock < nBlocks; + ++iBlock + ) { + let blockSector = BLOCK_UNUSED + for (let i = 0; i < nVhds; ++i) { + if (vhds[i].containsBlock(iBlock)) { + blocksOwner[iBlock] = i + blockSector = blockOffset + blockOffset += sectorsPerBlock break } - - path = resolveRelativeFromFile(path, vhd.header.parentUnicodeName) - } - const nVhds = vhds.length - - // this the VHD we want to synthetize - const vhd = vhds[0] - - // data of our synthetic VHD - // TODO: empty parentUuid and parentLocatorEntry-s in header - let header = { - ...vhd.header, - tableOffset: { - high: 0, - low: 512 + 1024, - }, - parentUnicodeName: '', - } - - const bat = Buffer.allocUnsafe( - Math.ceil(4 * header.maxTableEntries / VHD_SECTOR_SIZE) * - VHD_SECTOR_SIZE - ) - let footer = { - ...vhd.footer, - diskType: HARD_DISK_TYPE_DYNAMIC, - } - const sectorsPerBlockData = vhd.sectorsPerBlock - const sectorsPerBlock = - sectorsPerBlockData + vhd.bitmapSize / VHD_SECTOR_SIZE - - const nBlocks = Math.ceil( - uint32ToUint64(footer.currentSize) / header.blockSize - ) - - const blocksOwner = new Array(nBlocks) - for ( - let iBlock = 0, - blockOffset = Math.ceil((512 + 1024 + bat.length) / VHD_SECTOR_SIZE); - iBlock < nBlocks; - ++iBlock - ) { - let blockSector = BLOCK_UNUSED - for (let i = 0; i < nVhds; ++i) { - if (vhds[i].containsBlock(iBlock)) { - blocksOwner[iBlock] = i - blockSector = blockOffset - blockOffset += sectorsPerBlock - break - } - } - bat.writeUInt32BE(blockSector, iBlock * 4) - } - - footer = fuFooter.pack(footer) - checksumStruct(footer, fuFooter) - yield footer - - header = fuHeader.pack(header) - checksumStruct(header, fuHeader) - yield header - - yield bat - - const bitmap = Buffer.alloc(vhd.bitmapSize, 0xff) - for (let iBlock = 0; iBlock < nBlocks; ++iBlock) { - const owner = blocksOwner[iBlock] - if (owner === undefined) { - continue - } - - yield bitmap - - const blocksByVhd = new Map() - const emitBlockSectors = function * (iVhd, i, n) { - const vhd = vhds[iVhd] - if (!vhd.containsBlock(iBlock)) { - yield * emitBlockSectors(iVhd + 1, i, n) - return - } - let block = blocksByVhd.get(vhd) - if (block === undefined) { - block = yield vhd._readBlock(iBlock) - blocksByVhd.set(vhd, block) - } - const { bitmap, data } = block - if (vhd.footer.diskType === HARD_DISK_TYPE_DYNAMIC) { - yield data.slice(i * VHD_SECTOR_SIZE, n * VHD_SECTOR_SIZE) - return - } - while (i < n) { - const hasData = mapTestBit(bitmap, i) - const start = i - do { - ++i - } while (i < n && mapTestBit(bitmap, i) === hasData) - if (hasData) { - yield data.slice(start * VHD_SECTOR_SIZE, i * VHD_SECTOR_SIZE) - } else { - yield * emitBlockSectors(iVhd + 1, start, i) - } - } - } - yield * emitBlockSectors(owner, 0, sectorsPerBlock) - } - - yield footer - } finally { - for (let i = 0, n = fds.length; i < n; ++i) { - handler.closeFile(fds[i]).catch(error => { - console.warn('createReadStream, closeFd', i, error) - }) } + bat.writeUInt32BE(blockSector, iBlock * 4) } - }) + + footer = fuFooter.pack(footer) + checksumStruct(footer, fuFooter) + yield footer + + header = fuHeader.pack(header) + checksumStruct(header, fuHeader) + yield header + + yield bat + + const bitmap = Buffer.alloc(vhd.bitmapSize, 0xff) + for (let iBlock = 0; iBlock < nBlocks; ++iBlock) { + const owner = blocksOwner[iBlock] + if (owner === undefined) { + continue + } + + yield bitmap + + const blocksByVhd = new Map() + const emitBlockSectors = function * (iVhd, i, n) { + const vhd = vhds[iVhd] + if (!vhd.containsBlock(iBlock)) { + yield * emitBlockSectors(iVhd + 1, i, n) + return + } + let block = blocksByVhd.get(vhd) + if (block === undefined) { + block = yield vhd._readBlock(iBlock) + blocksByVhd.set(vhd, block) + } + const { bitmap, data } = block + if (vhd.footer.diskType === HARD_DISK_TYPE_DYNAMIC) { + yield data.slice(i * VHD_SECTOR_SIZE, n * VHD_SECTOR_SIZE) + return + } + while (i < n) { + const hasData = mapTestBit(bitmap, i) + const start = i + do { + ++i + } while (i < n && mapTestBit(bitmap, i) === hasData) + if (hasData) { + yield data.slice(start * VHD_SECTOR_SIZE, i * VHD_SECTOR_SIZE) + } else { + yield * emitBlockSectors(iVhd + 1, start, i) + } + } + } + yield * emitBlockSectors(owner, 0, sectorsPerBlock) + } + + yield footer + } finally { + for (let i = 0, n = fds.length; i < n; ++i) { + handler.closeFile(fds[i]).catch(error => { + console.warn('createReadStream, closeFd', i, error) + }) + } + } +}) export async function readVhdMetadata (handler: RemoteHandler, path: string) { const vhd = new Vhd(handler, path) diff --git a/yarn.lock b/yarn.lock index 29f9847b0..09aa5ddcc 100644 --- a/yarn.lock +++ b/yarn.lock @@ -611,7 +611,7 @@ pirates "^3.0.1" source-map-support "^0.4.2" -"@babel/runtime@^7.0.0-beta.39": +"@babel/runtime@^7.0.0-beta.39", "@babel/runtime@^7.0.0-beta.42": version "7.0.0-beta.42" resolved "https://registry.yarnpkg.com/@babel/runtime/-/runtime-7.0.0-beta.42.tgz#352e40c92e0460d3e82f49bd7e79f6cda76f919f" dependencies: @@ -1252,6 +1252,13 @@ async-foreach@^0.1.3: version "0.1.3" resolved "https://registry.yarnpkg.com/async-foreach/-/async-foreach-0.1.3.tgz#36121f845c0578172de419a97dbeb1d16ec34542" +async-iterator-to-stream@^1.0.1: + version "1.0.1" + resolved "https://registry.yarnpkg.com/async-iterator-to-stream/-/async-iterator-to-stream-1.0.1.tgz#e609c9102a17412772b9c94ffb77ddffb9b743b2" + dependencies: + "@babel/runtime" "^7.0.0-beta.42" + readable-stream "^2.3.5" + async-limiter@~1.0.0: version "1.0.0" resolved "https://registry.yarnpkg.com/async-limiter/-/async-limiter-1.0.0.tgz#78faed8c3d074ab81f22b4e985d79e8738f720f8"