chore(xo-server/ag2s): use async-iterator-to-stream instead

This commit is contained in:
Julien Fontanet 2018-03-26 16:26:12 +02:00
parent 4cfe3ec06e
commit 20d5047b55
4 changed files with 134 additions and 196 deletions

View File

@ -38,6 +38,7 @@
"ajv": "^6.1.1",
"app-conf": "^0.5.0",
"archiver": "^2.1.0",
"async-iterator-to-stream": "^1.0.1",
"base64url": "^2.0.0",
"bind-property-descriptor": "^1.0.0",
"blocked": "^1.2.1",

View File

@ -1,68 +0,0 @@
import { Readable } from 'stream'
// return the next value of the iterator but if it is a promise, resolve it and
// reinject it
//
// this enables the use of a simple generator instead of an async generator
// (which are less widely supported)
const next = async (iterator, arg) => {
let cursor = iterator.next(arg)
if (typeof cursor.then === 'function') {
return cursor
}
let value
while (
!cursor.done &&
(value = cursor.value) != null &&
typeof value.then === 'function'
) {
let success = false
try {
value = await value
success = true
} catch (error) {
cursor = iterator.throw(error)
}
if (success) {
cursor = iterator.next(value)
}
}
return cursor
}
// Create a readable stream from a generator
//
// generator can be async or can yield promises to wait for them
export const createReadable = (generator, options) => {
const readable = new Readable(options)
readable._read = size => {
const iterator = generator(size)
readable._destroy = (error, cb) => {
iterator.throw(error)
cb(error)
}
let running = false
const read = (readable._read = async size => {
if (running) {
return
}
running = true
try {
let cursor
do {
cursor = await next(iterator, size)
if (cursor.done) {
return readable.push(null)
}
} while (readable.push(cursor.value))
} catch (error) {
readable.emit('error', error)
} finally {
running = false
}
})
return read(size)
}
return readable
}

View File

