spawn only one promise in _read() of the outpur stream
- instrument VirtualBuffer to detect unwanted concurrency - move VMDK version detection to header parsing - chunk zero pages in the output stage - avoid sending too much data at once in the output stage (with a shadow file)
This commit is contained in:
@@ -222,48 +222,84 @@ export class ReadableRawVHDStream extends stream.Readable {
|
||||
this.position = 0
|
||||
this.vmdkParser = vmdkParser
|
||||
this.done = false
|
||||
this.busy = false
|
||||
this.currentFile = []
|
||||
}
|
||||
|
||||
filePadding (paddingLength) {
|
||||
if (paddingLength !== 0) {
|
||||
const chunkSize = 1024 * 1024 // 1Mo
|
||||
const chunkCount = Math.floor(paddingLength / chunkSize)
|
||||
for (let i = 0; i < chunkCount; i++) {
|
||||
this.currentFile.push(() => {
|
||||
const paddingBuffer = new Buffer(chunkSize)
|
||||
paddingBuffer.fill(0)
|
||||
return paddingBuffer
|
||||
})
|
||||
}
|
||||
this.currentFile.push(() => {
|
||||
const paddingBuffer = new Buffer(paddingLength % chunkSize)
|
||||
paddingBuffer.fill(0)
|
||||
return paddingBuffer
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
async pushNextBlock () {
|
||||
const next = await this.vmdkParser.next()
|
||||
if (next === null) {
|
||||
const paddingLength = this.size - this.position
|
||||
this.filePadding(paddingLength)
|
||||
this.currentFile.push(() => this.footer)
|
||||
this.currentFile.push(() => {
|
||||
this.done = true
|
||||
return null
|
||||
})
|
||||
} else {
|
||||
const offset = next.lbaBytes
|
||||
const buffer = next.grain
|
||||
const paddingLength = offset - this.position
|
||||
if (paddingLength < 0) {
|
||||
process.nextTick(() => this.emit('error', 'This VMDK file does not have its blocks in the correct order'))
|
||||
}
|
||||
this.filePadding(paddingLength)
|
||||
this.currentFile.push(() => buffer)
|
||||
this.position = offset + buffer.length
|
||||
}
|
||||
return this.pushFileUntilFull()
|
||||
}
|
||||
|
||||
// returns true if the file is empty
|
||||
pushFileUntilFull () {
|
||||
while (true) {
|
||||
if (this.currentFile.length === 0) {
|
||||
break
|
||||
}
|
||||
const result = this.push(this.currentFile.shift()())
|
||||
if (!result) {
|
||||
break
|
||||
}
|
||||
}
|
||||
return this.currentFile.length === 0
|
||||
}
|
||||
|
||||
async pushNextUntilFull () {
|
||||
while (!this.done && await this.pushNextBlock()) {
|
||||
}
|
||||
}
|
||||
|
||||
_read () {
|
||||
this.vmdkParser.next().then((next) => {
|
||||
if (this.done) {
|
||||
return
|
||||
}
|
||||
if (next === null) {
|
||||
const paddingLength = this.size - this.position
|
||||
if (paddingLength !== 0) {
|
||||
const chunkSize = 10 * 1024 * 1024
|
||||
const chunkCount = Math.floor(paddingLength / chunkSize)
|
||||
for (let i = 0; i < chunkCount; i++) {
|
||||
const paddingBuffer = new Buffer(chunkSize)
|
||||
paddingBuffer.fill(0)
|
||||
this.push(paddingBuffer)
|
||||
}
|
||||
const paddingBuffer = new Buffer(paddingLength % chunkSize)
|
||||
paddingBuffer.fill(0)
|
||||
this.push(paddingBuffer)
|
||||
}
|
||||
this.push(this.footer)
|
||||
this.push(null)
|
||||
this.done = true
|
||||
} else {
|
||||
const offset = next.lbaBytes
|
||||
const buffer = next.grain
|
||||
const paddingLength = offset - this.position
|
||||
if (paddingLength < 0) {
|
||||
process.nextTick(() => this.emit('error', 'This VMDK file does not have its blocks in the correct order'))
|
||||
}
|
||||
if (paddingLength !== 0) {
|
||||
const paddingBuffer = new Buffer(paddingLength)
|
||||
paddingBuffer.fill(0)
|
||||
this.push(paddingBuffer)
|
||||
}
|
||||
this.push(buffer)
|
||||
this.position = offset + buffer.length
|
||||
}
|
||||
}).catch((error) => {
|
||||
this.emit('error', error)
|
||||
})
|
||||
if (this.busy || this.done) {
|
||||
return
|
||||
}
|
||||
if (this.pushFileUntilFull()) {
|
||||
this.busy = true
|
||||
this.pushNextUntilFull().then(() => {
|
||||
this.busy = false
|
||||
}).catch((error) => {
|
||||
process.nextTick(() => this.emit('error', error))
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -8,6 +8,7 @@ export class VirtualBuffer {
|
||||
constructor (readStream) {
|
||||
this.slicer = new Slicer(readStream)
|
||||
this.position = 0
|
||||
this.promise = null
|
||||
}
|
||||
|
||||
get isDepleted () {
|
||||
@@ -15,7 +16,11 @@ export class VirtualBuffer {
|
||||
}
|
||||
|
||||
// length = -1 means 'until the end'
|
||||
async readChunk (length) {
|
||||
async readChunk (length, label) {
|
||||
const _this = this
|
||||
if (this.promise !== null) {
|
||||
throw new Error('pomise already there !!!', this.promise)
|
||||
}
|
||||
if (length === -1) {
|
||||
const chunks = []
|
||||
let error = false
|
||||
@@ -34,12 +39,15 @@ export class VirtualBuffer {
|
||||
} while (error === false)
|
||||
return Buffer.concat(chunks)
|
||||
} else {
|
||||
this.promise = label
|
||||
return new Promise((resolve, reject) => {
|
||||
this.slicer.read(length, (error, actualLength, data, offset) => {
|
||||
if (error !== false && error !== true) {
|
||||
_this.promise = null
|
||||
reject(error)
|
||||
} else {
|
||||
this.position += actualLength
|
||||
_this.promise = null
|
||||
_this.position += data.length
|
||||
resolve(data)
|
||||
}
|
||||
})
|
||||
|
||||
@@ -66,6 +66,14 @@ function parseFlags (flagBuffer) {
|
||||
}
|
||||
|
||||
function parseHeader (buffer) {
|
||||
const magicString = buffer.slice(0, 4).toString('ascii')
|
||||
if (magicString !== 'KDMV') {
|
||||
throw new Error('not a VMDK file')
|
||||
}
|
||||
const version = buffer.readUInt32LE(4)
|
||||
if (version !== 1 && version !== 3) {
|
||||
throw new Error('unsupported VMDK version ' + version + ', only version 1 and 3 are supported')
|
||||
}
|
||||
const flags = parseFlags(buffer.slice(8, 12))
|
||||
const capacitySectors = parseU64b(buffer, 12, 'capacitySectors')
|
||||
const grainSizeSectors = parseU64b(buffer, 20, 'grainSizeSectors')
|
||||
@@ -122,7 +130,7 @@ export class VMDKDirectParser {
|
||||
}
|
||||
|
||||
async readHeader () {
|
||||
const headerBuffer = await this.virtualBuffer.readChunk(512)
|
||||
const headerBuffer = await this.virtualBuffer.readChunk(512, 'readHeader')
|
||||
const magicString = headerBuffer.slice(0, 4).toString('ascii')
|
||||
if (magicString !== 'KDMV') {
|
||||
throw new Error('not a VMDK file')
|
||||
@@ -134,27 +142,27 @@ export class VMDKDirectParser {
|
||||
this.header = parseHeader(headerBuffer)
|
||||
// I think the multiplications are OK, because the descriptor is always at the beginning of the file
|
||||
const descriptorLength = this.header.descriptorSizeSectors * sectorSize
|
||||
const descriptorBuffer = await this.virtualBuffer.readChunk(descriptorLength)
|
||||
const descriptorBuffer = await this.virtualBuffer.readChunk(descriptorLength, 'descriptor')
|
||||
this.descriptor = parseDescriptor(descriptorBuffer)
|
||||
return this.header
|
||||
}
|
||||
|
||||
async next () {
|
||||
while (!this.virtualBuffer.isDepleted) {
|
||||
const sector = await this.virtualBuffer.readChunk(512)
|
||||
const sector = await this.virtualBuffer.readChunk(512, 'marker start ' + this.virtualBuffer.position)
|
||||
if (sector.length === 0) {
|
||||
break
|
||||
}
|
||||
const marker = tryToParseMarker(sector)
|
||||
if (marker.size === 0) {
|
||||
if (marker.value !== 0) {
|
||||
await this.virtualBuffer.readChunk(marker.value * sectorSize)
|
||||
await this.virtualBuffer.readChunk(marker.value * sectorSize, 'other marker value ' + this.virtualBuffer.position)
|
||||
}
|
||||
} else if (marker.size > 10) {
|
||||
const grainDiskSize = marker.size + 12
|
||||
const alignedGrainDiskSize = Math.ceil(grainDiskSize / sectorSize) * sectorSize
|
||||
const remainOfBufferSize = alignedGrainDiskSize - sectorSize
|
||||
const remainderOfGrainBuffer = await this.virtualBuffer.readChunk(remainOfBufferSize)
|
||||
const remainderOfGrainBuffer = await this.virtualBuffer.readChunk(remainOfBufferSize, 'grain remainder ' + this.virtualBuffer.position)
|
||||
const grainBuffer = Buffer.concat([sector, remainderOfGrainBuffer])
|
||||
return readGrain(0, grainBuffer, true)
|
||||
}
|
||||
@@ -165,25 +173,16 @@ export class VMDKDirectParser {
|
||||
|
||||
export async function readRawContent (readStream) {
|
||||
const virtualBuffer = new VirtualBuffer(readStream)
|
||||
const headerBuffer = await virtualBuffer.readChunk(512)
|
||||
const magicString = headerBuffer.slice(0, 4).toString('ascii')
|
||||
if (magicString !== 'KDMV') {
|
||||
throw new Error('not a VMDK file')
|
||||
}
|
||||
const version = headerBuffer.readUInt32LE(4)
|
||||
if (version !== 1 && version !== 3) {
|
||||
throw new Error('unsupported VMDK version ' + version + ', only version 1 and 3 are supported')
|
||||
}
|
||||
|
||||
const headerBuffer = await virtualBuffer.readChunk(512, 'header')
|
||||
let header = parseHeader(headerBuffer)
|
||||
|
||||
// I think the multiplications are OK, because the descriptor is always at the beginning of the file
|
||||
const descriptorLength = header.descriptorSizeSectors * sectorSize
|
||||
const descriptorBuffer = await virtualBuffer.readChunk(descriptorLength)
|
||||
const descriptorBuffer = await virtualBuffer.readChunk(descriptorLength, 'descriptor')
|
||||
const descriptor = parseDescriptor(descriptorBuffer)
|
||||
|
||||
// TODO: we concat them back for now so that the indices match, we'll have to introduce a bias later
|
||||
const remainingBuffer = await virtualBuffer.readChunk(-1)
|
||||
const remainingBuffer = await virtualBuffer.readChunk(-1, 'remainder')
|
||||
const buffer = Buffer.concat([headerBuffer, descriptorBuffer, remainingBuffer])
|
||||
if (header.grainDirectoryOffsetSectors === -1) {
|
||||
header = parseHeader(buffer.slice(-1024, -1024 + sectorSize))
|
||||
|
||||
@@ -13,13 +13,10 @@ describe('VMDK reading', () => {
|
||||
await exec('rm -f ' + fileName + '&& VBoxManage convertfromraw --format VMDK --variant Stream ' + rawFileName + ' ' + fileName)
|
||||
const parser = new VMDKDirectParser(createReadStream(fileName))
|
||||
const header = await parser.readHeader()
|
||||
let grain
|
||||
const harvested = []
|
||||
while (true) {
|
||||
grain = parser.next()
|
||||
const res = await grain
|
||||
const res = await parser.next()
|
||||
if (res === null) {
|
||||
console.log('VMDK reading got null')
|
||||
break
|
||||
}
|
||||
harvested.push(res)
|
||||
|
||||
Reference in New Issue
Block a user