Compare commits

...

2 Commits

Author SHA1 Message Date
Florent BEAUCHAMP
d4ca0da8b9 fix(backups/cleanVm): vhd size computation 2023-07-05 16:15:02 +02:00
Florent BEAUCHAMP
486d50f2f1 feat(@xen-orchestra/backups): store more detailled sizes 2023-07-05 14:38:17 +02:00
8 changed files with 59 additions and 37 deletions

View File

@@ -669,7 +669,7 @@ class RemoteAdapter {
const handler = this._handler const handler = this._handler
if (this.useVhdDirectory()) { if (this.useVhdDirectory()) {
const dataPath = `${dirname(path)}/data/${uuidv4()}.vhd` const dataPath = `${dirname(path)}/data/${uuidv4()}.vhd`
const size = await createVhdDirectoryFromStream(handler, dataPath, input, { const sizes = await createVhdDirectoryFromStream(handler, dataPath, input, {
concurrency: writeBlockConcurrency, concurrency: writeBlockConcurrency,
compression: this.#getCompressionType(), compression: this.#getCompressionType(),
async validator() { async validator() {
@@ -678,9 +678,14 @@ class RemoteAdapter {
}, },
}) })
await VhdAbstract.createAlias(handler, path, dataPath) await VhdAbstract.createAlias(handler, path, dataPath)
return size return sizes
} else { } else {
return this.outputStream(path, input, { checksum, validator }) const size = this.outputStream(path, input, { checksum, validator })
return {
compressedSize: size,
sourceSize: size,
writtenSize: size,
}
} }
} }

View File