@ -1,6 +1,7 @@
// TODO: remove once completely merged in vhd.js
import assert from 'assert'
import asyncIteratorToStream from 'async-iterator-to-stream'
import concurrency from 'limit-concurrency-decorator'
import fu from '@nraynaud/struct-fu'
import isEqual from 'lodash/isEqual'
@ -9,7 +10,6 @@ import { fromEvent } from 'promise-toolbox'
import type RemoteHandler from './remote-handlers/abstract'
import constantStream from './constant-stream'
import { createReadable } from './ag2s'
import { noop, resolveRelativeFromFile, streamToBuffer } from './utils'
const VHD_UTIL_DEBUG = 0
@ -769,138 +769,136 @@ export async function chainVhd (
return false
}
export const createReadStream = (handler, path) =>
createReadable(function * () {
const fds = []
export const createReadStream = asyncIteratorToStream(function * (handler, path) {
const fds = []
try {
const vhds = []
while (true) {
const fd = yield handler.openFile(path, 'r')
fds.push(fd)
const vhd = new Vhd(handler, fd)
vhds.push(vhd)
yield vhd.readHeaderAndFooter()
yield vhd.readBlockTable()
try {
const vhds = []
while (true) {
const fd = yield handler.openFile(path, 'r')
fds.push(fd)
const vhd = new Vhd(handler, fd)
vhds.push(vhd)
yield vhd.readHeaderAndFooter()
yield vhd.readBlockTable()
if (vhd.footer.diskType === HARD_DISK_TYPE_DYNAMIC) {
if (vhd.footer.diskType === HARD_DISK_TYPE_DYNAMIC) {
break
}
path = resolveRelativeFromFile(path, vhd.header.parentUnicodeName)
}
const nVhds = vhds.length
// this the VHD we want to synthetize
const vhd = vhds[0]
// data of our synthetic VHD
// TODO: empty parentUuid and parentLocatorEntry-s in header
let header = {
...vhd.header,
tableOffset: {
high: 0,
low: 512 + 1024,
},
parentUnicodeName: '',
}
const bat = Buffer.allocUnsafe(
Math.ceil(4 * header.maxTableEntries / VHD_SECTOR_SIZE) * VHD_SECTOR_SIZE
)
let footer = {
...vhd.footer,
diskType: HARD_DISK_TYPE_DYNAMIC,
}
const sectorsPerBlockData = vhd.sectorsPerBlock
const sectorsPerBlock =
sectorsPerBlockData + vhd.bitmapSize / VHD_SECTOR_SIZE
const nBlocks = Math.ceil(
uint32ToUint64(footer.currentSize) / header.blockSize
)
const blocksOwner = new Array(nBlocks)
for (
let iBlock = 0,
blockOffset = Math.ceil((512 + 1024 + bat.length) / VHD_SECTOR_SIZE);
iBlock < nBlocks;
++iBlock
) {
let blockSector = BLOCK_UNUSED
for (let i = 0; i < nVhds; ++i) {
if (vhds[i].containsBlock(iBlock)) {
blocksOwner[iBlock] = i
blockSector = blockOffset
blockOffset += sectorsPerBlock
break
}
path = resolveRelativeFromFile(path, vhd.header.parentUnicodeName)
}
const nVhds = vhds.length
// this the VHD we want to synthetize
const vhd = vhds[0]
// data of our synthetic VHD
// TODO: empty parentUuid and parentLocatorEntry-s in header
let header = {
...vhd.header,
tableOffset: {
high: 0,
low: 512 + 1024,
},
parentUnicodeName: '',
}
const bat = Buffer.allocUnsafe(
Math.ceil(4 * header.maxTableEntries / VHD_SECTOR_SIZE) *
VHD_SECTOR_SIZE
)
let footer = {
...vhd.footer,
diskType: HARD_DISK_TYPE_DYNAMIC,
}
const sectorsPerBlockData = vhd.sectorsPerBlock
const sectorsPerBlock =
sectorsPerBlockData + vhd.bitmapSize / VHD_SECTOR_SIZE
const nBlocks = Math.ceil(
uint32ToUint64(footer.currentSize) / header.blockSize
)
const blocksOwner = new Array(nBlocks)
for (
let iBlock = 0,
blockOffset = Math.ceil((512 + 1024 + bat.length) / VHD_SECTOR_SIZE);
iBlock < nBlocks;
++iBlock
) {
let blockSector = BLOCK_UNUSED
for (let i = 0; i < nVhds; ++i) {
if (vhds[i].containsBlock(iBlock)) {
blocksOwner[iBlock] = i
blockSector = blockOffset
blockOffset += sectorsPerBlock
break
}
}
bat.writeUInt32BE(blockSector, iBlock * 4)
}
footer = fuFooter.pack(footer)
checksumStruct(footer, fuFooter)
yield footer
header = fuHeader.pack(header)
checksumStruct(header, fuHeader)
yield header
yield bat
const bitmap = Buffer.alloc(vhd.bitmapSize, 0xff)
for (let iBlock = 0; iBlock < nBlocks; ++iBlock) {
const owner = blocksOwner[iBlock]
if (owner === undefined) {
continue
}
yield bitmap
const blocksByVhd = new Map()
const emitBlockSectors = function * (iVhd, i, n) {
const vhd = vhds[iVhd]
if (!vhd.containsBlock(iBlock)) {
yield * emitBlockSectors(iVhd + 1, i, n)
return
}
let block = blocksByVhd.get(vhd)
if (block === undefined) {
block = yield vhd._readBlock(iBlock)
blocksByVhd.set(vhd, block)
}
const { bitmap, data } = block
if (vhd.footer.diskType === HARD_DISK_TYPE_DYNAMIC) {
yield data.slice(i * VHD_SECTOR_SIZE, n * VHD_SECTOR_SIZE)
return
}
while (i < n) {
const hasData = mapTestBit(bitmap, i)
const start = i
do {
++i
} while (i < n && mapTestBit(bitmap, i) === hasData)
if (hasData) {
yield data.slice(start * VHD_SECTOR_SIZE, i * VHD_SECTOR_SIZE)
} else {
yield * emitBlockSectors(iVhd + 1, start, i)
}
}
}
yield * emitBlockSectors(owner, 0, sectorsPerBlock)
}
yield footer
} finally {
for (let i = 0, n = fds.length; i < n; ++i) {
handler.closeFile(fds[i]).catch(error => {
console.warn('createReadStream, closeFd', i, error)
})
}
bat.writeUInt32BE(blockSector, iBlock * 4)
}
})
footer = fuFooter.pack(footer)
checksumStruct(footer, fuFooter)
yield footer
header = fuHeader.pack(header)
checksumStruct(header, fuHeader)
yield header
yield bat
const bitmap = Buffer.alloc(vhd.bitmapSize, 0xff)
for (let iBlock = 0; iBlock < nBlocks; ++iBlock) {
const owner = blocksOwner[iBlock]
if (owner === undefined) {
continue
}
yield bitmap
const blocksByVhd = new Map()
const emitBlockSectors = function * (iVhd, i, n) {
const vhd = vhds[iVhd]
if (!vhd.containsBlock(iBlock)) {
yield * emitBlockSectors(iVhd + 1, i, n)
return
}
let block = blocksByVhd.get(vhd)
if (block === undefined) {
block = yield vhd._readBlock(iBlock)
blocksByVhd.set(vhd, block)
}
const { bitmap, data } = block
if (vhd.footer.diskType === HARD_DISK_TYPE_DYNAMIC) {
yield data.slice(i * VHD_SECTOR_SIZE, n * VHD_SECTOR_SIZE)
return
}
while (i < n) {
const hasData = mapTestBit(bitmap, i)
const start = i
do {
++i
} while (i < n && mapTestBit(bitmap, i) === hasData)
if (hasData) {
yield data.slice(start * VHD_SECTOR_SIZE, i * VHD_SECTOR_SIZE)
} else {
yield * emitBlockSectors(iVhd + 1, start, i)
}
}
}
yield * emitBlockSectors(owner, 0, sectorsPerBlock)
}
yield footer
} finally {
for (let i = 0, n = fds.length; i < n; ++i) {
handler.closeFile(fds[i]).catch(error => {
console.warn('createReadStream, closeFd', i, error)
})
}
}
})
export async function readVhdMetadata (handler: RemoteHandler, path: string) {
const vhd = new Vhd(handler, path)

View File

@ -611,7 +611,7 @@
pirates "^3.0.1"
source-map-support "^0.4.2"
"@babel/runtime@^7.0.0-beta.39":
"@babel/runtime@^7.0.0-beta.39", "@babel/runtime@^7.0.0-beta.42":
version "7.0.0-beta.42"
resolved "https://registry.yarnpkg.com/@babel/runtime/-/runtime-7.0.0-beta.42.tgz#352e40c92e0460d3e82f49bd7e79f6cda76f919f"
dependencies:
@ -1252,6 +1252,13 @@ async-foreach@^0.1.3:
version "0.1.3"
resolved "https://registry.yarnpkg.com/async-foreach/-/async-foreach-0.1.3.tgz#36121f845c0578172de419a97dbeb1d16ec34542"
async-iterator-to-stream@^1.0.1:
version "1.0.1"
resolved "https://registry.yarnpkg.com/async-iterator-to-stream/-/async-iterator-to-stream-1.0.1.tgz#e609c9102a17412772b9c94ffb77ddffb9b743b2"
dependencies:
"@babel/runtime" "^7.0.0-beta.42"
readable-stream "^2.3.5"
async-limiter@~1.0.0:
version "1.0.0"
resolved "https://registry.yarnpkg.com/async-limiter/-/async-limiter-1.0.0.tgz#78faed8c3d074ab81f22b4e985d79e8738f720f8"