From e76a0ad4bdf77bad7313c5ec3c7ebd7af54237cd Mon Sep 17 00:00:00 2001
From: Nicolas Raynaud
Date: Fri, 2 Mar 2018 11:08:01 -0700
Subject: [PATCH] feat(xo-server): improve VHD merge speed (#2643)

Avoid re-opening/closing the files multiple times: doing so introduces a
lot of latency on remote FS.
---
 packages/xo-server/bin/run-vhd-test           |  13 +
 .../xo-server/src/remote-handlers/abstract.js |  22 +-
 .../xo-server/src/remote-handlers/local.js    |  36 ++-
 packages/xo-server/src/remote-handlers/smb.js |  21 +-
 packages/xo-server/src/vhd-merge.js           | 227 ++++++++++--------
 packages/xo-server/src/vhd-test.js            |  72 ++++++
 6 files changed, 287 insertions(+), 104 deletions(-)
 create mode 100755 packages/xo-server/bin/run-vhd-test
 create mode 100644 packages/xo-server/src/vhd-test.js

diff --git a/packages/xo-server/bin/run-vhd-test b/packages/xo-server/bin/run-vhd-test
new file mode 100755
index 000000000..ef458e282
--- /dev/null
+++ b/packages/xo-server/bin/run-vhd-test
@@ -0,0 +1,13 @@
+#!/usr/bin/env node
+
+'use strict'
+
+global.Promise = require('bluebird')
+
+
+process.on('unhandledRejection', function (reason) {
+  console.warn('[Warn] Possibly unhandled rejection:', reason && reason.stack || reason)
+})
+
+
+require('exec-promise')(require('../dist/vhd-test').default)
diff --git a/packages/xo-server/src/remote-handlers/abstract.js b/packages/xo-server/src/remote-handlers/abstract.js
index 790998e0b..e9d8e4480 100644
--- a/packages/xo-server/src/remote-handlers/abstract.js
+++ b/packages/xo-server/src/remote-handlers/abstract.js
@@ -112,6 +112,7 @@ export default class RemoteHandlerAbstract {
     file,
     { checksum = false, ignoreMissingChecksum = false, ...options } = {}
   ) {
+    const path = typeof file === 'string' ? file : file.path
     const streamP = this._createReadStream(file, options).then(stream => {
       // detect early errors
       let promise = eventToPromise(stream, 'readable')
@@ -142,7 +143,7 @@ export default class RemoteHandlerAbstract {
     // avoid a unhandled rejection warning
     ;streamP::ignoreErrors()

-    return this.readFile(`${file}.checksum`).then(
+    return this.readFile(`${path}.checksum`).then(
       checksum =>
         streamP.then(stream => {
           const { length } = stream
@@ -164,6 +165,22 @@
     throw new Error('Not implemented')
   }

+  async openFile (path, flags) {
+    return { fd: await this._openFile(path, flags), path }
+  }
+
+  async _openFile (path, flags) {
+    throw new Error('Not implemented')
+  }
+
+  async closeFile (fd) {
+    return this._closeFile(fd.fd)
+  }
+
+  async _closeFile (fd) {
+    throw new Error('Not implemented')
+  }
+
   async refreshChecksum (path) {
     const stream = addChecksumToReadStream(await this.createReadStream(path))
     stream.resume() // start reading the whole file
@@ -172,6 +189,7 @@
   }

   async createOutputStream (file, { checksum = false, ...options } = {}) {
+    const path = typeof file === 'string' ? file : file.path
     const streamP = this._createOutputStream(file, {
       flags: 'wx',
       ...options,
     })
@@ -192,7 +210,7 @@
       streamWithChecksum.pipe(stream)

       streamWithChecksum.checksum
-        .then(value => this.outputFile(`${file}.checksum`, value))
+        .then(value => this.outputFile(`${path}.checksum`, value))
         .catch(forwardError)

       return connectorStream
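Note on the hunks above: openFile returns the handler-specific descriptor wrapped together with its path ({ fd, path }), so the result can be passed to any method that previously took a path string, and closeFile unwraps it again (fd.fd). A minimal usage sketch, illustrative only and not part of the patch (handler stands for any concrete RemoteHandler, consume is a hypothetical helper that drains a stream):

  // open once, issue several reads against the same descriptor, close once
  async function readTwoRanges (handler, path) {
    const fd = await handler.openFile(path, 'r') // -> { fd, path }
    try {
      // each call reuses the already-open descriptor instead of the path
      const head = await handler.createReadStream(fd, { start: 0, end: 511 })
      await consume(head)
      const tail = await handler.createReadStream(fd, { start: 512, end: 1023 })
      await consume(tail)
    } finally {
      await handler.closeFile(fd)
    }
  }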
diff --git a/packages/xo-server/src/remote-handlers/local.js b/packages/xo-server/src/remote-handlers/local.js
index 5d2ebba55..2c2058259 100644
--- a/packages/xo-server/src/remote-handlers/local.js
+++ b/packages/xo-server/src/remote-handlers/local.js
@@ -63,13 +63,29 @@ export default class LocalHandler extends RemoteHandlerAbstract {
   }

   async _createReadStream (file, options) {
-    return fs.createReadStream(this._getFilePath(file), options)
+    if (typeof file === 'string') {
+      return fs.createReadStream(this._getFilePath(file), options)
+    } else {
+      return fs.createReadStream('', {
+        autoClose: false,
+        ...options,
+        fd: file.fd,
+      })
+    }
   }

   async _createOutputStream (file, options) {
-    const path = this._getFilePath(file)
-    await fs.ensureDir(dirname(path))
-    return fs.createWriteStream(path, options)
+    if (typeof file === 'string') {
+      const path = this._getFilePath(file)
+      await fs.ensureDir(dirname(path))
+      return fs.createWriteStream(path, options)
+    } else {
+      return fs.createWriteStream('', {
+        autoClose: false,
+        ...options,
+        fd: file.fd,
+      })
+    }
   }

   async _unlink (file) {
@@ -82,7 +98,17 @@
   }

   async _getSize (file) {
-    const stats = await fs.stat(this._getFilePath(file))
+    const stats = await fs.stat(
+      this._getFilePath(typeof file === 'string' ? file : file.path)
+    )
     return stats.size
   }
+
+  async _openFile (path, flags) {
+    return fs.open(this._getFilePath(path), flags)
+  }
+
+  async _closeFile (fd) {
+    return fs.close(fd)
+  }
 }
diff --git a/packages/xo-server/src/remote-handlers/smb.js b/packages/xo-server/src/remote-handlers/smb.js
index 03b48cba3..5dc01b401 100644
--- a/packages/xo-server/src/remote-handlers/smb.js
+++ b/packages/xo-server/src/remote-handlers/smb.js
@@ -139,6 +139,9 @@ export default class SmbHandler extends RemoteHandlerAbstract {
   }

   async _createReadStream (file, options = {}) {
+    if (typeof file !== 'string') {
+      file = file.path
+    }
     const client = this._getClient(this._remote)
     let stream

@@ -154,6 +157,9 @@
   async _createOutputStream (file, options = {}) {
+    if (typeof file !== 'string') {
+      file = file.path
+    }
     const client = this._getClient(this._remote)
     const path = this._getFilePath(file)
     const dir = this._dirname(path)
@@ -188,13 +194,22 @@
     let size

     try {
-      size = await client.getSize(this._getFilePath(file))::pFinally(() => {
-        client.close()
-      })
+      size = await client
+        .getSize(this._getFilePath(typeof file === 'string' ? file : file.path))
+        ::pFinally(() => {
+          client.close()
+        })
     } catch (error) {
       throw normalizeError(error)
     }

     return size
   }
+
+  // this is a fake
+  async _openFile (path) {
+    return this._getFilePath(path)
+  }
+
+  async _closeFile (fd) {}
 }
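Note: with this change every handler method accepts either a path string or the { fd, path } object returned by openFile; SMB has no real file descriptors, so its _openFile is an explicit fake that just resolves the path and _closeFile is a no-op. On the local handler the important detail is autoClose: false: without it, the first stream to finish would close the shared descriptor out from under the next one. A reduced sketch of that behaviour, assuming only the fs-extra API already used by this file (file name hypothetical):

  const fs = require('fs-extra')

  async function readTwiceThroughOneDescriptor () {
    const fd = await fs.open('disk.vhd', 'r')
    try {
      for (const [start, end] of [[0, 511], [512, 1023]]) {
        // the '' path is ignored when fd is given; autoClose: false keeps
        // the descriptor usable after this stream ends
        const stream = fs.createReadStream('', { fd, start, end, autoClose: false })
        stream.resume()
        await new Promise((resolve, reject) =>
          stream.on('end', resolve).on('error', reject)
        )
      }
    } finally {
      await fs.close(fd) // the caller closes exactly once
    }
  }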
diff --git a/packages/xo-server/src/vhd-merge.js b/packages/xo-server/src/vhd-merge.js
index 07d322591..2992b371f 100644
--- a/packages/xo-server/src/vhd-merge.js
+++ b/packages/xo-server/src/vhd-merge.js
@@ -182,7 +182,7 @@ function checksumStruct (rawStruct, struct) {

 // ===================================================================

-class Vhd {
+export class Vhd {
   constructor (handler, path) {
     this._handler = handler
     this._path = path
@@ -193,7 +193,7 @@
   // =================================================================

   _readStream (start, n) {
-    return this._handler.createReadStream(this._path, {
+    return this._handler.createReadStream(this._fd ? this._fd : this._path, {
       start,
       end: start + n - 1, // end is inclusive
     })
   }
@@ -328,10 +328,12 @@
     ).then(
       buf =>
         onlyBitmap
-          ? { bitmap: buf }
+          ? { id: blockId, bitmap: buf }
           : {
+              id: blockId,
               bitmap: buf.slice(0, this.bitmapSize),
               data: buf.slice(this.bitmapSize),
+              buffer: buf,
             }
     )
   }
@@ -339,7 +341,6 @@
   // get the identifiers and first sectors of the first and last block
   // in the file
   //
-  // return undefined if none
   _getFirstAndLastBlocks () {
     const n = this.header.maxTableEntries
     const bat = this.blockTable
@@ -353,7 +354,9 @@
       j += VHD_ENTRY_SIZE

       if (i === n) {
-        throw new Error('no allocated block found')
+        const error = new Error('no allocated block found')
+        error.noBlock = true
+        throw error
       }
     }
     lastSector = firstSector
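Note: _getFirstAndLastBlocks used to return undefined when the BAT contained no allocated block; it now throws an error tagged with noBlock, so a caller can tell an empty VHD apart from a genuine failure. The intended consumption pattern, as a sketch (vhd stands for a Vhd instance):

  try {
    const { first, firstSector, lastSector } = vhd._getFirstAndLastBlocks()
    // a block exists: first/firstSector/lastSector say what may need moving
  } catch (error) {
    if (!error.noBlock) {
      throw error // real failure, propagate
    }
    // empty VHD: nothing to move, the BAT can be extended in place
  }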
@@ -383,27 +386,26 @@
   // =================================================================

   // Write a buffer/stream at a given position in a vhd file.
-  _write (data, offset) {
+  async _write (data, offset) {
     debug(
       `_write offset=${offset} size=${
         Buffer.isBuffer(data) ? data.length : '???'
       }`
     )
     // TODO: could probably be merged in remote handlers.
-    return this._handler
-      .createOutputStream(this._path, {
+    const stream = await this._handler.createOutputStream(
+      this._fd ? this._fd : this._path,
+      {
         flags: 'r+',
         start: offset,
+      }
+    )
+    return Buffer.isBuffer(data)
+      ? new Promise((resolve, reject) => {
+        stream.on('error', reject)
+        stream.end(data, resolve)
       })
-      .then(
-        Buffer.isBuffer(data)
-          ? stream =>
-              new Promise((resolve, reject) => {
-                stream.on('error', reject)
-                stream.end(data, resolve)
-              })
-          : stream => eventToPromise(data.pipe(stream), 'finish')
-      )
+      : eventToPromise(data.pipe(stream), 'finish')
   }

   async ensureBatSize (size) {
@@ -415,11 +417,11 @@
     }

     const tableOffset = uint32ToUint64(header.tableOffset)
-    const { first, firstSector, lastSector } = this._getFirstAndLastBlocks()
-
     // extend BAT
     const maxTableEntries = (header.maxTableEntries = size)
-    const batSize = maxTableEntries * VHD_ENTRY_SIZE
+    const batSize = sectorsToBytes(
+      sectorsRoundUpNoZero(maxTableEntries * VHD_ENTRY_SIZE)
+    )
     const prevBat = this.blockTable
     const bat = (this.blockTable = Buffer.allocUnsafe(batSize))
     prevBat.copy(bat)
@@ -428,7 +430,7 @@
       `ensureBatSize: extend in memory BAT ${prevMaxTableEntries} -> ${maxTableEntries}`
     )

-    const extendBat = () => {
+    const extendBat = async () => {
       debug(
         `ensureBatSize: extend in file BAT ${prevMaxTableEntries} -> ${maxTableEntries}`
       )
@@ -438,25 +440,37 @@
       return this._write(
         bat.slice(prevBat.length),
        tableOffset + prevBat.length
      )
    }
+    try {
+      const { first, firstSector, lastSector } = this._getFirstAndLastBlocks()
+      if (tableOffset + batSize < sectorsToBytes(firstSector)) {
+        return Promise.all([extendBat(), this.writeHeader()])
+      }

-    if (tableOffset + batSize < sectorsToBytes(firstSector)) {
-      return Promise.all([extendBat(), this.writeHeader()])
-    }
+      const { fullBlockSize } = this
+      const newFirstSector = lastSector + fullBlockSize / VHD_SECTOR_SIZE
+      debug(
+        `ensureBatSize: move first block ${firstSector} -> ${newFirstSector}`
+      )

-    const { fullBlockSize } = this
-    const newFirstSector = lastSector + fullBlockSize / VHD_SECTOR_SIZE
-    debug(`ensureBatSize: move first block ${firstSector} -> ${newFirstSector}`)
-
-    return Promise.all([
       // copy the first block at the end
-      this._readStream(sectorsToBytes(firstSector), fullBlockSize)
-        .then(stream => this._write(stream, sectorsToBytes(newFirstSector)))
-        .then(extendBat),
-
-      this._setBatEntry(first, newFirstSector),
-      this.writeHeader(),
-      this.writeFooter(),
-    ])
+      const stream = await this._readStream(
+        sectorsToBytes(firstSector),
+        fullBlockSize
+      )
+      await this._write(stream, sectorsToBytes(newFirstSector))
+      await extendBat()
+      await this._setBatEntry(first, newFirstSector)
+      await this.writeHeader()
+      await this.writeFooter()
+    } catch (e) {
+      if (e.noBlock) {
+        await extendBat()
+        await this.writeHeader()
+        await this.writeFooter()
+      } else {
+        throw e
+      }
+    }
   }

   // set the first sector (bitmap) of a block
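Note: the BAT is now sized in whole 512-byte sectors rather than in raw bytes, which keeps the data blocks following it sector-aligned when the table grows. A worked example with the constants used in this file (VHD_ENTRY_SIZE = 4, VHD_SECTOR_SIZE = 512; helper bodies paraphrased here, not copied from the source):

  const VHD_ENTRY_SIZE = 4
  const VHD_SECTOR_SIZE = 512
  // paraphrase: round a byte count up to at least one full sector
  const sectorsRoundUpNoZero = bytes => Math.max(1, Math.ceil(bytes / VHD_SECTOR_SIZE))
  const sectorsToBytes = sectors => sectors * VHD_SECTOR_SIZE

  // 1000 BAT entries -> 4000 bytes -> 8 sectors -> 4096 bytes reserved
  sectorsToBytes(sectorsRoundUpNoZero(1000 * VHD_ENTRY_SIZE)) // === 4096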
@@ -510,7 +524,16 @@
     await this._write(bitmap, sectorsToBytes(blockAddr))
   }

-  async writeBlockSectors (block, beginSectorId, endSectorId) {
+  async writeEntireBlock (block) {
+    let blockAddr = this._getBatEntry(block.id)
+
+    if (blockAddr === BLOCK_UNUSED) {
+      blockAddr = await this.createBlock(block.id)
+    }
+    await this._write(block.buffer, sectorsToBytes(blockAddr))
+  }
+
+  async writeBlockSectors (block, beginSectorId, endSectorId, parentBitmap) {
     let blockAddr = this._getBatEntry(block.id)

     if (blockAddr === BLOCK_UNUSED) {
@@ -525,6 +548,11 @@
       }, sectors=${beginSectorId}...${endSectorId}`
     )

+    for (let i = beginSectorId; i < endSectorId; ++i) {
+      mapSetBit(parentBitmap, i)
+    }
+
+    await this.writeBlockBitmap(blockAddr, parentBitmap)
     await this._write(
       block.data.slice(
         sectorsToBytes(beginSectorId),
@@ -532,20 +560,11 @@
         sectorsToBytes(endSectorId)
       ),
       sectorsToBytes(offset)
     )
-
-    const { bitmap } = await this._readBlock(block.id, true)
-
-    for (let i = beginSectorId; i < endSectorId; ++i) {
-      mapSetBit(bitmap, i)
-    }
-
-    await this.writeBlockBitmap(blockAddr, bitmap)
   }

-  // Merge block id (of vhd child) into vhd parent.
   async coalesceBlock (child, blockId) {
-    // Get block data and bitmap of block id.
-    const { bitmap, data } = await child._readBlock(blockId)
+    const block = await child._readBlock(blockId)
+    const { bitmap, data } = block

     debug(`coalesceBlock block=${blockId}`)
@@ -556,7 +575,7 @@
       if (!mapTestBit(bitmap, i)) {
         continue
       }
-
+      let parentBitmap = null
       let endSector = i + 1

       // Count changed sectors.
@@ -566,7 +585,16 @@

       // Write n sectors into parent.
       debug(`coalesceBlock: write sectors=${i}...${endSector}`)
-      await this.writeBlockSectors({ id: blockId, data }, i, endSector)
+
+      const isFullBlock = i === 0 && endSector === sectorsPerBlock
+      if (isFullBlock) {
+        await this.writeEntireBlock(block)
+      } else {
+        if (parentBitmap === null) {
+          parentBitmap = (await this._readBlock(blockId, true)).bitmap
+        }
+        await this.writeBlockSectors(block, i, endSector, parentBitmap)
+      }
       i = endSector
     }
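Note: coalesceBlock now distinguishes two cases per child block. A fully dirty block is written in one shot by writeEntireBlock (bitmap and data in a single request), while a partially dirty block is written run by run, fetching the parent bitmap at most once per block. The run detection performed by the loop above is equivalent to this sketch (mapTestBit is the bit helper already used in this file):

  function * dirtySectorRuns (bitmap, sectorsPerBlock) {
    for (let i = 0; i < sectorsPerBlock; ) {
      if (!mapTestBit(bitmap, i)) {
        ++i
        continue
      }
      let end = i + 1
      while (end < sectorsPerBlock && mapTestBit(bitmap, end)) {
        ++end
      }
      yield [i, end] // half-open run of changed sectors
      i = end
    }
  }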
@@ -620,60 +648,71 @@ export default concurrency(2)(async function vhdMerge (
   childPath
 ) {
   const parentVhd = new Vhd(parentHandler, parentPath)
-  const childVhd = new Vhd(childHandler, childPath)
+  parentVhd._fd = await parentHandler.openFile(parentPath, 'r+')
+  try {
+    const childVhd = new Vhd(childHandler, childPath)
+    childVhd._fd = await childHandler.openFile(childPath, 'r')
+    try {
+      // Reading footer and header.
+      await Promise.all([
+        parentVhd.readHeaderAndFooter(),
+        childVhd.readHeaderAndFooter(),
+      ])

-  // Reading footer and header.
-  await Promise.all([
-    parentVhd.readHeaderAndFooter(),
-    childVhd.readHeaderAndFooter(),
-  ])
+      assert(childVhd.header.blockSize === parentVhd.header.blockSize)

-  assert(childVhd.header.blockSize === parentVhd.header.blockSize)
+      // Child must be a delta.
+      if (childVhd.footer.diskType !== HARD_DISK_TYPE_DIFFERENCING) {
+        throw new Error('Unable to merge, child is not a delta backup.')
+      }

-  // Child must be a delta.
-  if (childVhd.footer.diskType !== HARD_DISK_TYPE_DIFFERENCING) {
-    throw new Error('Unable to merge, child is not a delta backup.')
-  }
+      // Merging in differencing disk is prohibited in our case.
+      if (parentVhd.footer.diskType !== HARD_DISK_TYPE_DYNAMIC) {
+        throw new Error('Unable to merge, parent is not a full backup.')
+      }

-  // Merging in differencing disk is prohibited in our case.
-  if (parentVhd.footer.diskType !== HARD_DISK_TYPE_DYNAMIC) {
-    throw new Error('Unable to merge, parent is not a full backup.')
-  }
+      // Allocation table map is not yet implemented.
+      if (
+        parentVhd.hasBlockAllocationTableMap() ||
+        childVhd.hasBlockAllocationTableMap()
+      ) {
+        throw new Error('Unsupported allocation table map.')
+      }

-  // Allocation table map is not yet implemented.
-  if (
-    parentVhd.hasBlockAllocationTableMap() ||
-    childVhd.hasBlockAllocationTableMap()
-  ) {
-    throw new Error('Unsupported allocation table map.')
-  }
+      // Read allocation table of child/parent.
+      await Promise.all([parentVhd.readBlockTable(), childVhd.readBlockTable()])

-  // Read allocation table of child/parent.
-  await Promise.all([parentVhd.readBlockTable(), childVhd.readBlockTable()])
+      await parentVhd.ensureBatSize(childVhd.header.maxTableEntries)

-  await parentVhd.ensureBatSize(childVhd.header.maxTableEntries)
+      let mergedDataSize = 0
+      for (
+        let blockId = 0;
+        blockId < childVhd.header.maxTableEntries;
+        blockId++
+      ) {
+        if (childVhd._getBatEntry(blockId) !== BLOCK_UNUSED) {
+          mergedDataSize += await parentVhd.coalesceBlock(childVhd, blockId)
+        }
+      }
+      const cFooter = childVhd.footer
+      const pFooter = parentVhd.footer

-  let mergedDataSize = 0
+      pFooter.currentSize = { ...cFooter.currentSize }
+      pFooter.diskGeometry = { ...cFooter.diskGeometry }
+      pFooter.originalSize = { ...cFooter.originalSize }
+      pFooter.timestamp = cFooter.timestamp

-  for (let blockId = 0; blockId < childVhd.header.maxTableEntries; blockId++) {
-    if (childVhd._getBatEntry(blockId) !== BLOCK_UNUSED) {
-      mergedDataSize += await parentVhd.coalesceBlock(childVhd, blockId)
+      // necessary to update values and to recreate the footer after block
+      // creation
+      await parentVhd.writeFooter()
+
+      return mergedDataSize
+    } finally {
+      await childHandler.closeFile(childVhd._fd)
     }
+  } finally {
+    await parentHandler.closeFile(parentVhd._fd)
   }
-
-  const cFooter = childVhd.footer
-  const pFooter = parentVhd.footer
-
-  pFooter.currentSize = { ...cFooter.currentSize }
-  pFooter.diskGeometry = { ...cFooter.diskGeometry }
-  pFooter.originalSize = { ...cFooter.originalSize }
-  pFooter.timestamp = cFooter.timestamp
-
-  // necessary to update values and to recreate the footer after block
-  // creation
-  await parentVhd.writeFooter()
-
-  return mergedDataSize
 })

 // returns true if the child was actually modified
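Note: this is the hunk the commit message is about. Each VHD is opened exactly once (the parent read/write, the child read-only), the descriptor is stashed on the Vhd instance and picked up by the this._fd ? this._fd : this._path fallbacks shown earlier, and the nested try/finally guarantees both files are closed even when the merge throws. Stripped to its skeleton:

  parentVhd._fd = await parentHandler.openFile(parentPath, 'r+')
  try {
    childVhd._fd = await childHandler.openFile(childPath, 'r')
    try {
      // every _readStream/_write during the merge reuses these descriptors,
      // saving one open/close round-trip per I/O on remote filesystems
    } finally {
      await childHandler.closeFile(childVhd._fd)
    }
  } finally {
    await parentHandler.closeFile(parentVhd._fd)
  }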
diff --git a/packages/xo-server/src/vhd-test.js b/packages/xo-server/src/vhd-test.js
new file mode 100644
index 000000000..db458421c
--- /dev/null
+++ b/packages/xo-server/src/vhd-test.js
@@ -0,0 +1,72 @@
+import execa from 'execa'
+import vhdMerge, { chainVhd, Vhd } from './vhd-merge'
+import LocalHandler from './remote-handlers/local.js'
+
+async function testVhdMerge () {
+  console.log('before merge')
+  const moOfRandom = 4
+  await execa('bash', [
+    '-c',
+    `head -c ${moOfRandom}M < /dev/urandom >randomfile`,
+  ])
+  await execa('bash', [
+    '-c',
+    `head -c ${moOfRandom / 2}M < /dev/urandom >small_randomfile`,
+  ])
+  await execa('qemu-img', [
+    'convert',
+    '-f',
+    'raw',
+    '-Ovpc',
+    'randomfile',
+    'randomfile.vhd',
+  ])
+  await execa('vhd-util', ['check', '-t', '-n', 'randomfile.vhd'])
+  await execa('vhd-util', ['create', '-s', moOfRandom, '-n', 'empty.vhd'])
+  // await execa('vhd-util', ['snapshot', '-n', 'randomfile_delta.vhd', '-p', 'randomfile.vhd'])
+
+  const handler = new LocalHandler({ url: 'file://' + process.cwd() })
+  const originalSize = await handler._getSize('randomfile')
+  await chainVhd(handler, 'empty.vhd', handler, 'randomfile.vhd')
+  const childVhd = new Vhd(handler, 'randomfile.vhd')
+  console.log('changing type')
+  await childVhd.readHeaderAndFooter()
+  console.log('child vhd', childVhd.footer.currentSize, originalSize)
+  await childVhd.readBlockTable()
+  childVhd.footer.diskType = 4 // Delta backup.
+  await childVhd.writeFooter()
+  console.log('chained')
+  await vhdMerge(handler, 'empty.vhd', handler, 'randomfile.vhd')
+  console.log('merged')
+  const parentVhd = new Vhd(handler, 'empty.vhd')
+  await parentVhd.readHeaderAndFooter()
+  console.log('parent vhd', parentVhd.footer.currentSize)
+
+  await execa('qemu-img', [
+    'convert',
+    '-f',
+    'vpc',
+    '-Oraw',
+    'empty.vhd',
+    'recovered',
+  ])
+  await execa('truncate', ['-s', originalSize, 'recovered'])
+  console.log('ls', (await execa('ls', ['-lt'])).stdout)
+  console.log(
+    'diff',
+    (await execa('diff', ['-q', 'randomfile', 'recovered'])).stdout
+  )
+
+  /* const vhd = new Vhd(handler, 'randomfile_delta.vhd')
+  await vhd.readHeaderAndFooter()
+  await vhd.readBlockTable()
+  console.log('vhd.header.maxTableEntries', vhd.header.maxTableEntries)
+  await vhd.ensureBatSize(300)
+
+  console.log('vhd.header.maxTableEntries', vhd.header.maxTableEntries)
+  */
+  console.log(await handler.list())
+  console.log('lol')
+}
+
+export { testVhdMerge as default }
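Note: the smoke test above assumes qemu-img and vhd-util are on $PATH, writes its scratch files (randomfile, *.vhd, recovered) into the current directory, and is loaded from dist/ by bin/run-vhd-test, so it needs a prior build. A hypothetical direct invocation from Node, equivalent to running the wrapper:

  // from packages/xo-server, after building src/ into dist/
  require('./dist/vhd-test').default()
    .then(() => console.log('VHD merge round-trip OK'))
    .catch(error => {
      console.error(error)
      process.exit(1)
    })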