@@ -3,7 +3,7 @@
const sum = require('lodash/sum') const sum = require('lodash/sum')
const UUID = require('uuid') const UUID = require('uuid')
const { asyncMap } = require('@xen-orchestra/async-map') const { asyncMap } = require('@xen-orchestra/async-map')
const { Constants, openVhd, VhdAbstract, VhdFile } = require('vhd-lib') const { Constants, openVhd, VhdAbstract } = require('vhd-lib')
const { isVhdAlias, resolveVhdAlias } = require('vhd-lib/aliases') const { isVhdAlias, resolveVhdAlias } = require('vhd-lib/aliases')
const { dirname, resolve } = require('path') const { dirname, resolve } = require('path')
const { DISK_TYPES } = Constants const { DISK_TYPES } = Constants
@@ -15,24 +15,14 @@ const { Task } = require('./Task.js')
const { Disposable } = require('promise-toolbox') const { Disposable } = require('promise-toolbox')
const handlerPath = require('@xen-orchestra/fs/path') const handlerPath = require('@xen-orchestra/fs/path')
// checking the size of a vhd directory is costly
// 1 Http Query per 1000 blocks
// we only check size of all the vhd are VhdFiles
function shouldComputeVhdsSize(handler, vhds) {
if (handler.isEncrypted) {
return false
}
return vhds.every(vhd => vhd instanceof VhdFile)
}
const computeVhdsSize = (handler, vhdPaths) => const computeVhdsSize = (handler, vhdPaths) =>
Disposable.use( Disposable.use(
vhdPaths.map(vhdPath => openVhd(handler, vhdPath)), vhdPaths.map(vhdPath => openVhd(handler, vhdPath)),
async vhds => { async vhds => {
if (shouldComputeVhdsSize(handler, vhds)) { await Promise.all(vhds.map(vhd => vhd.readBlockAllocationTable()))
const sizes = await asyncMap(vhds, vhd => vhd.getSize()) // get file size for vhdfile, computed size from bat for vhd directory
return sum(sizes) const sizes = await asyncMap(vhds, vhd => vhd.streamSize())
} return sum(sizes)
} }
) )
@@ -534,11 +524,6 @@ exports.cleanVm = async function cleanVm(
const linkedVhds = Object.keys(vhds).map(key => resolve('/', vmDir, vhds[key])) const linkedVhds = Object.keys(vhds).map(key => resolve('/', vmDir, vhds[key]))
fileSystemSize = await computeVhdsSize(handler, linkedVhds) fileSystemSize = await computeVhdsSize(handler, linkedVhds)
// the size is not computed in some cases (e.g. VhdDirectory)
if (fileSystemSize === undefined) {
return
}
// don't warn if the size has changed after a merge // don't warn if the size has changed after a merge
if (!merged && fileSystemSize !== size) { if (!merged && fileSystemSize !== size) {
// FIXME: figure out why it occurs so often and, once fixed, log the real problems with `logWarn` // FIXME: figure out why it occurs so often and, once fixed, log the real problems with `logWarn`
@@ -556,6 +541,8 @@ exports.cleanVm = async function cleanVm(
// systematically update size after a merge // systematically update size after a merge
if ((merged || fixMetadata) && size !== fileSystemSize) { if ((merged || fixMetadata) && size !== fileSystemSize) {
// @todo add a cumulatedTransferSize property ?
// @todo update writtenSize, compressedSize
metadata.size = fileSystemSize metadata.size = fileSystemSize
mustRegenerateCache = true mustRegenerateCache = true
try { try {

View File

@@ -205,7 +205,7 @@ class IncrementalRemoteWriter extends MixinRemoteWriter(AbstractIncrementalWrite
// TODO remove when this has been done before the export // TODO remove when this has been done before the export
await checkVhd(handler, parentPath) await checkVhd(handler, parentPath)
} }
// @todo : sum per property
transferSize += await adapter.writeVhd(path, deltaExport.streams[`${id}.vhd`], { transferSize += await adapter.writeVhd(path, deltaExport.streams[`${id}.vhd`], {
// no checksum for VHDs, because they will be invalidated by // no checksum for VHDs, because they will be invalidated by
// merges and chainings // merges and chainings
@@ -232,7 +232,7 @@ class IncrementalRemoteWriter extends MixinRemoteWriter(AbstractIncrementalWrite
return { size: transferSize } return { size: transferSize }
}) })
metadataContent.size = size metadataContent.size = size // @todo: transferSize
this._metadataFileName = await adapter.writeVmBackupMetadata(vm.uuid, metadataContent) this._metadataFileName = await adapter.writeVmBackupMetadata(vm.uuid, metadataContent)
// TODO: run cleanup? // TODO: run cleanup?

View File

@@ -168,7 +168,12 @@ exports.VhdDirectory = class VhdDirectory extends VhdAbstract {
// in case of VhdDirectory, we want to create the file if it does not exists // in case of VhdDirectory, we want to create the file if it does not exists
const flags = this._opts?.flags === 'r+' ? 'w' : this._opts?.flags const flags = this._opts?.flags === 'r+' ? 'w' : this._opts?.flags
const compressed = await this.#compressor.compress(buffer) const compressed = await this.#compressor.compress(buffer)
return this._handler.outputFile(this.#getChunkPath(partName), compressed, { flags }) const writtenSize = await this._handler.outputFile(this.#getChunkPath(partName), compressed, { flags })
return {
size: buffer.length,
compressedSize: compressed.length,
writtenSize,
}
} }
// put block in subdirectories to limit impact when doing directory listing // put block in subdirectories to limit impact when doing directory listing
@@ -228,7 +233,7 @@ exports.VhdDirectory = class VhdDirectory extends VhdAbstract {
footer.checksum = checksumStruct(rawFooter, fuFooter) footer.checksum = checksumStruct(rawFooter, fuFooter)
debug(`Write footer (checksum=${footer.checksum}). (data=${rawFooter.toString('hex')})`) debug(`Write footer (checksum=${footer.checksum}). (data=${rawFooter.toString('hex')})`)
await this._writeChunk('footer', rawFooter) return await this._writeChunk('footer', rawFooter)
} }
async writeHeader() { async writeHeader() {
@@ -236,8 +241,9 @@ exports.VhdDirectory = class VhdDirectory extends VhdAbstract {
const rawHeader = fuHeader.pack(header) const rawHeader = fuHeader.pack(header)
header.checksum = checksumStruct(rawHeader, fuHeader) header.checksum = checksumStruct(rawHeader, fuHeader)
debug(`Write header (checksum=${header.checksum}). (data=${rawHeader.toString('hex')})`) debug(`Write header (checksum=${header.checksum}). (data=${rawHeader.toString('hex')})`)
await this._writeChunk('header', rawHeader) const sizes = await this._writeChunk('header', rawHeader)
await this.#writeChunkFilters() await this.#writeChunkFilters()
return sizes
} }
writeBlockAllocationTable() { writeBlockAllocationTable() {
@@ -285,8 +291,9 @@ exports.VhdDirectory = class VhdDirectory extends VhdAbstract {
} }
async writeEntireBlock(block) { async writeEntireBlock(block) {
await this._writeChunk(this.#getBlockPath(block.id), block.buffer) const sizes = await this._writeChunk(this.#getBlockPath(block.id), block.buffer)
setBitmap(this.#blockTable, block.id) setBitmap(this.#blockTable, block.id)
return sizes
} }
async _readParentLocatorData(id) { async _readParentLocatorData(id) {
@@ -294,8 +301,9 @@ exports.VhdDirectory = class VhdDirectory extends VhdAbstract {
} }
async _writeParentLocatorData(id, data) { async _writeParentLocatorData(id, data) {
await this._writeChunk('parentLocatorEntry' + id, data) const sizes = await this._writeChunk('parentLocatorEntry' + id, data)
this.header.parentLocatorEntry[id].platformDataOffset = 0 this.header.parentLocatorEntry[id].platformDataOffset = 0
return sizes
} }
async #writeChunkFilters() { async #writeChunkFilters() {

View File

@@ -463,7 +463,7 @@ exports.VhdFile = class VhdFile extends VhdAbstract {
} }
} }
async getSize() { async streamSize() {
return await this._handler.getSize(this._path) return await this._handler.getSize(this._path)
} }
} }

View File

@@ -10,6 +10,16 @@ const { warn } = createLogger('vhd-lib:createVhdDirectoryFromStream')
const buildVhd = Disposable.wrap(async function* (handler, path, inputStream, { concurrency, compression }) { const buildVhd = Disposable.wrap(async function* (handler, path, inputStream, { concurrency, compression }) {
const vhd = yield VhdDirectory.create(handler, path, { compression }) const vhd = yield VhdDirectory.create(handler, path, { compression })
const sizes = {
compressedSize: 0,
sourceSize: 0,
writtenSize: 0,
}
const updateSums = ({ writtenSize, compressedSize, sourceSize }) => {
sizes.writtenSize += writtenSize ?? 0
sizes.compressedSize += compressedSize ?? 0
sizes.sourceSize += sourceSize ?? 0
}
await asyncEach( await asyncEach(
parseVhdStream(inputStream), parseVhdStream(inputStream),
async function (item) { async function (item) {
@@ -21,10 +31,10 @@ const buildVhd = Disposable.wrap(async function* (handler, path, inputStream, {
vhd.header = item.header vhd.header = item.header
break break
case 'parentLocator': case 'parentLocator':
await vhd.writeParentLocator({ ...item, data: item.buffer }) updateSums(await vhd.writeParentLocator({ ...item, data: item.buffer }))
break break
case 'block': case 'block':
await vhd.writeEntireBlock(item) updateSums(await vhd.writeEntireBlock(item))
break break
case 'bat': case 'bat':
// it exists but I don't care // it exists but I don't care
@@ -36,9 +46,18 @@ const buildVhd = Disposable.wrap(async function* (handler, path, inputStream, {
{ {
concurrency, concurrency,
} }
)(await Promise.all([vhd.writeFooter(), vhd.writeHeader(), vhd.writeBlockAllocationTable()])).forEach(
([footer, header, bat]) => {
updateSums(footer)
updateSums(header)
updateSums(bat)
}
) )
await Promise.all([vhd.writeFooter(), vhd.writeHeader(), vhd.writeBlockAllocationTable()]) const vhdSize = vhd.streamSize()
return vhd.streamSize() return {
...sizes,
vhdSize,
}
}) })
exports.createVhdDirectoryFromStream = async function createVhdDirectoryFromStream( exports.createVhdDirectoryFromStream = async function createVhdDirectoryFromStream(
@@ -48,11 +67,11 @@ exports.createVhdDirectoryFromStream = async function createVhdDirectoryFromStre
{ validator, concurrency = 16, compression } = {} { validator, concurrency = 16, compression } = {}
) { ) {
try { try {
const size = await buildVhd(handler, path, inputStream, { concurrency, compression }) const sizes = await buildVhd(handler, path, inputStream, { concurrency, compression })
if (validator !== undefined) { if (validator !== undefined) {
await validator.call(this, path) await validator.call(this, path)
} }
return size return sizes
} catch (error) { } catch (error) {
// cleanup on error // cleanup on error
await handler.rmtree(path).catch(warn) await handler.rmtree(path).catch(warn)

View File

@@ -142,9 +142,11 @@ const COLUMNS = [
return return
} }
if (operationTask.message === 'transfer' && vmTransferSize === undefined) { if (operationTask.message === 'transfer' && vmTransferSize === undefined) {
// @todo handle if size is an object
vmTransferSize = operationTask.result?.size vmTransferSize = operationTask.result?.size
} }
if (operationTask.message === 'merge' && vmMergeSize === undefined) { if (operationTask.message === 'merge' && vmMergeSize === undefined) {
// @todo handle if size is an object
vmMergeSize = operationTask.result?.size vmMergeSize = operationTask.result?.size
} }

View File

@@ -330,6 +330,7 @@ const SrTask = ({ children, className, task }) => (
) )
const TransferMergeTask = ({ className, task }) => { const TransferMergeTask = ({ className, task }) => {
// @todo : handle case when size is an object
const size = defined(() => task.result.size, 0) const size = defined(() => task.result.size, 0)
if (task.status === 'success' && size === 0 && task.warnings?.length === 0) { if (task.status === 'success' && size === 0 && task.warnings?.length === 0) {
return null return null