feat(xo-server): improve VHD merge speed (#2643)

Avoid re-opening and closing the files multiple times, which introduces a lot of latency on remote filesystems.
Nicolas Raynaud 2018-03-02 11:08:01 -07:00 committed by Julien Fontanet
parent baf6d30348
commit e76a0ad4bd
6 changed files with 287 additions and 104 deletions
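
For context, the shape of the change, as a minimal sketch (the `handler` instance, file name, and `ranges` are illustrative; `openFile`/`closeFile`/`createReadStream` match the API added in the diff below):

```js
// Before: each createReadStream()/createOutputStream() call took a path,
// so every block access re-opened and re-closed the file, paying a full
// open/close round trip per block on a remote FS.
// After: open once, pass the { fd, path } descriptor around, close once.
const file = await handler.openFile('parent.vhd', 'r+') // -> { fd, path }
try {
  for (const { start, end } of ranges) {
    // the stream reuses file.fd: no open()/close() per call any more
    const stream = await handler.createReadStream(file, { start, end })
    // ... consume stream ...
  }
} finally {
  await handler.closeFile(file)
}
```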


@@ -0,0 +1,13 @@
#!/usr/bin/env node
'use strict'
global.Promise = require('bluebird')
process.on('unhandledRejection', function (reason) {
console.warn('[Warn] Possibly unhandled rejection:', reason && reason.stack || reason)
})
require("exec-promise")(require("../dist/vhd-test").default)


@@ -112,6 +112,7 @@ export default class RemoteHandlerAbstract {
file,
{ checksum = false, ignoreMissingChecksum = false, ...options } = {}
) {
const path = typeof file === 'string' ? file : file.path
const streamP = this._createReadStream(file, options).then(stream => {
// detect early errors
let promise = eventToPromise(stream, 'readable')
@@ -142,7 +143,7 @@ export default class RemoteHandlerAbstract {
// avoid an unhandled rejection warning
;streamP::ignoreErrors()
return this.readFile(`${file}.checksum`).then(
return this.readFile(`${path}.checksum`).then(
checksum =>
streamP.then(stream => {
const { length } = stream
@@ -164,6 +165,22 @@ export default class RemoteHandlerAbstract {
throw new Error('Not implemented')
}
async openFile (path, flags) {
return { fd: await this._openFile(path, flags), path }
}
async _openFile (path, flags) {
throw new Error('Not implemented')
}
async closeFile (fd) {
return this._closeFile(fd.fd)
}
async _closeFile (fd) {
throw new Error('Not implemented')
}
async refreshChecksum (path) {
const stream = addChecksumToReadStream(await this.createReadStream(path))
stream.resume() // start reading the whole file
@@ -172,6 +189,7 @@
}
async createOutputStream (file, { checksum = false, ...options } = {}) {
const path = typeof file === 'string' ? file : file.path
const streamP = this._createOutputStream(file, {
flags: 'wx',
...options,
@@ -192,7 +210,7 @@
streamWithChecksum.pipe(stream)
streamWithChecksum.checksum
.then(value => this.outputFile(`${file}.checksum`, value))
.then(value => this.outputFile(`${path}.checksum`, value))
.catch(forwardError)
return connectorStream
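
Note why `createReadStream` and `createOutputStream` now extract a `path` even when handed a descriptor: the `.checksum` sidecar is always addressed by path. A minimal sketch of the calling side, assuming an instantiated handler and the `event-to-promise` helper already used in this file (the file name is hypothetical):

```js
import eventToPromise from 'event-to-promise'

// Sketch only: write through a kept-open descriptor; the digest still
// lands next to the file, keyed by `${file.path}.checksum`.
async function writeWithChecksum (handler, data) {
  const file = await handler.openFile('backup.vhd', 'r+') // -> { fd, path }
  try {
    const stream = await handler.createOutputStream(file, { checksum: true })
    stream.end(data)
    await eventToPromise(stream, 'finish')
  } finally {
    await handler.closeFile(file)
  }
}
```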


@@ -63,13 +63,29 @@ export default class LocalHandler extends RemoteHandlerAbstract {
}
async _createReadStream (file, options) {
if (typeof file === 'string') {
return fs.createReadStream(this._getFilePath(file), options)
} else {
return fs.createReadStream('', {
autoClose: false,
...options,
fd: file.fd,
})
}
}
async _createOutputStream (file, options) {
if (typeof file === 'string') {
const path = this._getFilePath(file)
await fs.ensureDir(dirname(path))
return fs.createWriteStream(path, options)
} else {
return fs.createWriteStream('', {
autoClose: false,
...options,
fd: file.fd,
})
}
}
async _unlink (file) {
@@ -82,7 +98,17 @@ export default class LocalHandler extends RemoteHandlerAbstract {
}
async _getSize (file) {
const stats = await fs.stat(this._getFilePath(file))
const stats = await fs.stat(
this._getFilePath(typeof file === 'string' ? file : file.path)
)
return stats.size
}
async _openFile (path, flags) {
return fs.open(this._getFilePath(path), flags)
}
async _closeFile (fd) {
return fs.close(fd)
}
}
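
A note on `autoClose: false` above: when Node is given an existing `fd`, it closes that descriptor as soon as the stream ends unless told otherwise, which would break the next read on the same descriptor. A small standalone illustration with plain `fs` (the file name is hypothetical):

```js
const fs = require('fs')

const fd = fs.openSync('data.vhd', 'r')
// Two successive reads on one fd: with the default autoClose: true, the
// first stream's end would close the fd and the second createReadStream
// would fail with EBADF.
const first = fs.createReadStream('', { fd, autoClose: false, start: 0, end: 511 })
first.on('end', () => {
  const second = fs.createReadStream('', { fd, autoClose: false, start: 512, end: 1023 })
  second.on('end', () => fs.closeSync(fd))
  second.resume()
})
first.resume()
```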


@@ -139,6 +139,9 @@ export default class SmbHandler extends RemoteHandlerAbstract {
}
async _createReadStream (file, options = {}) {
if (typeof file !== 'string') {
file = file.path
}
const client = this._getClient(this._remote)
let stream
@@ -154,6 +157,9 @@
}
async _createOutputStream (file, options = {}) {
if (typeof file !== 'string') {
file = file.path
}
const client = this._getClient(this._remote)
const path = this._getFilePath(file)
const dir = this._dirname(path)
@@ -188,7 +194,9 @@
let size
try {
size = await client.getSize(this._getFilePath(file))::pFinally(() => {
size = await client
.getSize(this._getFilePath(typeof file === 'string' ? file : file.path))
::pFinally(() => {
client.close()
})
} catch (error) {
@@ -197,4 +205,11 @@
return size
}
// this is a fake
async _openFile (path) {
return this._getFilePath(path)
}
async _closeFile (fd) {}
}
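
A note on the "fake" above: this SMB client keeps no persistent file handle, so `_openFile` simply resolves the full remote path and `_closeFile` is a no-op. The stream methods fall back to `file.path` anyway, so on SMB the descriptor API is satisfied for interface compatibility, but every access still opens its own connection; the open-once optimisation only pays off on handlers with real descriptors, such as the local one above.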


@@ -182,7 +182,7 @@ function checksumStruct (rawStruct, struct) {
// ===================================================================
class Vhd {
export class Vhd {
constructor (handler, path) {
this._handler = handler
this._path = path
@@ -193,7 +193,7 @@ class Vhd {
// =================================================================
_readStream (start, n) {
return this._handler.createReadStream(this._path, {
return this._handler.createReadStream(this._fd ? this._fd : this._path, {
start,
end: start + n - 1, // end is inclusive
})
@@ -328,10 +328,12 @@ class Vhd {
).then(
buf =>
onlyBitmap
? { bitmap: buf }
? { id: blockId, bitmap: buf }
: {
id: blockId,
bitmap: buf.slice(0, this.bitmapSize),
data: buf.slice(this.bitmapSize),
buffer: buf,
}
)
}
@@ -339,7 +341,6 @@ class Vhd {
// get the identifiers and first sectors of the first and last block
// in the file
//
// return undefined if none
_getFirstAndLastBlocks () {
const n = this.header.maxTableEntries
const bat = this.blockTable
@@ -353,7 +354,9 @@
j += VHD_ENTRY_SIZE
if (i === n) {
throw new Error('no allocated block found')
const error = new Error('no allocated block found')
error.noBlock = true
throw error
}
}
lastSector = firstSector
@@ -383,27 +386,26 @@
// =================================================================
// Write a buffer/stream at a given position in a vhd file.
_write (data, offset) {
async _write (data, offset) {
debug(
`_write offset=${offset} size=${
Buffer.isBuffer(data) ? data.length : '???'
}`
)
// TODO: could probably be merged in remote handlers.
return this._handler
.createOutputStream(this._path, {
const stream = await this._handler.createOutputStream(
this._fd ? this._fd : this._path,
{
flags: 'r+',
start: offset,
})
.then(
Buffer.isBuffer(data)
? stream =>
new Promise((resolve, reject) => {
}
)
return Buffer.isBuffer(data)
? new Promise((resolve, reject) => {
stream.on('error', reject)
stream.end(data, resolve)
})
: stream => eventToPromise(data.pipe(stream), 'finish')
)
: eventToPromise(data.pipe(stream), 'finish')
}
async ensureBatSize (size) {
@@ -415,11 +417,11 @@ class Vhd {
}
const tableOffset = uint32ToUint64(header.tableOffset)
const { first, firstSector, lastSector } = this._getFirstAndLastBlocks()
// extend BAT
const maxTableEntries = (header.maxTableEntries = size)
const batSize = maxTableEntries * VHD_ENTRY_SIZE
const batSize = sectorsToBytes(
sectorsRoundUpNoZero(maxTableEntries * VHD_ENTRY_SIZE)
)
const prevBat = this.blockTable
const bat = (this.blockTable = Buffer.allocUnsafe(batSize))
prevBat.copy(bat)
@@ -428,7 +430,7 @@ class Vhd {
`ensureBatSize: extend in memory BAT ${prevMaxTableEntries} -> ${maxTableEntries}`
)
const extendBat = () => {
const extendBat = async () => {
debug(
`ensureBatSize: extend in file BAT ${prevMaxTableEntries} -> ${maxTableEntries}`
)
@@ -438,25 +440,37 @@
tableOffset + prevBat.length
)
}
try {
const { first, firstSector, lastSector } = this._getFirstAndLastBlocks()
if (tableOffset + batSize < sectorsToBytes(firstSector)) {
return Promise.all([extendBat(), this.writeHeader()])
}
const { fullBlockSize } = this
const newFirstSector = lastSector + fullBlockSize / VHD_SECTOR_SIZE
debug(`ensureBatSize: move first block ${firstSector} -> ${newFirstSector}`)
debug(
`ensureBatSize: move first block ${firstSector} -> ${newFirstSector}`
)
return Promise.all([
// copy the first block at the end
this._readStream(sectorsToBytes(firstSector), fullBlockSize)
.then(stream => this._write(stream, sectorsToBytes(newFirstSector)))
.then(extendBat),
this._setBatEntry(first, newFirstSector),
this.writeHeader(),
this.writeFooter(),
])
const stream = await this._readStream(
sectorsToBytes(firstSector),
fullBlockSize
)
await this._write(stream, sectorsToBytes(newFirstSector))
await extendBat()
await this._setBatEntry(first, newFirstSector)
await this.writeHeader()
await this.writeFooter()
} catch (e) {
if (e.noBlock) {
await extendBat()
await this.writeHeader()
await this.writeFooter()
} else {
throw e
}
}
}
// set the first sector (bitmap) of a block
@@ -510,7 +524,16 @@ class Vhd {
await this._write(bitmap, sectorsToBytes(blockAddr))
}
async writeBlockSectors (block, beginSectorId, endSectorId) {
async writeEntireBlock (block) {
let blockAddr = this._getBatEntry(block.id)
if (blockAddr === BLOCK_UNUSED) {
blockAddr = await this.createBlock(block.id)
}
await this._write(block.buffer, sectorsToBytes(blockAddr))
}
async writeBlockSectors (block, beginSectorId, endSectorId, parentBitmap) {
let blockAddr = this._getBatEntry(block.id)
if (blockAddr === BLOCK_UNUSED) {
@@ -525,6 +548,11 @@
}, sectors=${beginSectorId}...${endSectorId}`
)
for (let i = beginSectorId; i < endSectorId; ++i) {
mapSetBit(parentBitmap, i)
}
await this.writeBlockBitmap(blockAddr, parentBitmap)
await this._write(
block.data.slice(
sectorsToBytes(beginSectorId),
@ -532,20 +560,11 @@ class Vhd {
),
sectorsToBytes(offset)
)
const { bitmap } = await this._readBlock(block.id, true)
for (let i = beginSectorId; i < endSectorId; ++i) {
mapSetBit(bitmap, i)
}
await this.writeBlockBitmap(blockAddr, bitmap)
}
// Merge block id (of vhd child) into vhd parent.
async coalesceBlock (child, blockId) {
// Get block data and bitmap of block id.
const { bitmap, data } = await child._readBlock(blockId)
const block = await child._readBlock(blockId)
const { bitmap, data } = block
debug(`coalesceBlock block=${blockId}`)
@@ -556,7 +575,7 @@
if (!mapTestBit(bitmap, i)) {
continue
}
let parentBitmap = null
let endSector = i + 1
// Count changed sectors.
@@ -566,7 +585,16 @@
// Write n sectors into parent.
debug(`coalesceBlock: write sectors=${i}...${endSector}`)
await this.writeBlockSectors({ id: blockId, data }, i, endSector)
const isFullBlock = i === 0 && endSector === sectorsPerBlock
if (isFullBlock) {
await this.writeEntireBlock(block)
} else {
if (parentBitmap === null) {
parentBitmap = (await this._readBlock(blockId, true)).bitmap
}
await this.writeBlockSectors(block, i, endSector, parentBitmap)
}
i = endSector
}
@@ -620,8 +648,11 @@ export default concurrency(2)(async function vhdMerge (
childPath
) {
const parentVhd = new Vhd(parentHandler, parentPath)
parentVhd._fd = await parentHandler.openFile(parentPath, 'r+')
try {
const childVhd = new Vhd(childHandler, childPath)
childVhd._fd = await childHandler.openFile(childPath, 'r')
try {
// Reading footer and header.
await Promise.all([
parentVhd.readHeaderAndFooter(),
@@ -654,13 +685,15 @@ export default concurrency(2)(async function vhdMerge (
await parentVhd.ensureBatSize(childVhd.header.maxTableEntries)
let mergedDataSize = 0
for (let blockId = 0; blockId < childVhd.header.maxTableEntries; blockId++) {
for (
let blockId = 0;
blockId < childVhd.header.maxTableEntries;
blockId++
) {
if (childVhd._getBatEntry(blockId) !== BLOCK_UNUSED) {
mergedDataSize += await parentVhd.coalesceBlock(childVhd, blockId)
}
}
const cFooter = childVhd.footer
const pFooter = parentVhd.footer
@@ -674,6 +707,12 @@ export default concurrency(2)(async function vhdMerge (
await parentVhd.writeFooter()
return mergedDataSize
} finally {
await childHandler.closeFile(childVhd._fd)
}
} finally {
await parentHandler.closeFile(parentVhd._fd)
}
})
// returns true if the child was actually modified
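
The second win in `coalesceBlock` above is the full-block fast path: the parent bitmap is only read back (once, lazily) when a partial run of sectors has to be merged, and a fully dirty block is written as a single `bitmap + data` buffer in one `_write`. A sketch of the decision, with typical VHD constants (2 MiB blocks, 512-byte sectors; these are assumptions, the real values come from the VHD header):

```js
const VHD_SECTOR_SIZE = 512
const blockSize = 2 * 1024 * 1024 // usual VHD block size
const sectorsPerBlock = blockSize / VHD_SECTOR_SIZE // 4096

// [i, endSector) is a run of dirty sectors found in the child's bitmap
function isFullBlock (i, endSector) {
  return i === 0 && endSector === sectorsPerBlock
}
// full block  -> writeEntireBlock: one write of the child's raw buffer
// partial run -> read the parent bitmap once, set the run's bits, then
//                write the bitmap and only the changed sectors
```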


@@ -0,0 +1,72 @@
import execa from 'execa'
import vhdMerge, { chainVhd, Vhd } from './vhd-merge'
import LocalHandler from './remote-handlers/local.js'
async function testVhdMerge () {
console.log('before merge')
const moOfRandom = 4
await execa('bash', [
'-c',
`head -c ${moOfRandom}M < /dev/urandom >randomfile`,
])
await execa('bash', [
'-c',
`head -c ${moOfRandom / 2}M < /dev/urandom >small_randomfile`,
])
await execa('qemu-img', [
'convert',
'-f',
'raw',
'-Ovpc',
'randomfile',
'randomfile.vhd',
])
await execa('vhd-util', ['check', '-t', '-n', 'randomfile.vhd'])
await execa('vhd-util', ['create', '-s', moOfRandom, '-n', 'empty.vhd'])
// await execa('vhd-util', ['snapshot', '-n', 'randomfile_delta.vhd', '-p', 'randomfile.vhd'])
const handler = new LocalHandler({ url: 'file://' + process.cwd() })
const originalSize = await handler._getSize('randomfile')
await chainVhd(handler, 'empty.vhd', handler, 'randomfile.vhd')
const childVhd = new Vhd(handler, 'randomfile.vhd')
console.log('changing type')
await childVhd.readHeaderAndFooter()
console.log('child vhd', childVhd.footer.currentSize, originalSize)
await childVhd.readBlockTable()
childVhd.footer.diskType = 4 // Delta backup.
await childVhd.writeFooter()
console.log('chained')
await vhdMerge(handler, 'empty.vhd', handler, 'randomfile.vhd')
console.log('merged')
const parentVhd = new Vhd(handler, 'empty.vhd')
await parentVhd.readHeaderAndFooter()
console.log('parent vhd', parentVhd.footer.currentSize)
await execa('qemu-img', [
'convert',
'-f',
'vpc',
'-Oraw',
'empty.vhd',
'recovered',
])
await execa('truncate', ['-s', originalSize, 'recovered'])
console.log('ls', (await execa('ls', ['-lt'])).stdout)
console.log(
'diff',
(await execa('diff', ['-q', 'randomfile', 'recovered'])).stdout
)
/* const vhd = new Vhd(handler, 'randomfile_delta.vhd')
await vhd.readHeaderAndFooter()
await vhd.readBlockTable()
console.log('vhd.header.maxTableEntries', vhd.header.maxTableEntries)
await vhd.ensureBatSize(300)
console.log('vhd.header.maxTableEntries', vhd.header.maxTableEntries)
*/
console.log(await handler.list())
console.log('lol')
}
export { testVhdMerge as default }
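
The 13-line bin wrapper at the top of this commit feeds this default export to exec-promise, so the whole scenario (generate random data, convert it with qemu-img, chain randomfile.vhd onto empty.vhd, merge, and diff the recovered image against the original) can be run straight from a shell; it assumes qemu-img and vhd-util are available on the PATH.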