feat(backups,vhd-lib): implement copyless merge (#6271)

Florent BEAUCHAMP 2022-06-22 10:36:57 +02:00 committed by GitHub
parent 8a71f84733
commit fd752fee80
11 changed files with 167 additions and 57 deletions

View File

@@ -59,30 +59,20 @@ async function mergeVhdChain(chain, { handler, logInfo, remove, merge }) {
let done, total
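// log merge progress at most every 10 seconds while blocks are being merged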
const handle = setInterval(() => {
if (done !== undefined) {
logInfo(`merging children in progress`, { children, parent, doneCount: done, totalCount: total})
logInfo(`merging children in progress`, { children, parent, doneCount: done, totalCount: total })
}
}, 10e3)
const mergedSize = await mergeVhd(handler, parent, handler, children, {
logInfo,
onProgress({ done: d, total: t }) {
done = d
total = t
},
remove,
})
clearInterval(handle)
const mergeTargetChild = children.shift()
await Promise.all([
VhdAbstract.rename(handler, parent, mergeTargetChild),
asyncMap(children, child => {
logInfo(`the VHD child is already merged`, { child })
if (remove) {
logInfo(`deleting merged VHD child`, { child })
return VhdAbstract.unlink(handler, child)
}
}),
])
return mergedSize
}
}

View File

@@ -7,6 +7,8 @@
> Users must be able to say: “Nice enhancement, I'm eager to test it”
- [Backup] Merge delta backups without copying data when using VHD directories on NFS/SMB/local remotes (PR [#6271](https://github.com/vatesfr/xen-orchestra/pull/6271))
### Bug fixes
> Users must be able to say: “I had this issue, happy to know it's fixed”
@@ -33,9 +35,11 @@
- @vates/event-listeners-manager patch
- @vates/read-chunk major
- @xen-orchestra/backups minor
- @xen-orchestra/xapi minor
- vhd-lib minor
- xo-remote-parser minor
- xo-server patch
- xo-server minor
- xo-vmdk-to-vhd patch
<!--packages-end-->

View File

@@ -104,7 +104,7 @@ exports.VhdAbstract = class VhdAbstract {
*
* @returns {number} the merged data size
*/
async coalesceBlock(child, blockId) {
async mergeBlock(child, blockId) {
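// generic, copy-based implementation: read the whole block from the child
// and rewrite it into this VHD (VhdDirectory overrides this with a copyless rename)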
const block = await child.readBlock(blockId)
await this.writeEntireBlock(block)
return block.data.length

View File

@@ -53,19 +53,25 @@ test('Can coalesce block', async () => {
const childDirectoryVhd = yield openVhd(handler, childDirectoryName)
await childDirectoryVhd.readBlockAllocationTable()
await parentVhd.coalesceBlock(childFileVhd, 0)
let childBlockData = (await childDirectoryVhd.readBlock(0)).data
await parentVhd.mergeBlock(childDirectoryVhd, 0)
await parentVhd.writeFooter()
await parentVhd.writeBlockAllocationTable()
let parentBlockData = (await parentVhd.readBlock(0)).data
let childBlockData = (await childFileVhd.readBlock(0)).data
// block should be present in parent
expect(parentBlockData.equals(childBlockData)).toEqual(true)
// block should not be in child since it's a rename for vhd directory
await expect(childDirectoryVhd.readBlock(0)).rejects.toThrowError()
await parentVhd.coalesceBlock(childDirectoryVhd, 0)
childBlockData = (await childFileVhd.readBlock(1)).data
await parentVhd.mergeBlock(childFileVhd, 1)
await parentVhd.writeFooter()
await parentVhd.writeBlockAllocationTable()
parentBlockData = (await parentVhd.readBlock(0)).data
childBlockData = (await childDirectoryVhd.readBlock(0)).data
expect(parentBlockData).toEqual(childBlockData)
parentBlockData = (await parentVhd.readBlock(1)).data
// block should be present in parent in case of mixed vhdfile/vhddirectory
expect(parentBlockData.equals(childBlockData)).toEqual(true)
// block should still be in the child
await childFileVhd.readBlock(1)
})
})

View File

@@ -142,13 +142,13 @@ exports.VhdDirectory = class VhdDirectory extends VhdAbstract {
return test(this.#blockTable, blockId)
}
_getChunkPath(partName) {
#getChunkPath(partName) {
return this._path + '/' + partName
}
async _readChunk(partName) {
// here we can implement compression and / or crypto
const buffer = await this._handler.readFile(this._getChunkPath(partName))
const buffer = await this._handler.readFile(this.#getChunkPath(partName))
const uncompressed = await this.#compressor.decompress(buffer)
return {
@@ -164,16 +164,20 @@
)
const compressed = await this.#compressor.compress(buffer)
return this._handler.outputFile(this._getChunkPath(partName), compressed, this._opts)
return this._handler.outputFile(this.#getChunkPath(partName), compressed, this._opts)
}
// put block in subdirectories to limit impact when doing directory listing
_getBlockPath(blockId) {
#getBlockPath(blockId) {
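// e.g. blockId 1234 gives blockPrefix = 1 and blockSuffix = 234, hence 'blocks/1/234'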
const blockPrefix = Math.floor(blockId / 1e3)
const blockSuffix = blockId - blockPrefix * 1e3
return `blocks/${blockPrefix}/${blockSuffix}`
}
_getFullBlockPath(blockId) {
return this.#getChunkPath(this.#getBlockPath(blockId))
}
async readHeaderAndFooter() {
await this.#readChunkFilters()
@@ -200,7 +204,7 @@ exports.VhdDirectory = class VhdDirectory extends VhdAbstract {
if (onlyBitmap) {
throw new Error(`reading 'bitmap of block' ${blockId} in a VhdDirectory is not implemented`)
}
const { buffer } = await this._readChunk(this._getBlockPath(blockId))
const { buffer } = await this._readChunk(this.#getBlockPath(blockId))
return {
id: blockId,
bitmap: buffer.slice(0, this.bitmapSize),
@@ -240,25 +244,39 @@ exports.VhdDirectory = class VhdDirectory extends VhdAbstract {
}
// only works if data are in the same handler
// and if the full block is modified in child ( which is the case whit xcp)
// and if the full block is modified in child ( which is the case with xcp)
// and if the compression type is same on both sides
async coalesceBlock(child, blockId) {
async mergeBlock(child, blockId, isResumingMerge = false) {
const childBlockPath = child._getFullBlockPath?.(blockId)
if (
!(child instanceof VhdDirectory) ||
childBlockPath === undefined ||
this._handler !== child._handler ||
child.compressionType !== this.compressionType
child.compressionType !== this.compressionType ||
child.compressionType === 'MIXED'
) {
return super.coalesceBlock(child, blockId)
return super.mergeBlock(child, blockId)
}
await this._handler.copy(
child._getChunkPath(child._getBlockPath(blockId)),
this._getChunkPath(this._getBlockPath(blockId))
)
try {
await this._handler.rename(childBlockPath, this._getFullBlockPath(blockId))
} catch (error) {
if (error.code === 'ENOENT' && isResumingMerge === true) {
// when resuming, blocks moved since the last merge-state write are
// no longer in the child; that is expected
// the readBlock below will throw if the block is also missing from the parent
// but it won't detect a block that was already in the parent and is broken/missing in the child
const { data } = await this.readBlock(blockId)
assert.strictEqual(data.length, this.header.blockSize)
} else {
throw error
}
}
setBitmap(this.#blockTable, blockId)
return sectorsToBytes(this.sectorsPerBlock)
}
async writeEntireBlock(block) {
await this._writeChunk(this._getBlockPath(block.id), block.buffer)
await this._writeChunk(this.#getBlockPath(block.id), block.buffer)
setBitmap(this.#blockTable, block.id)
}
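The fast path above replaces a read/decompress/recompress/write cycle with a single rename on the remote. A minimal sketch of the two strategies, using only the generic handler methods already visible in this file (the standalone function names are invented for illustration):

```js
// copy-based merge (the VhdAbstract fallback): block data flows through the process
async function copyBlock(handler, childBlockPath, parentBlockPath) {
  const buffer = await handler.readFile(childBlockPath)
  await handler.outputFile(parentBlockPath, buffer)
}

// copyless merge (the VhdDirectory fast path): the block file is moved in place,
// so no data is transferred over NFS/SMB/local remotes
async function moveBlock(handler, childBlockPath, parentBlockPath) {
  await handler.rename(childBlockPath, parentBlockPath)
}
```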

View File

@@ -222,14 +222,14 @@ test('Can coalesce block', async () => {
const childDirectoryVhd = yield openVhd(handler, childDirectoryName)
await childDirectoryVhd.readBlockAllocationTable()
await parentVhd.coalesceBlock(childFileVhd, 0)
await parentVhd.mergeBlock(childFileVhd, 0)
await parentVhd.writeFooter()
await parentVhd.writeBlockAllocationTable()
let parentBlockData = (await parentVhd.readBlock(0)).data
let childBlockData = (await childFileVhd.readBlock(0)).data
expect(parentBlockData).toEqual(childBlockData)
await parentVhd.coalesceBlock(childDirectoryVhd, 0)
await parentVhd.mergeBlock(childDirectoryVhd, 0)
await parentVhd.writeFooter()
await parentVhd.writeBlockAllocationTable()
parentBlockData = (await parentVhd.readBlock(0)).data

View File

@@ -43,6 +43,16 @@ const VhdSynthetic = class VhdSynthetic extends VhdAbstract {
}
}
get compressionType() {
const compressionType = this.vhds[0].compressionType
for (let i = 0; i < this.vhds.length; i++) {
if (compressionType !== this.vhds[i].compressionType) {
return 'MIXED'
}
}
return compressionType
}
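// e.g. a chain whose VHD directories use different compressors reports 'MIXED',
// which makes VhdDirectory#mergeBlock fall back to the copy-based merge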
/**
* @param {Array<VhdAbstract>} vhds the chain of Vhds used to compute this Vhd, from the deepest child (in position 0), to the root (in the last position)
* only the last one can have any type. Other must have type DISK_TYPES.DIFFERENCING (delta)
@@ -74,17 +84,28 @@ const VhdSynthetic = class VhdSynthetic extends VhdAbstract {
}
}
async readBlock(blockId, onlyBitmap = false) {
#getVhdWithBlock(blockId) {
const index = this.#vhds.findIndex(vhd => vhd.containsBlock(blockId))
assert(index !== -1, `no such block ${blockId}`)
return this.#vhds[index]
}
async readBlock(blockId, onlyBitmap = false) {
// only read the content of the first vhd containing this block
return await this.#vhds[index].readBlock(blockId, onlyBitmap)
return await this.#getVhdWithBlock(blockId).readBlock(blockId, onlyBitmap)
}
async mergeBlock(child, blockId) {
throw new Error(`can't coalesce block into a vhd synthetic`)
}
_readParentLocatorData(id) {
return this.#vhds[this.#vhds.length - 1]._readParentLocatorData(id)
}
_getFullBlockPath(blockId) {
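// delegate to the VHD that actually contains the block; the rename fast path
// only applies when that VHD exposes a movable block file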
const vhd = this.#getVhdWithBlock(blockId)
return vhd?._getFullBlockPath(blockId)
}
}
// add decorated static method

View File

@@ -6,7 +6,8 @@ exports.checkVhdChain = require('./checkChain')
exports.createReadableSparseStream = require('./createReadableSparseStream')
exports.createVhdStreamWithLength = require('./createVhdStreamWithLength')
exports.createVhdDirectoryFromStream = require('./createVhdDirectoryFromStream').createVhdDirectoryFromStream
exports.mergeVhd = require('./merge')
const { mergeVhd } = require('./merge')
exports.mergeVhd = mergeVhd
exports.peekFooterFromVhdStream = require('./peekFooterFromVhdStream')
exports.openVhd = require('./openVhd').openVhd
exports.VhdAbstract = require('./Vhd/VhdAbstract').VhdAbstract
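Since merge.js now has several exports, the named re-export keeps the public API unchanged; a consumer would still write, for instance:

```js
const { mergeVhd } = require('vhd-lib')
```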

View File

@@ -9,6 +9,7 @@ const { getHandler } = require('@xen-orchestra/fs')
const { pFromCallback } = require('promise-toolbox')
const { VhdFile, chainVhd, mergeVhd } = require('./index')
const { _cleanupVhds: cleanupVhds } = require('./merge')
const { checkFile, createRandomFile, convertFromRawToVhd } = require('./tests/utils')
@@ -38,14 +39,15 @@ test('merge works in normal cases', async () => {
await createRandomFile(`${tempDir}/${childRandomFileName}`, mbOfChildren)
await convertFromRawToVhd(`${tempDir}/${childRandomFileName}`, `${tempDir}/${child1FileName}`)
await chainVhd(handler, parentFileName, handler, child1FileName, true)
await checkFile(`${tempDir}/${parentFileName}`)
// merge
await mergeVhd(handler, parentFileName, handler, child1FileName)
// check that vhd is still valid
await checkFile(`${tempDir}/${parentFileName}`)
// check that the merged vhd is still valid
await checkFile(`${tempDir}/${child1FileName}`)
const parentVhd = new VhdFile(handler, parentFileName)
const parentVhd = new VhdFile(handler, child1FileName)
await parentVhd.readHeaderAndFooter()
await parentVhd.readBlockAllocationTable()
@@ -138,11 +140,11 @@ test('it can resume a merge ', async () => {
await mergeVhd(handler, 'parent.vhd', handler, 'child1.vhd')
// reload the header, footer and block allocation table, they should succeed
await parentVhd.readHeaderAndFooter()
await parentVhd.readBlockAllocationTable()
await childVhd.readHeaderAndFooter()
await childVhd.readBlockAllocationTable()
let offset = 0
// check that the data are the same as source
for await (const block of parentVhd.blocks()) {
for await (const block of childVhd.blocks()) {
const blockContent = block.data
// first block is marked as already merged, should not be modified
// second block should come from children
@@ -153,7 +155,7 @@ test('it can resume a merge ', async () => {
await fs.read(fd, buffer, 0, buffer.length, offset)
expect(buffer.equals(blockContent)).toEqual(true)
offset += parentVhd.header.blockSize
offset += childVhd.header.blockSize
}
})
@@ -183,9 +185,9 @@ test('it merge multiple child in one pass ', async () => {
await mergeVhd(handler, parentFileName, handler, [grandChildFileName, childFileName])
// check that vhd is still valid
await checkFile(parentFileName)
await checkFile(grandChildFileName)
const parentVhd = new VhdFile(handler, parentFileName)
const parentVhd = new VhdFile(handler, grandChildFileName)
await parentVhd.readHeaderAndFooter()
await parentVhd.readBlockAllocationTable()
@@ -206,3 +208,21 @@ test('it merge multiple child in one pass ', async () => {
offset += parentVhd.header.blockSize
}
})
test('it cleans vhd mergedfiles', async () => {
const handler = getHandler({ url: `file://${tempDir}` })
await handler.writeFile('parent', 'parentData')
await handler.writeFile('child1', 'child1Data')
await handler.writeFile('child2', 'child2Data')
await handler.writeFile('child3', 'child3Data')
// children are ordered from the deepest grandchild to the direct child
await cleanupVhds(handler, 'parent', ['child3', 'child2', 'child1'], { remove: true })
// only child3 should stay, with the data of parent
const [child3, ...other] = await handler.list('.')
expect(other.length).toEqual(0)
expect(child3).toEqual('child3')
expect((await handler.readFile('child3')).toString('utf8')).toEqual('parentData')
})

View File

@@ -12,11 +12,35 @@ const { basename, dirname } = require('path')
const { DISK_TYPES } = require('./_constants')
const { Disposable } = require('promise-toolbox')
const { asyncEach } = require('@vates/async-each')
const { VhdAbstract } = require('./Vhd/VhdAbstract')
const { VhdDirectory } = require('./Vhd/VhdDirectory')
const { VhdSynthetic } = require('./Vhd/VhdSynthetic')
const { asyncMap } = require('@xen-orchestra/async-map')
const { warn } = createLogger('vhd-lib:merge')
// the chain we want to merge is [ ancestor, child1, ..., child n ]
// this chain can have grandchildren or a grandparent
//
// 1. Create a VhdSynthetic from all children if more than one child is merged
// 2. Merge the resulting VHD into the ancestor
// 2.a if at least one is a VHD file: copy the block data from child to parent
// 2.b if they are all VHD directories: move the blocks from the children to the ancestor
// 3. Update the size, uuid and timestamp of the ancestor with those of child n
// 4. Delete all (now) unused VHDs
// 5. Rename the ancestor to child n
//
// VhdSynthetic
// |
// /‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾\
// [ ancestor, child1, ...,child n-1, child n ]
// | \___________________/ ^
// | | |
// | unused VHDs |
// | |
// \___________rename_____________/
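For example, merging a chain of three VHDs could end like this (a hedged illustration of the diagram above; the handler and file names are invented):

```js
// before: ancestor.vhd ← child1.vhd ← child2.vhd (child2.vhd is the most recent delta)
const mergedDataSize = await mergeVhd(handler, 'ancestor.vhd', handler, ['child2.vhd', 'child1.vhd'], {
  remove: true,
})
// after: only child2.vhd remains; it is the former ancestor.vhd, renamed in place
// and now holding the merged blocks, while child1.vhd has been deleted
```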
// write the merge progress file at most every `delay` seconds
function makeThrottledWriter(handler, path, delay) {
let lastWrite = Date.now()
return async json => {
@@ -28,21 +52,45 @@ function makeThrottledWriter(handler, path, delay) {
}
}
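The hunk boundary elides the writer's body; presumably it compares `Date.now()` against `lastWrite` before writing. A minimal sketch under that assumption (the real body may differ):

```js
// hypothetical reconstruction of the elided body
function makeThrottledWriter(handler, path, delay) {
  let lastWrite = Date.now()
  return async json => {
    const now = Date.now()
    if (now - lastWrite > delay) {
      lastWrite = now
      // best effort: a failed progress write should not abort the merge itself
      await handler.writeFile(path, JSON.stringify(json), { flags: 'w' }).catch(warn)
    }
  }
}
```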
// make the rename / delete part of the merge process
// will fail if parent and children are on different remotes
function cleanupVhds(handler, parent, children, { logInfo = noop, remove = false } = {}) {
if (!Array.isArray(children)) {
children = [children]
}
const mergeTargetChild = children.shift()
return Promise.all([
VhdAbstract.rename(handler, parent, mergeTargetChild),
asyncMap(children, child => {
logInfo(`the VHD child is already merged`, { child })
if (remove) {
logInfo(`deleting merged VHD child`, { child })
return VhdAbstract.unlink(handler, child)
}
}),
])
}
module.exports._cleanupVhds = cleanupVhds
// Merge one or multiple VHD children into the VHD parent.
// childPath can be an array to create a synthetic VHD from multiple VHDs
// childPath is ordered from the deepest grandchild to the direct child
//
// TODO: rename the VHD file during the merge
module.exports = limitConcurrency(2)(async function merge(
module.exports.mergeVhd = limitConcurrency(2)(async function merge(
parentHandler,
parentPath,
childHandler,
childPath,
{ onProgress = noop } = {}
{ onProgress = noop, logInfo = noop, remove } = {}
) {
const mergeStatePath = dirname(parentPath) + '/' + '.' + basename(parentPath) + '.merge.json'
return await Disposable.use(async function* () {
let mergeState
let isResuming = false
try {
const mergeStateContent = await parentHandler.readFile(mergeStatePath)
mergeState = JSON.parse(mergeStateContent)
@@ -75,6 +123,7 @@ module.exports = limitConcurrency(2)(async function merge(
assert.strictEqual(childVhd.footer.diskType, DISK_TYPES.DIFFERENCING)
assert.strictEqual(childVhd.header.blockSize, parentVhd.header.blockSize)
} else {
isResuming = true
// vhd should not have changed to resume
assert.strictEqual(parentVhd.header.checksum, mergeState.parent.header)
assert.strictEqual(childVhd.header.checksum, mergeState.child.header)
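// the merge-state file records at least the parent and child header checksums
// and the merged data size, so an interrupted merge can be validated and resumed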
@@ -115,12 +164,12 @@ module.exports = limitConcurrency(2)(async function merge(
let counter = 0
const mergeStateWriter = makeThrottledWriter(parentHandler, mergeStatePath, 10e3)
await asyncEach(
toMerge,
async blockId => {
merging.add(blockId)
mergeState.mergedDataSize += await parentVhd.coalesceBlock(childVhd, blockId)
mergeState.mergedDataSize += await parentVhd.mergeBlock(childVhd, blockId, isResuming)
merging.delete(blockId)
onProgress({
@@ -155,6 +204,8 @@ module.exports = limitConcurrency(2)(async function merge(
// should be a disposable
parentHandler.unlink(mergeStatePath).catch(warn)
await cleanupVhds(parentHandler, parentPath, childPath, { logInfo, remove })
return mergeState.mergedDataSize
})
})
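A hedged usage sketch of the updated entry point (the remote URL and file names are invented; option names as in the signature above):

```js
const { getHandler } = require('@xen-orchestra/fs')
const { mergeVhd } = require('vhd-lib')

async function example() {
  const handler = getHandler({ url: 'file:///tmp/vhds' })
  // childPath may be an array ordered from the deepest grandchild to the direct child
  const mergedDataSize = await mergeVhd(handler, 'parent.vhd', handler, ['grandchild.vhd', 'child.vhd'], {
    onProgress: ({ done, total }) => console.log(`merged ${done}/${total} blocks`),
    logInfo: console.log,
    remove: true, // also delete the already-merged children when done
  })
  return mergedDataSize
}
```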

View File

@@ -85,10 +85,9 @@ async function convertToVhdDirectory(rawFileName, vhdFileName, path) {
await fs.mkdir(path + '/blocks/0/')
const stats = await fs.stat(rawFileName)
const sizeMB = stats.size / 1024 / 1024
for (let i = 0, offset = 0; i < sizeMB; i++, offset += blockDataSize) {
for (let i = 0, offset = 0; offset < stats.size; i++, offset += blockDataSize) {
const blockData = Buffer.alloc(blockDataSize)
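// fill one full block from the raw file: fs.read needs the buffer offset (0),
// the length and the file position spelled out explicitly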
await fs.read(srcRaw, blockData, offset)
await fs.read(srcRaw, blockData, 0, blockData.length, offset)
await fs.writeFile(path + '/blocks/0/' + i, Buffer.concat([bitmap, blockData]))
}
await fs.close(srcRaw)