feat(vhd-lib): merge resume can resume when rename fails (#6530)
This commit is contained in:
committed by
GitHub
parent
9d5bc8af6e
commit
f6c227e7f5
@@ -6,9 +6,9 @@ const fs = require('fs-extra')
|
||||
const rimraf = require('rimraf')
|
||||
const tmp = require('tmp')
|
||||
const { getSyncedHandler } = require('@xen-orchestra/fs')
|
||||
const { pFromCallback } = require('promise-toolbox')
|
||||
const { pFromCallback, Disposable } = require('promise-toolbox')
|
||||
|
||||
const { VhdFile, chainVhd } = require('./index')
|
||||
const { VhdFile, chainVhd, openVhd } = require('./index')
|
||||
const { _cleanupVhds: cleanupVhds, mergeVhdChain } = require('./merge')
|
||||
|
||||
const { checkFile, createRandomFile, convertFromRawToVhd } = require('./tests/utils')
|
||||
@@ -163,6 +163,78 @@ test('it can resume a simple merge ', async () => {
|
||||
}
|
||||
})
|
||||
|
||||
test('it can resume a failed renaming ', async () => {
|
||||
const mbOfFather = 8
|
||||
const mbOfChildren = 4
|
||||
const parentRandomFileName = `${tempDir}/randomfile`
|
||||
|
||||
await createRandomFile(`${tempDir}/randomfile`, mbOfFather)
|
||||
await convertFromRawToVhd(`${tempDir}/randomfile`, `${tempDir}/parent.vhd`)
|
||||
const parentVhd = new VhdFile(handler, 'parent.vhd')
|
||||
await parentVhd.readHeaderAndFooter()
|
||||
|
||||
await createRandomFile(`${tempDir}/small_randomfile`, mbOfChildren)
|
||||
await convertFromRawToVhd(`${tempDir}/small_randomfile`, `${tempDir}/child1.vhd`)
|
||||
await chainVhd(handler, 'parent.vhd', handler, 'child1.vhd', true)
|
||||
|
||||
|
||||
const childVhd = new VhdFile(handler, 'child1.vhd')
|
||||
await childVhd.readHeaderAndFooter()
|
||||
|
||||
await handler.writeFile(
|
||||
'.parent.vhd.merge.json',
|
||||
JSON.stringify({
|
||||
parent: {
|
||||
header: parentVhd.header.checksum,
|
||||
},
|
||||
child: {
|
||||
header: childVhd.header.checksum,
|
||||
},
|
||||
step: 'cleanupVhds',
|
||||
})
|
||||
)
|
||||
// expect merge to succed
|
||||
await mergeVhdChain(handler, ['parent.vhd', 'child1.vhd'])
|
||||
// parent have been renamed
|
||||
expect(await fs.exists(`${tempDir}/parent.vhd`)).toBeFalsy()
|
||||
expect(await fs.exists(`${tempDir}/.parent.vhd.merge.json`)).toBeFalsy()
|
||||
|
||||
Disposable.use(openVhd(handler, 'child1.vhd'), async mergedVhd => {
|
||||
await mergedVhd.readBlockAllocationTable()
|
||||
// the resume is at the step 'cleanupVhds' it should not have merged blocks and should still contians parent data
|
||||
|
||||
let offset = 0
|
||||
const fd = await fs.open(parentRandomFileName, 'r')
|
||||
for await (const block of mergedVhd.blocks()) {
|
||||
const blockContent = block.data
|
||||
const buffer = Buffer.alloc(blockContent.length)
|
||||
await fs.read(fd, buffer, 0, buffer.length, offset)
|
||||
|
||||
expect(buffer.equals(blockContent)).toEqual(true)
|
||||
offset += childVhd.header.blockSize
|
||||
}
|
||||
})
|
||||
|
||||
// merge succeed if renaming was already done
|
||||
|
||||
await handler.writeFile(
|
||||
'.parent.vhd.merge.json',
|
||||
JSON.stringify({
|
||||
parent: {
|
||||
header: parentVhd.header.checksum,
|
||||
},
|
||||
child: {
|
||||
header: childVhd.header.checksum,
|
||||
},
|
||||
step: 'cleanupVhds',
|
||||
})
|
||||
)
|
||||
await mergeVhdChain(handler, ['parent.vhd', 'child1.vhd'])
|
||||
expect(await fs.exists(`${tempDir}/parent.vhd`)).toBeFalsy()
|
||||
expect(await fs.exists(`${tempDir}/child1.vhd`)).toBeTruthy()
|
||||
expect(await fs.exists(`${tempDir}/.parent.vhd.merge.json`)).toBeFalsy()
|
||||
})
|
||||
|
||||
test('it can resume a multiple merge ', async () => {
|
||||
const mbOfFather = 8
|
||||
const mbOfChildren = 6
|
||||
@@ -226,7 +298,11 @@ test('it can resume a multiple merge ', async () => {
|
||||
})
|
||||
)
|
||||
// it should succeed
|
||||
await mergeVhdChain(handler, ['parent.vhd', 'child.vhd', 'grandchild.vhd'])
|
||||
await mergeVhdChain(handler, ['parent.vhd', 'child.vhd', 'grandchild.vhd'],{removeUnused: true})
|
||||
expect(await fs.exists(`${tempDir}/parent.vhd`)).toBeFalsy()
|
||||
expect(await fs.exists(`${tempDir}/child.vhd`)).toBeFalsy()
|
||||
expect(await fs.exists(`${tempDir}/grandchild.vhd`)).toBeTruthy()
|
||||
expect(await fs.exists(`${tempDir}/.parent.vhd.merge.json`)).toBeFalsy()
|
||||
})
|
||||
|
||||
test('it merge multiple child in one pass ', async () => {
|
||||
@@ -279,17 +355,4 @@ test('it merge multiple child in one pass ', async () => {
|
||||
}
|
||||
})
|
||||
|
||||
test('it cleans vhd mergedfiles', async () => {
|
||||
await handler.writeFile('parent', 'parentData')
|
||||
await handler.writeFile('child1', 'child1Data')
|
||||
await handler.writeFile('child2', 'child2Data')
|
||||
await handler.writeFile('child3', 'child3Data')
|
||||
|
||||
await cleanupVhds(handler, ['parent', 'child1', 'child2', 'child3'], { merge: true, removeUnused: true })
|
||||
|
||||
// only child3 should stay, with the data of parent
|
||||
const [child3, ...other] = await handler.list('.')
|
||||
expect(other.length).toEqual(0)
|
||||
expect(child3).toEqual('child3')
|
||||
expect((await handler.readFile('child3')).toString('utf8')).toEqual('parentData')
|
||||
})
|
||||
|
||||
@@ -41,91 +41,97 @@ const { warn } = createLogger('vhd-lib:merge')
|
||||
// | |
|
||||
// \_____________rename_____________/
|
||||
|
||||
// write the merge progress file at most every `delay` seconds
|
||||
function makeThrottledWriter(handler, path, delay) {
|
||||
let lastWrite = 0
|
||||
return async json => {
|
||||
class Merger {
|
||||
#chain
|
||||
#childrenPaths
|
||||
#handler
|
||||
#isResuming = false
|
||||
#lastStateWrittenAt = 0
|
||||
#logInfo
|
||||
#mergeBlockConcurrency
|
||||
#onProgress
|
||||
#parentPath
|
||||
#removeUnused
|
||||
#state
|
||||
#statePath
|
||||
|
||||
constructor(handler, chain, { onProgress, logInfo, removeUnused, mergeBlockConcurrency }) {
|
||||
this.#chain = chain
|
||||
this.#handler = handler
|
||||
this.#parentPath = chain[0]
|
||||
this.#childrenPaths = chain.slice(1)
|
||||
this.#logInfo = logInfo
|
||||
this.#onProgress = onProgress
|
||||
this.#removeUnused = removeUnused
|
||||
this.#mergeBlockConcurrency = mergeBlockConcurrency
|
||||
|
||||
this.#statePath = dirname(this.#parentPath) + '/.' + basename(this.#parentPath) + '.merge.json'
|
||||
}
|
||||
|
||||
async #writeState() {
|
||||
await this.#handler.writeFile(this.#statePath, JSON.stringify(this.#state), { flags: 'w' }).catch(warn)
|
||||
}
|
||||
|
||||
async #writeStateThrottled() {
|
||||
const delay = 10e3
|
||||
const now = Date.now()
|
||||
if (now - lastWrite > delay) {
|
||||
lastWrite = now
|
||||
await handler.writeFile(path, JSON.stringify(json), { flags: 'w' }).catch(warn)
|
||||
if (now - this.#lastStateWrittenAt > delay) {
|
||||
this.#lastStateWrittenAt = now
|
||||
await this.#writeState()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// make the rename / delete part of the merge process
|
||||
// will fail if parent and children are in different remote
|
||||
|
||||
async function cleanupVhds(handler, chain, { logInfo = noop, removeUnused = false } = {}) {
|
||||
const parent = chain[0]
|
||||
const children = chain.slice(1, -1)
|
||||
const mergeTargetChild = chain[chain.length - 1]
|
||||
|
||||
await handler.rename(parent, mergeTargetChild)
|
||||
|
||||
return asyncMap(children, child => {
|
||||
logInfo(`the VHD child is already merged`, { child })
|
||||
if (removeUnused) {
|
||||
logInfo(`deleting merged VHD child`, { child })
|
||||
return VhdAbstract.unlink(handler, child)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
module.exports._cleanupVhds = cleanupVhds
|
||||
|
||||
// Merge a chain of VHDs into a single VHD
|
||||
module.exports.mergeVhdChain = limitConcurrency(2)(async function mergeVhdChain(
|
||||
handler,
|
||||
chain,
|
||||
{ onProgress = noop, logInfo = noop, removeUnused = false, mergeBlockConcurrency = 2 } = {}
|
||||
) {
|
||||
assert(chain.length >= 2)
|
||||
|
||||
const parentPath = chain[0]
|
||||
const childrenPaths = chain.slice(1)
|
||||
|
||||
const mergeStatePath = dirname(parentPath) + '/.' + basename(parentPath) + '.merge.json'
|
||||
|
||||
return await Disposable.use(async function* () {
|
||||
let mergeState
|
||||
let isResuming = false
|
||||
async merge() {
|
||||
try {
|
||||
const mergeStateContent = await handler.readFile(mergeStatePath)
|
||||
mergeState = JSON.parse(mergeStateContent)
|
||||
const mergeStateContent = await this.#handler.readFile(this.#statePath)
|
||||
this.#state = JSON.parse(mergeStateContent)
|
||||
|
||||
// work-around a bug introduce in 97d94b795
|
||||
//
|
||||
// currentBlock could be `null` due to the JSON.stringify of a `NaN` value
|
||||
if (mergeState.currentBlock === null) {
|
||||
mergeState.currentBlock = 0
|
||||
if (this.#state.currentBlock === null) {
|
||||
this.#state.currentBlock = 0
|
||||
}
|
||||
this.#isResuming = true
|
||||
} catch (error) {
|
||||
if (error.code !== 'ENOENT') {
|
||||
warn('problem while checking the merge state', { error })
|
||||
}
|
||||
}
|
||||
/* eslint-disable no-fallthrough */
|
||||
switch (this.#state?.step ?? 'mergeBlocks') {
|
||||
case 'mergeBlocks':
|
||||
await this.#step_mergeBlocks()
|
||||
case 'cleanupVhds':
|
||||
await this.#step_cleanVhds()
|
||||
return this.#cleanup()
|
||||
default:
|
||||
warn(`Step ${this.#state.step} is unknown`, { state: this.#state })
|
||||
}
|
||||
/* eslint-enable no-fallthrough */
|
||||
}
|
||||
|
||||
async *#openVhds() {
|
||||
// during merging, the end footer of the parent can be overwritten by new blocks
|
||||
// we should use it as a way to check vhd health
|
||||
const parentVhd = yield openVhd(handler, parentPath, {
|
||||
const parentVhd = yield openVhd(this.#handler, this.#parentPath, {
|
||||
flags: 'r+',
|
||||
checkSecondFooter: mergeState === undefined,
|
||||
checkSecondFooter: this.#state === undefined,
|
||||
})
|
||||
let childVhd
|
||||
const parentIsVhdDirectory = parentVhd instanceof VhdDirectory
|
||||
let childIsVhdDirectory
|
||||
if (childrenPaths.length !== 1) {
|
||||
childVhd = yield VhdSynthetic.open(handler, childrenPaths)
|
||||
if (this.#childrenPaths.length !== 1) {
|
||||
childVhd = yield VhdSynthetic.open(this.#handler, this.#childrenPaths)
|
||||
childIsVhdDirectory = childVhd.checkVhdsClass(VhdDirectory)
|
||||
} else {
|
||||
childVhd = yield openVhd(handler, childrenPaths[0])
|
||||
childVhd = yield openVhd(this.#handler, this.#childrenPaths[0])
|
||||
childIsVhdDirectory = childVhd instanceof VhdDirectory
|
||||
}
|
||||
|
||||
// merging vhdFile must not be concurrently with the potential block reordering after a change
|
||||
const concurrency = parentIsVhdDirectory && childIsVhdDirectory ? mergeBlockConcurrency : 1
|
||||
if (mergeState === undefined) {
|
||||
this.#mergeBlockConcurrency = parentIsVhdDirectory && childIsVhdDirectory ? this.#mergeBlockConcurrency : 1
|
||||
if (this.#state === undefined) {
|
||||
// merge should be along a vhd chain
|
||||
assert.strictEqual(UUID.stringify(childVhd.header.parentUuid), UUID.stringify(parentVhd.footer.uuid))
|
||||
const parentDiskType = parentVhd.footer.diskType
|
||||
@@ -133,70 +139,86 @@ module.exports.mergeVhdChain = limitConcurrency(2)(async function mergeVhdChain(
|
||||
assert.strictEqual(childVhd.footer.diskType, DISK_TYPES.DIFFERENCING)
|
||||
assert.strictEqual(childVhd.header.blockSize, parentVhd.header.blockSize)
|
||||
} else {
|
||||
isResuming = true
|
||||
// vhd should not have changed to resume
|
||||
assert.strictEqual(parentVhd.header.checksum, mergeState.parent.header)
|
||||
assert.strictEqual(childVhd.header.checksum, mergeState.child.header)
|
||||
assert.strictEqual(parentVhd.header.checksum, this.#state.parent.header)
|
||||
assert.strictEqual(childVhd.header.checksum, this.#state.child.header)
|
||||
}
|
||||
|
||||
// Read allocation table of child/parent.
|
||||
await Promise.all([parentVhd.readBlockAllocationTable(), childVhd.readBlockAllocationTable()])
|
||||
|
||||
return { childVhd, parentVhd }
|
||||
}
|
||||
|
||||
async #step_mergeBlocks() {
|
||||
const self = this
|
||||
await Disposable.use(async function* () {
|
||||
const { childVhd, parentVhd } = yield* self.#openVhds()
|
||||
const { maxTableEntries } = childVhd.header
|
||||
|
||||
if (self.#state === undefined) {
|
||||
await parentVhd.ensureBatSize(childVhd.header.maxTableEntries)
|
||||
|
||||
self.#state = {
|
||||
child: { header: childVhd.header.checksum },
|
||||
parent: { header: parentVhd.header.checksum },
|
||||
currentBlock: 0,
|
||||
mergedDataSize: 0,
|
||||
step: 'mergeBlocks',
|
||||
chain: self.#chain.map(vhdPath => handlerPath.relativeFromFile(self.#statePath, vhdPath)),
|
||||
}
|
||||
|
||||
// finds first allocated block for the 2 following loops
|
||||
while (self.#state.currentBlock < maxTableEntries && !childVhd.containsBlock(self.#state.currentBlock)) {
|
||||
++self.#state.currentBlock
|
||||
}
|
||||
await self.#writeState()
|
||||
}
|
||||
await self.#mergeBlocks(parentVhd, childVhd)
|
||||
await self.#updateHeaders(parentVhd, childVhd)
|
||||
})
|
||||
}
|
||||
|
||||
async #mergeBlocks(parentVhd, childVhd) {
|
||||
const { maxTableEntries } = childVhd.header
|
||||
|
||||
if (mergeState === undefined) {
|
||||
await parentVhd.ensureBatSize(childVhd.header.maxTableEntries)
|
||||
|
||||
mergeState = {
|
||||
child: { header: childVhd.header.checksum },
|
||||
parent: { header: parentVhd.header.checksum },
|
||||
currentBlock: 0,
|
||||
mergedDataSize: 0,
|
||||
chain: chain.map(vhdPath => handlerPath.relativeFromFile(mergeStatePath, vhdPath)),
|
||||
}
|
||||
|
||||
// finds first allocated block for the 2 following loops
|
||||
while (mergeState.currentBlock < maxTableEntries && !childVhd.containsBlock(mergeState.currentBlock)) {
|
||||
++mergeState.currentBlock
|
||||
}
|
||||
}
|
||||
|
||||
// counts number of allocated blocks
|
||||
const toMerge = []
|
||||
for (let block = mergeState.currentBlock; block < maxTableEntries; block++) {
|
||||
for (let block = this.#state.currentBlock; block < maxTableEntries; block++) {
|
||||
if (childVhd.containsBlock(block)) {
|
||||
toMerge.push(block)
|
||||
}
|
||||
}
|
||||
const nBlocks = toMerge.length
|
||||
onProgress({ total: nBlocks, done: 0 })
|
||||
this.#onProgress({ total: nBlocks, done: 0 })
|
||||
|
||||
const merging = new Set()
|
||||
let counter = 0
|
||||
|
||||
const mergeStateWriter = makeThrottledWriter(handler, mergeStatePath, 10e3)
|
||||
await mergeStateWriter(mergeState)
|
||||
await asyncEach(
|
||||
toMerge,
|
||||
async blockId => {
|
||||
merging.add(blockId)
|
||||
mergeState.mergedDataSize += await parentVhd.mergeBlock(childVhd, blockId, isResuming)
|
||||
this.#state.mergedDataSize += await parentVhd.mergeBlock(childVhd, blockId, this.#isResuming)
|
||||
|
||||
mergeState.currentBlock = Math.min(...merging)
|
||||
this.#state.currentBlock = Math.min(...merging)
|
||||
merging.delete(blockId)
|
||||
|
||||
onProgress({
|
||||
this.#onProgress({
|
||||
total: nBlocks,
|
||||
done: counter + 1,
|
||||
})
|
||||
counter++
|
||||
mergeStateWriter(mergeState)
|
||||
this.#writeStateThrottled()
|
||||
},
|
||||
{
|
||||
concurrency,
|
||||
concurrency: this.#mergeBlockConcurrency,
|
||||
}
|
||||
)
|
||||
onProgress({ total: nBlocks, done: nBlocks })
|
||||
// ensure data size is correct
|
||||
await this.#writeState()
|
||||
this.#onProgress({ total: nBlocks, done: nBlocks })
|
||||
}
|
||||
|
||||
async #updateHeaders(parentVhd, childVhd) {
|
||||
// some blocks could have been created or moved in parent : write bat
|
||||
await parentVhd.writeBlockAllocationTable()
|
||||
|
||||
@@ -212,19 +234,70 @@ module.exports.mergeVhdChain = limitConcurrency(2)(async function mergeVhdChain(
|
||||
// necessary to update values and to recreate the footer after block
|
||||
// creation
|
||||
await parentVhd.writeFooter()
|
||||
}
|
||||
|
||||
await cleanupVhds(handler, chain, { logInfo, removeUnused })
|
||||
// make the rename / delete part of the merge process
|
||||
// will fail if parent and children are in different remote
|
||||
async #step_cleanVhds() {
|
||||
assert.notEqual(this.#state, undefined)
|
||||
this.#state.step = 'cleanupVhds'
|
||||
await this.#writeState()
|
||||
|
||||
// should be a disposable
|
||||
handler.unlink(mergeStatePath).catch(warn)
|
||||
const chain = this.#chain
|
||||
const handler = this.#handler
|
||||
|
||||
return mergeState.mergedDataSize
|
||||
}).catch(error => {
|
||||
const parent = chain[0]
|
||||
const children = chain.slice(1, -1)
|
||||
const mergeTargetChild = chain[chain.length - 1]
|
||||
|
||||
// in the case is an alias, renaming parent to mergeTargetChild will keep the real data
|
||||
// of mergeTargetChild in the data folder
|
||||
// mergeTargetChild is already in an incomplete state, its blocks have been transferred to parent
|
||||
await VhdAbstract.unlink(handler, mergeTargetChild)
|
||||
|
||||
try {
|
||||
await handler.rename(parent, mergeTargetChild)
|
||||
} catch (error) {
|
||||
// maybe the renaming was already successfull during merge
|
||||
if (error.code === 'ENOENT' && this.#isResuming) {
|
||||
Disposable.use(openVhd(handler, mergeTargetChild), vhd => {
|
||||
// we are sure that mergeTargetChild is the right one
|
||||
assert.strictEqual(vhd.header.checksum, this.#state.parent.header)
|
||||
})
|
||||
this.#logInfo(`the VHD parent was already renamed`, { parent, mergeTargetChild })
|
||||
}
|
||||
}
|
||||
|
||||
await asyncMap(children, child => {
|
||||
this.#logInfo(`the VHD child is already merged`, { child })
|
||||
if (this.#removeUnused) {
|
||||
this.#logInfo(`deleting merged VHD child`, { child })
|
||||
return VhdAbstract.unlink(handler, child)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
async #cleanup() {
|
||||
const mergedSize = this.#state?.mergedDataSize ?? 0
|
||||
await this.#handler.unlink(this.#statePath).catch(warn)
|
||||
return mergedSize
|
||||
}
|
||||
}
|
||||
|
||||
module.exports.mergeVhdChain = limitConcurrency(2)(async function mergeVhdChain(
|
||||
handler,
|
||||
chain,
|
||||
{ onProgress = noop, logInfo = noop, removeUnused = false, mergeBlockConcurrency = 2 } = {}
|
||||
) {
|
||||
const merger = new Merger(handler, chain, { onProgress, logInfo, removeUnused, mergeBlockConcurrency })
|
||||
try {
|
||||
return merger.merge()
|
||||
} catch (error) {
|
||||
try {
|
||||
error.chain = chain
|
||||
} finally {
|
||||
// eslint-disable-next-line no-unsafe-finally
|
||||
throw error
|
||||
}
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user