From 28799c62a99118a82b77915bf27fb8dd21ec8d77 Mon Sep 17 00:00:00 2001 From: Nicolas Raynaud Date: Wed, 24 Aug 2016 09:39:02 -0700 Subject: [PATCH] spawn only one promise in _read() of the outpur stream - instrument VirtualBuffer to detect unwanted concurrency - move VMDK version detection to header parsing - chunk zero pages in the output stage - avoid sending too much data at once in the output stage (with a shadow file) --- packages/xo-vmdk-to-vhd/src/vhd-write.js | 114 ++++++++++++------ packages/xo-vmdk-to-vhd/src/virtual-buffer.js | 12 +- packages/xo-vmdk-to-vhd/src/vmdk-read.js | 33 +++-- packages/xo-vmdk-to-vhd/src/vmdk-read.spec.js | 5 +- 4 files changed, 102 insertions(+), 62 deletions(-) diff --git a/packages/xo-vmdk-to-vhd/src/vhd-write.js b/packages/xo-vmdk-to-vhd/src/vhd-write.js index 66bf7d4e1..9681127ff 100644 --- a/packages/xo-vmdk-to-vhd/src/vhd-write.js +++ b/packages/xo-vmdk-to-vhd/src/vhd-write.js @@ -222,48 +222,84 @@ export class ReadableRawVHDStream extends stream.Readable { this.position = 0 this.vmdkParser = vmdkParser this.done = false + this.busy = false + this.currentFile = [] + } + + filePadding (paddingLength) { + if (paddingLength !== 0) { + const chunkSize = 1024 * 1024 // 1Mo + const chunkCount = Math.floor(paddingLength / chunkSize) + for (let i = 0; i < chunkCount; i++) { + this.currentFile.push(() => { + const paddingBuffer = new Buffer(chunkSize) + paddingBuffer.fill(0) + return paddingBuffer + }) + } + this.currentFile.push(() => { + const paddingBuffer = new Buffer(paddingLength % chunkSize) + paddingBuffer.fill(0) + return paddingBuffer + }) + } + } + + async pushNextBlock () { + const next = await this.vmdkParser.next() + if (next === null) { + const paddingLength = this.size - this.position + this.filePadding(paddingLength) + this.currentFile.push(() => this.footer) + this.currentFile.push(() => { + this.done = true + return null + }) + } else { + const offset = next.lbaBytes + const buffer = next.grain + const paddingLength = offset - this.position + if (paddingLength < 0) { + process.nextTick(() => this.emit('error', 'This VMDK file does not have its blocks in the correct order')) + } + this.filePadding(paddingLength) + this.currentFile.push(() => buffer) + this.position = offset + buffer.length + } + return this.pushFileUntilFull() + } + + // returns true if the file is empty + pushFileUntilFull () { + while (true) { + if (this.currentFile.length === 0) { + break + } + const result = this.push(this.currentFile.shift()()) + if (!result) { + break + } + } + return this.currentFile.length === 0 + } + + async pushNextUntilFull () { + while (!this.done && await this.pushNextBlock()) { + } } _read () { - this.vmdkParser.next().then((next) => { - if (this.done) { - return - } - if (next === null) { - const paddingLength = this.size - this.position - if (paddingLength !== 0) { - const chunkSize = 10 * 1024 * 1024 - const chunkCount = Math.floor(paddingLength / chunkSize) - for (let i = 0; i < chunkCount; i++) { - const paddingBuffer = new Buffer(chunkSize) - paddingBuffer.fill(0) - this.push(paddingBuffer) - } - const paddingBuffer = new Buffer(paddingLength % chunkSize) - paddingBuffer.fill(0) - this.push(paddingBuffer) - } - this.push(this.footer) - this.push(null) - this.done = true - } else { - const offset = next.lbaBytes - const buffer = next.grain - const paddingLength = offset - this.position - if (paddingLength < 0) { - process.nextTick(() => this.emit('error', 'This VMDK file does not have its blocks in the correct order')) - } - if (paddingLength !== 0) { - const paddingBuffer = new Buffer(paddingLength) - paddingBuffer.fill(0) - this.push(paddingBuffer) - } - this.push(buffer) - this.position = offset + buffer.length - } - }).catch((error) => { - this.emit('error', error) - }) + if (this.busy || this.done) { + return + } + if (this.pushFileUntilFull()) { + this.busy = true + this.pushNextUntilFull().then(() => { + this.busy = false + }).catch((error) => { + process.nextTick(() => this.emit('error', error)) + }) + } } } diff --git a/packages/xo-vmdk-to-vhd/src/virtual-buffer.js b/packages/xo-vmdk-to-vhd/src/virtual-buffer.js index 932b04733..5df1823f1 100644 --- a/packages/xo-vmdk-to-vhd/src/virtual-buffer.js +++ b/packages/xo-vmdk-to-vhd/src/virtual-buffer.js @@ -8,6 +8,7 @@ export class VirtualBuffer { constructor (readStream) { this.slicer = new Slicer(readStream) this.position = 0 + this.promise = null } get isDepleted () { @@ -15,7 +16,11 @@ export class VirtualBuffer { } // length = -1 means 'until the end' - async readChunk (length) { + async readChunk (length, label) { + const _this = this + if (this.promise !== null) { + throw new Error('pomise already there !!!', this.promise) + } if (length === -1) { const chunks = [] let error = false @@ -34,12 +39,15 @@ export class VirtualBuffer { } while (error === false) return Buffer.concat(chunks) } else { + this.promise = label return new Promise((resolve, reject) => { this.slicer.read(length, (error, actualLength, data, offset) => { if (error !== false && error !== true) { + _this.promise = null reject(error) } else { - this.position += actualLength + _this.promise = null + _this.position += data.length resolve(data) } }) diff --git a/packages/xo-vmdk-to-vhd/src/vmdk-read.js b/packages/xo-vmdk-to-vhd/src/vmdk-read.js index b20854e31..210cbb668 100644 --- a/packages/xo-vmdk-to-vhd/src/vmdk-read.js +++ b/packages/xo-vmdk-to-vhd/src/vmdk-read.js @@ -66,6 +66,14 @@ function parseFlags (flagBuffer) { } function parseHeader (buffer) { + const magicString = buffer.slice(0, 4).toString('ascii') + if (magicString !== 'KDMV') { + throw new Error('not a VMDK file') + } + const version = buffer.readUInt32LE(4) + if (version !== 1 && version !== 3) { + throw new Error('unsupported VMDK version ' + version + ', only version 1 and 3 are supported') + } const flags = parseFlags(buffer.slice(8, 12)) const capacitySectors = parseU64b(buffer, 12, 'capacitySectors') const grainSizeSectors = parseU64b(buffer, 20, 'grainSizeSectors') @@ -122,7 +130,7 @@ export class VMDKDirectParser { } async readHeader () { - const headerBuffer = await this.virtualBuffer.readChunk(512) + const headerBuffer = await this.virtualBuffer.readChunk(512, 'readHeader') const magicString = headerBuffer.slice(0, 4).toString('ascii') if (magicString !== 'KDMV') { throw new Error('not a VMDK file') @@ -134,27 +142,27 @@ export class VMDKDirectParser { this.header = parseHeader(headerBuffer) // I think the multiplications are OK, because the descriptor is always at the beginning of the file const descriptorLength = this.header.descriptorSizeSectors * sectorSize - const descriptorBuffer = await this.virtualBuffer.readChunk(descriptorLength) + const descriptorBuffer = await this.virtualBuffer.readChunk(descriptorLength, 'descriptor') this.descriptor = parseDescriptor(descriptorBuffer) return this.header } async next () { while (!this.virtualBuffer.isDepleted) { - const sector = await this.virtualBuffer.readChunk(512) + const sector = await this.virtualBuffer.readChunk(512, 'marker start ' + this.virtualBuffer.position) if (sector.length === 0) { break } const marker = tryToParseMarker(sector) if (marker.size === 0) { if (marker.value !== 0) { - await this.virtualBuffer.readChunk(marker.value * sectorSize) + await this.virtualBuffer.readChunk(marker.value * sectorSize, 'other marker value ' + this.virtualBuffer.position) } } else if (marker.size > 10) { const grainDiskSize = marker.size + 12 const alignedGrainDiskSize = Math.ceil(grainDiskSize / sectorSize) * sectorSize const remainOfBufferSize = alignedGrainDiskSize - sectorSize - const remainderOfGrainBuffer = await this.virtualBuffer.readChunk(remainOfBufferSize) + const remainderOfGrainBuffer = await this.virtualBuffer.readChunk(remainOfBufferSize, 'grain remainder ' + this.virtualBuffer.position) const grainBuffer = Buffer.concat([sector, remainderOfGrainBuffer]) return readGrain(0, grainBuffer, true) } @@ -165,25 +173,16 @@ export class VMDKDirectParser { export async function readRawContent (readStream) { const virtualBuffer = new VirtualBuffer(readStream) - const headerBuffer = await virtualBuffer.readChunk(512) - const magicString = headerBuffer.slice(0, 4).toString('ascii') - if (magicString !== 'KDMV') { - throw new Error('not a VMDK file') - } - const version = headerBuffer.readUInt32LE(4) - if (version !== 1 && version !== 3) { - throw new Error('unsupported VMDK version ' + version + ', only version 1 and 3 are supported') - } - + const headerBuffer = await virtualBuffer.readChunk(512, 'header') let header = parseHeader(headerBuffer) // I think the multiplications are OK, because the descriptor is always at the beginning of the file const descriptorLength = header.descriptorSizeSectors * sectorSize - const descriptorBuffer = await virtualBuffer.readChunk(descriptorLength) + const descriptorBuffer = await virtualBuffer.readChunk(descriptorLength, 'descriptor') const descriptor = parseDescriptor(descriptorBuffer) // TODO: we concat them back for now so that the indices match, we'll have to introduce a bias later - const remainingBuffer = await virtualBuffer.readChunk(-1) + const remainingBuffer = await virtualBuffer.readChunk(-1, 'remainder') const buffer = Buffer.concat([headerBuffer, descriptorBuffer, remainingBuffer]) if (header.grainDirectoryOffsetSectors === -1) { header = parseHeader(buffer.slice(-1024, -1024 + sectorSize)) diff --git a/packages/xo-vmdk-to-vhd/src/vmdk-read.spec.js b/packages/xo-vmdk-to-vhd/src/vmdk-read.spec.js index 3785a390f..37745c3cd 100644 --- a/packages/xo-vmdk-to-vhd/src/vmdk-read.spec.js +++ b/packages/xo-vmdk-to-vhd/src/vmdk-read.spec.js @@ -13,13 +13,10 @@ describe('VMDK reading', () => { await exec('rm -f ' + fileName + '&& VBoxManage convertfromraw --format VMDK --variant Stream ' + rawFileName + ' ' + fileName) const parser = new VMDKDirectParser(createReadStream(fileName)) const header = await parser.readHeader() - let grain const harvested = [] while (true) { - grain = parser.next() - const res = await grain + const res = await parser.next() if (res === null) { - console.log('VMDK reading got null') break } harvested.push(res)