feat(backups,vhd-lib): implement copyless merge (#6271)
This commit is contained in:
parent: 8a71f84733
commit: fd752fee80
@@ -59,30 +59,20 @@ async function mergeVhdChain(chain, { handler, logInfo, remove, merge }) {
     let done, total
     const handle = setInterval(() => {
       if (done !== undefined) {
-        logInfo(`merging children in progress`, { children, parent, doneCount: done, totalCount: total})
+        logInfo(`merging children in progress`, { children, parent, doneCount: done, totalCount: total })
       }
     }, 10e3)

     const mergedSize = await mergeVhd(handler, parent, handler, children, {
+      logInfo,
       onProgress({ done: d, total: t }) {
         done = d
         total = t
       },
+      remove,
     })

     clearInterval(handle)
-    const mergeTargetChild = children.shift()
-    await Promise.all([
-      VhdAbstract.rename(handler, parent, mergeTargetChild),
-      asyncMap(children, child => {
-        logInfo(`the VHD child is already merged`, { child })
-        if (remove) {
-          logInfo(`deleting merged VHD child`, { child })
-          return VhdAbstract.unlink(handler, child)
-        }
-      }),
-    ])

     return mergedSize
   }
 }
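
For orientation before the per-file changes: a minimal, hypothetical sketch of driving the reworked mergeVhd API (the remote URL and file names are invented; getHandler is the same helper the tests below use):

    const { getHandler } = require('@xen-orchestra/fs')
    const { mergeVhd } = require('vhd-lib')

    async function mergeChain() {
      // depending on the handler type, an `await handler.sync()` may be needed first
      const handler = getHandler({ url: 'file:///srv/backups' }) // hypothetical remote

      // children are listed from the grand child to the direct child;
      // `remove: true` lets mergeVhd rename the parent and delete merged children itself
      const mergedSize = await mergeVhd(handler, 'parent.vhd', handler, ['child2.vhd', 'child1.vhd'], {
        logInfo: console.log,
        remove: true,
        onProgress: ({ done, total }) => console.log(`merged ${done}/${total} blocks`),
      })
      console.log(`${mergedSize} bytes of data merged`)
    }
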
@@ -7,6 +7,8 @@

 > Users must be able to say: “Nice enhancement, I'm eager to test it”

+- [Backup] Merge delta backups without copying data when using VHD directories on NFS/SMB/local remote (https://github.com/vatesfr/xen-orchestra/pull/6271)
+
 ### Bug fixes

 > Users must be able to say: “I had this issue, happy to know it's fixed”
@@ -33,9 +35,11 @@

 - @vates/event-listeners-manager patch
 - @vates/read-chunk major
+- @xen-orchestra/backups minor
 - @xen-orchestra/xapi minor
+- vhd-lib minor
 - xo-remote-parser minor
-- xo-server patch
+- xo-server minor
 - xo-vmdk-to-vhd patch

 <!--packages-end-->
@@ -104,7 +104,7 @@ exports.VhdAbstract = class VhdAbstract {
    *
    * @returns {number} the merged data size
    */
-  async coalesceBlock(child, blockId) {
+  async mergeBlock(child, blockId) {
     const block = await child.readBlock(blockId)
     await this.writeEntireBlock(block)
     return block.data.length
@@ -53,19 +53,25 @@ test('Can coalesce block', async () => {
     const childDirectoryVhd = yield openVhd(handler, childDirectoryName)
     await childDirectoryVhd.readBlockAllocationTable()

-    await parentVhd.coalesceBlock(childFileVhd, 0)
+    let childBlockData = (await childDirectoryVhd.readBlock(0)).data
+    await parentVhd.mergeBlock(childDirectoryVhd, 0)
     await parentVhd.writeFooter()
     await parentVhd.writeBlockAllocationTable()
     let parentBlockData = (await parentVhd.readBlock(0)).data
-    let childBlockData = (await childFileVhd.readBlock(0)).data
-    expect(parentBlockData).toEqual(childBlockData)
+    // block should be present in parent
+    expect(parentBlockData.equals(childBlockData)).toEqual(true)
+    // block should not be in child since it's a rename for vhd directory
+    await expect(childDirectoryVhd.readBlock(0)).rejects.toThrowError()

-    await parentVhd.coalesceBlock(childDirectoryVhd, 0)
+    childBlockData = (await childFileVhd.readBlock(1)).data
+    await parentVhd.mergeBlock(childFileVhd, 1)
     await parentVhd.writeFooter()
     await parentVhd.writeBlockAllocationTable()
-    parentBlockData = (await parentVhd.readBlock(0)).data
-    childBlockData = (await childDirectoryVhd.readBlock(0)).data
-    expect(parentBlockData).toEqual(childBlockData)
+    parentBlockData = (await parentVhd.readBlock(1)).data
+    // block should be present in parent in case of mixed vhdfile/vhddirectory
+    expect(parentBlockData.equals(childBlockData)).toEqual(true)
+    // block should still be in child
+    await childFileVhd.readBlock(1)
   })
 })
@@ -142,13 +142,13 @@ exports.VhdDirectory = class VhdDirectory extends VhdAbstract {
     return test(this.#blockTable, blockId)
   }

-  _getChunkPath(partName) {
+  #getChunkPath(partName) {
     return this._path + '/' + partName
   }

   async _readChunk(partName) {
     // here we can implement compression and / or crypto
-    const buffer = await this._handler.readFile(this._getChunkPath(partName))
+    const buffer = await this._handler.readFile(this.#getChunkPath(partName))

     const uncompressed = await this.#compressor.decompress(buffer)
     return {
@@ -164,16 +164,20 @@ exports.VhdDirectory = class VhdDirectory extends VhdAbstract {
     )

     const compressed = await this.#compressor.compress(buffer)
-    return this._handler.outputFile(this._getChunkPath(partName), compressed, this._opts)
+    return this._handler.outputFile(this.#getChunkPath(partName), compressed, this._opts)
   }

   // put block in subdirectories to limit impact when doing directory listing
-  _getBlockPath(blockId) {
+  #getBlockPath(blockId) {
     const blockPrefix = Math.floor(blockId / 1e3)
     const blockSuffix = blockId - blockPrefix * 1e3
     return `blocks/${blockPrefix}/${blockSuffix}`
   }

+  _getFullBlockPath(blockId) {
+    return this.#getChunkPath(this.#getBlockPath(blockId))
+  }
+
   async readHeaderAndFooter() {
     await this.#readChunkFilters()

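
A quick worked example of the subdirectory scheme used by #getBlockPath above, restated as standalone code:

    // block 1234 -> prefix 1, suffix 234 -> 'blocks/1/234',
    // so each directory level holds at most ~1000 entries
    function getBlockPath(blockId) {
      const blockPrefix = Math.floor(blockId / 1e3)
      const blockSuffix = blockId - blockPrefix * 1e3
      return `blocks/${blockPrefix}/${blockSuffix}`
    }
    console.log(getBlockPath(1234)) // 'blocks/1/234'
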
@@ -200,7 +204,7 @@ exports.VhdDirectory = class VhdDirectory extends VhdAbstract {
     if (onlyBitmap) {
       throw new Error(`reading 'bitmap of block' ${blockId} in a VhdDirectory is not implemented`)
     }
-    const { buffer } = await this._readChunk(this._getBlockPath(blockId))
+    const { buffer } = await this._readChunk(this.#getBlockPath(blockId))
     return {
       id: blockId,
       bitmap: buffer.slice(0, this.bitmapSize),
@@ -240,25 +244,39 @@ exports.VhdDirectory = class VhdDirectory extends VhdAbstract {
   }

   // only works if data are in the same handler
-  // and if the full block is modified in child ( which is the case whit xcp)
+  // and if the full block is modified in child ( which is the case with xcp)
   // and if the compression type is same on both sides
-  async coalesceBlock(child, blockId) {
+  async mergeBlock(child, blockId, isResumingMerge = false) {
+    const childBlockPath = child._getFullBlockPath?.(blockId)
     if (
-      !(child instanceof VhdDirectory) ||
+      childBlockPath === undefined ||
       this._handler !== child._handler ||
-      child.compressionType !== this.compressionType
+      child.compressionType !== this.compressionType ||
+      child.compressionType === 'MIXED'
     ) {
-      return super.coalesceBlock(child, blockId)
+      return super.mergeBlock(child, blockId)
     }
-    await this._handler.copy(
-      child._getChunkPath(child._getBlockPath(blockId)),
-      this._getChunkPath(this._getBlockPath(blockId))
-    )
+    try {
+      await this._handler.rename(childBlockPath, this._getFullBlockPath(blockId))
+    } catch (error) {
+      if (error.code === 'ENOENT' && isResumingMerge === true) {
+        // when resuming, the blocks moved since the last merge state write are
+        // not in the child anymore but it should be ok
+
+        // it will throw an error if block is missing in parent
+        // won't detect if the block was already in parent and is broken/missing in child
+        const { data } = await this.readBlock(blockId)
+        assert.strictEqual(data.length, this.header.blockSize)
+      } else {
+        throw error
+      }
+    }
     setBitmap(this.#blockTable, blockId)
     return sectorsToBytes(this.sectorsPerBlock)
   }

   async writeEntireBlock(block) {
-    await this._writeChunk(this._getBlockPath(block.id), block.buffer)
+    await this._writeChunk(this.#getBlockPath(block.id), block.buffer)
     setBitmap(this.#blockTable, block.id)
   }

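
The heart of the copyless merge is that rename. A self-contained sketch of the same step, with the resume tolerance, assuming plain chunk files such as <vhd>/blocks/1/234 (helper and parameter names invented):

    const assert = require('assert')
    const { rename, stat } = require('fs/promises')

    // sketch: move one block from child to parent without copying its data
    async function moveBlock(childBlockPath, parentBlockPath, { isResumingMerge = false, expectedSize } = {}) {
      try {
        // same filesystem: rename only touches metadata, no block data is read or written
        await rename(childBlockPath, parentBlockPath)
      } catch (error) {
        if (error.code !== 'ENOENT' || !isResumingMerge) {
          throw error
        }
        // when resuming, blocks moved after the last merge-state write are already
        // gone from the child: check the parent really holds a full-size chunk
        const { size } = await stat(parentBlockPath)
        assert.strictEqual(size, expectedSize)
      }
    }
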
@@ -222,14 +222,14 @@ test('Can coalesce block', async () => {
     const childDirectoryVhd = yield openVhd(handler, childDirectoryName)
     await childDirectoryVhd.readBlockAllocationTable()

-    await parentVhd.coalesceBlock(childFileVhd, 0)
+    await parentVhd.mergeBlock(childFileVhd, 0)
     await parentVhd.writeFooter()
     await parentVhd.writeBlockAllocationTable()
     let parentBlockData = (await parentVhd.readBlock(0)).data
     let childBlockData = (await childFileVhd.readBlock(0)).data
     expect(parentBlockData).toEqual(childBlockData)

-    await parentVhd.coalesceBlock(childDirectoryVhd, 0)
+    await parentVhd.mergeBlock(childDirectoryVhd, 0)
     await parentVhd.writeFooter()
     await parentVhd.writeBlockAllocationTable()
     parentBlockData = (await parentVhd.readBlock(0)).data
@@ -43,6 +43,16 @@ const VhdSynthetic = class VhdSynthetic extends VhdAbstract {
     }
   }

+  get compressionType() {
+    const compressionType = this.vhds[0].compressionType
+    for (let i = 0; i < this.vhds.length; i++) {
+      if (compressionType !== this.vhds[i].compressionType) {
+        return 'MIXED'
+      }
+    }
+    return compressionType
+  }
+
   /**
    * @param {Array<VhdAbstract>} vhds the chain of Vhds used to compute this Vhd, from the deepest child (in position 0), to the root (in the last position)
    * only the last one can have any type. Other must have type DISK_TYPES.DIFFERENCING (delta)
@@ -74,17 +84,28 @@ const VhdSynthetic = class VhdSynthetic extends VhdAbstract {
     }
   }

-  async readBlock(blockId, onlyBitmap = false) {
+  #getVhdWithBlock(blockId) {
     const index = this.#vhds.findIndex(vhd => vhd.containsBlock(blockId))
     assert(index !== -1, `no such block ${blockId}`)
+    return this.#vhds[index]
+  }
+
+  async readBlock(blockId, onlyBitmap = false) {
     // only read the content of the first vhd containing this block
-    return await this.#vhds[index].readBlock(blockId, onlyBitmap)
+    return await this.#getVhdWithBlock(blockId).readBlock(blockId, onlyBitmap)
   }

+  async mergeBlock(child, blockId) {
+    throw new Error(`can't coalesce block into a vhd synthetic`)
+  }
+
   _readParentLocatorData(id) {
     return this.#vhds[this.#vhds.length - 1]._readParentLocatorData(id)
   }
+
+  _getFullBlockPath(blockId) {
+    const vhd = this.#getVhdWithBlock(blockId)
+    return vhd?._getFullBlockPath(blockId)
+  }
 }

 // add decorated static method
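
The new compressionType getter exists to guard the rename fast path: mergeBlock falls back to the generic copy when the chain mixes compression types. A compact, hypothetical restatement of the same check:

    // returns the chain's common compression type, or 'MIXED' when the VHDs differ
    function compressionTypeOf(vhds) {
      const first = vhds[0].compressionType
      return vhds.every(vhd => vhd.compressionType === first) ? first : 'MIXED'
    }
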
@@ -6,7 +6,8 @@ exports.checkVhdChain = require('./checkChain')
 exports.createReadableSparseStream = require('./createReadableSparseStream')
 exports.createVhdStreamWithLength = require('./createVhdStreamWithLength')
 exports.createVhdDirectoryFromStream = require('./createVhdDirectoryFromStream').createVhdDirectoryFromStream
-exports.mergeVhd = require('./merge')
+const { mergeVhd } = require('./merge')
+exports.mergeVhd = mergeVhd
 exports.peekFooterFromVhdStream = require('./peekFooterFromVhdStream')
 exports.openVhd = require('./openVhd').openVhd
 exports.VhdAbstract = require('./Vhd/VhdAbstract').VhdAbstract
|
@ -9,6 +9,7 @@ const { getHandler } = require('@xen-orchestra/fs')
|
||||
const { pFromCallback } = require('promise-toolbox')
|
||||
|
||||
const { VhdFile, chainVhd, mergeVhd } = require('./index')
|
||||
const { _cleanupVhds: cleanupVhds } = require('./merge')
|
||||
|
||||
const { checkFile, createRandomFile, convertFromRawToVhd } = require('./tests/utils')
|
||||
|
||||
@ -38,14 +39,15 @@ test('merge works in normal cases', async () => {
|
||||
await createRandomFile(`${tempDir}/${childRandomFileName}`, mbOfChildren)
|
||||
await convertFromRawToVhd(`${tempDir}/${childRandomFileName}`, `${tempDir}/${child1FileName}`)
|
||||
await chainVhd(handler, parentFileName, handler, child1FileName, true)
|
||||
await checkFile(`${tempDir}/${parentFileName}`)
|
||||
|
||||
// merge
|
||||
await mergeVhd(handler, parentFileName, handler, child1FileName)
|
||||
|
||||
// check that vhd is still valid
|
||||
await checkFile(`${tempDir}/${parentFileName}`)
|
||||
// check that the merged vhd is still valid
|
||||
await checkFile(`${tempDir}/${child1FileName}`)
|
||||
|
||||
const parentVhd = new VhdFile(handler, parentFileName)
|
||||
const parentVhd = new VhdFile(handler, child1FileName)
|
||||
await parentVhd.readHeaderAndFooter()
|
||||
await parentVhd.readBlockAllocationTable()
|
||||
|
||||
@@ -138,11 +140,11 @@ test('it can resume a merge ', async () => {
   await mergeVhd(handler, 'parent.vhd', handler, 'child1.vhd')

   // reload header footer and block allocation table , they should succed
   await parentVhd.readHeaderAndFooter()
   await parentVhd.readBlockAllocationTable()
   await childVhd.readHeaderAndFooter()
   await childVhd.readBlockAllocationTable()
   let offset = 0
   // check that the data are the same as source
-  for await (const block of parentVhd.blocks()) {
+  for await (const block of childVhd.blocks()) {
     const blockContent = block.data
     // first block is marked as already merged, should not be modified
     // second block should come from children
@@ -153,7 +155,7 @@ test('it can resume a merge ', async () => {
     await fs.read(fd, buffer, 0, buffer.length, offset)

     expect(buffer.equals(blockContent)).toEqual(true)
-    offset += parentVhd.header.blockSize
+    offset += childVhd.header.blockSize
   }
 })

@@ -183,9 +185,9 @@ test('it merge multiple child in one pass ', async () => {
   await mergeVhd(handler, parentFileName, handler, [grandChildFileName, childFileName])

   // check that vhd is still valid
-  await checkFile(parentFileName)
+  await checkFile(grandChildFileName)

-  const parentVhd = new VhdFile(handler, parentFileName)
+  const parentVhd = new VhdFile(handler, grandChildFileName)
   await parentVhd.readHeaderAndFooter()
   await parentVhd.readBlockAllocationTable()

@@ -206,3 +208,21 @@ test('it merge multiple child in one pass ', async () => {
     offset += parentVhd.header.blockSize
   }
 })
+
+test('it cleans vhd mergedfiles', async () => {
+  const handler = getHandler({ url: `file://${tempDir}` })
+
+  await handler.writeFile('parent', 'parentData')
+  await handler.writeFile('child1', 'child1Data')
+  await handler.writeFile('child2', 'child2Data')
+  await handler.writeFile('child3', 'child3Data')
+
+  // children are ordered from the grand child to the direct child
+  await cleanupVhds(handler, 'parent', ['child3', 'child2', 'child1'], { remove: true })
+
+  // only child3 should stay, with the data of parent
+  const [child3, ...other] = await handler.list('.')
+  expect(other.length).toEqual(0)
+  expect(child3).toEqual('child3')
+  expect((await handler.readFile('child3')).toString('utf8')).toEqual('parentData')
+})
@@ -12,11 +12,35 @@ const { basename, dirname } = require('path')
 const { DISK_TYPES } = require('./_constants')
 const { Disposable } = require('promise-toolbox')
 const { asyncEach } = require('@vates/async-each')
 const { VhdAbstract } = require('./Vhd/VhdAbstract')
 const { VhdDirectory } = require('./Vhd/VhdDirectory')
 const { VhdSynthetic } = require('./Vhd/VhdSynthetic')
+const { asyncMap } = require('@xen-orchestra/async-map')

 const { warn } = createLogger('vhd-lib:merge')

+// the chain we want to merge is [ ancestor, child1, ..., child n ]
+// this chain can have grand children or a grand parent
+//
+// 1. create a VhdSynthetic from all the children if more than one child is merged
+// 2. merge the resulting vhd into the ancestor
+//   2.a if at least one is a vhd file: copy the file parts from child to parent
+//   2.b if they are all vhd directories: move the blocks from children to the ancestor
+// 3. update the size, uuid and timestamp of the ancestor with those of child n
+// 4. delete all (now) unused VHDs
+// 5. rename the ancestor to child n
+//
+//                  VhdSynthetic
+//                       |
+//            /‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾\
+// [ ancestor, child1, ..., child n-1, child n ]
+//      |      \_____________________/    ^
+//      |                 |               |
+//      |            unused VHDs          |
+//      |                                 |
+//      \______________rename____________/

 // write the merge progress file at most every `delay` seconds
 function makeThrottledWriter(handler, path, delay) {
   let lastWrite = Date.now()
   return async json => {
@@ -28,21 +52,45 @@
   }
 }

+// make the rename / delete part of the merge process
+// will fail if parent and children are on different remotes
+function cleanupVhds(handler, parent, children, { logInfo = noop, remove = false } = {}) {
+  if (!Array.isArray(children)) {
+    children = [children]
+  }
+  const mergeTargetChild = children.shift()
+
+  return Promise.all([
+    VhdAbstract.rename(handler, parent, mergeTargetChild),
+    asyncMap(children, child => {
+      logInfo(`the VHD child is already merged`, { child })
+      if (remove) {
+        logInfo(`deleting merged VHD child`, { child })
+        return VhdAbstract.unlink(handler, child)
+      }
+    }),
+  ])
+}
+module.exports._cleanupVhds = cleanupVhds
+
 // Merge one or multiple vhd children into vhd parent.
 // childPath can be an array to create a synthetic VHD from multiple VHDs
+// childPath is ordered from the grand children to the children
 //
 // TODO: rename the VHD file during the merge
-module.exports = limitConcurrency(2)(async function merge(
+module.exports.mergeVhd = limitConcurrency(2)(async function merge(
   parentHandler,
   parentPath,
   childHandler,
   childPath,
-  { onProgress = noop } = {}
+  { onProgress = noop, logInfo = noop, remove } = {}
 ) {
   const mergeStatePath = dirname(parentPath) + '/' + '.' + basename(parentPath) + '.merge.json'

   return await Disposable.use(async function* () {
     let mergeState
+    let isResuming = false
     try {
       const mergeStateContent = await parentHandler.readFile(mergeStatePath)
       mergeState = JSON.parse(mergeStateContent)
@@ -75,6 +123,7 @@
     assert.strictEqual(childVhd.footer.diskType, DISK_TYPES.DIFFERENCING)
     assert.strictEqual(childVhd.header.blockSize, parentVhd.header.blockSize)
   } else {
+    isResuming = true
     // vhd should not have changed to resume
     assert.strictEqual(parentVhd.header.checksum, mergeState.parent.header)
     assert.strictEqual(childVhd.header.checksum, mergeState.child.header)
@@ -115,12 +164,12 @@
     let counter = 0

     const mergeStateWriter = makeThrottledWriter(parentHandler, mergeStatePath, 10e3)

     await asyncEach(
       toMerge,
       async blockId => {
         merging.add(blockId)
-        mergeState.mergedDataSize += await parentVhd.coalesceBlock(childVhd, blockId)
+        mergeState.mergedDataSize += await parentVhd.mergeBlock(childVhd, blockId, isResuming)

         merging.delete(blockId)

         onProgress({
@@ -155,6 +204,8 @@
     // should be a disposable
     parentHandler.unlink(mergeStatePath).catch(warn)

+    await cleanupVhds(parentHandler, parentPath, childPath, { logInfo, remove })
+
     return mergeState.mergedDataSize
   })
 })
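
makeThrottledWriter appears only truncated above; a minimal sketch of a writer with the documented semantics (the real body may differ, and the `flags: 'w'` overwrite option is an assumption):

    // call this on every progress tick; it persists the merge state at most
    // once per `delay` milliseconds so a resumed merge loses little work
    function makeThrottledWriter(handler, path, delay) {
      let lastWrite = Date.now()
      return async json => {
        const now = Date.now()
        if (now - lastWrite > delay) {
          lastWrite = now
          // errors are swallowed: losing one state write only reduces resume granularity
          await handler.writeFile(path, JSON.stringify(json), { flags: 'w' }).catch(() => {})
        }
      }
    }
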
@@ -85,10 +85,9 @@ async function convertToVhdDirectory(rawFileName, vhdFileName, path) {
   await fs.mkdir(path + '/blocks/0/')
   const stats = await fs.stat(rawFileName)

-  const sizeMB = stats.size / 1024 / 1024
-  for (let i = 0, offset = 0; i < sizeMB; i++, offset += blockDataSize) {
+  for (let i = 0, offset = 0; offset < stats.size; i++, offset += blockDataSize) {
     const blockData = Buffer.alloc(blockDataSize)
-    await fs.read(srcRaw, blockData, offset)
+    await fs.read(srcRaw, blockData, 0, blockData.length, offset)
     await fs.writeFile(path + '/blocks/0/' + i, Buffer.concat([bitmap, blockData]))
   }
   await fs.close(srcRaw)
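
The corrected loop uses the full fs.read(fd, buffer, bufferOffset, length, filePosition) signature. A standalone sketch of the same pattern, assuming the promise-based fs-extra these test utils rely on (file name hypothetical):

    const fs = require('fs-extra')

    // walk a raw file in fixed-size blocks, zero-padding the final partial block
    async function* rawBlocks(fileName, blockSize) {
      const fd = await fs.open(fileName, 'r')
      const { size } = await fs.stat(fileName)
      try {
        for (let offset = 0; offset < size; offset += blockSize) {
          const buffer = Buffer.alloc(blockSize)
          await fs.read(fd, buffer, 0, buffer.length, offset)
          yield buffer
        }
      } finally {
        await fs.close(fd)
      }
    }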