Compare commits
2 Commits
lite/defin
...
feat_sizes
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d4ca0da8b9 | ||
|
|
486d50f2f1 |
@@ -669,7 +669,7 @@ class RemoteAdapter {
|
||||
const handler = this._handler
|
||||
if (this.useVhdDirectory()) {
|
||||
const dataPath = `${dirname(path)}/data/${uuidv4()}.vhd`
|
||||
const size = await createVhdDirectoryFromStream(handler, dataPath, input, {
|
||||
const sizes = await createVhdDirectoryFromStream(handler, dataPath, input, {
|
||||
concurrency: writeBlockConcurrency,
|
||||
compression: this.#getCompressionType(),
|
||||
async validator() {
|
||||
@@ -678,9 +678,14 @@ class RemoteAdapter {
|
||||
},
|
||||
})
|
||||
await VhdAbstract.createAlias(handler, path, dataPath)
|
||||
return size
|
||||
return sizes
|
||||
} else {
|
||||
return this.outputStream(path, input, { checksum, validator })
|
||||
const size = this.outputStream(path, input, { checksum, validator })
|
||||
return {
|
||||
compressedSize: size,
|
||||
sourceSize: size,
|
||||
writtenSize: size,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
const sum = require('lodash/sum')
|
||||
const UUID = require('uuid')
|
||||
const { asyncMap } = require('@xen-orchestra/async-map')
|
||||
const { Constants, openVhd, VhdAbstract, VhdFile } = require('vhd-lib')
|
||||
const { Constants, openVhd, VhdAbstract } = require('vhd-lib')
|
||||
const { isVhdAlias, resolveVhdAlias } = require('vhd-lib/aliases')
|
||||
const { dirname, resolve } = require('path')
|
||||
const { DISK_TYPES } = Constants
|
||||
@@ -15,24 +15,14 @@ const { Task } = require('./Task.js')
|
||||
const { Disposable } = require('promise-toolbox')
|
||||
const handlerPath = require('@xen-orchestra/fs/path')
|
||||
|
||||
// checking the size of a vhd directory is costly
|
||||
// 1 Http Query per 1000 blocks
|
||||
// we only check size of all the vhd are VhdFiles
|
||||
function shouldComputeVhdsSize(handler, vhds) {
|
||||
if (handler.isEncrypted) {
|
||||
return false
|
||||
}
|
||||
return vhds.every(vhd => vhd instanceof VhdFile)
|
||||
}
|
||||
|
||||
const computeVhdsSize = (handler, vhdPaths) =>
|
||||
Disposable.use(
|
||||
vhdPaths.map(vhdPath => openVhd(handler, vhdPath)),
|
||||
async vhds => {
|
||||
if (shouldComputeVhdsSize(handler, vhds)) {
|
||||
const sizes = await asyncMap(vhds, vhd => vhd.getSize())
|
||||
return sum(sizes)
|
||||
}
|
||||
await Promise.all(vhds.map(vhd => vhd.readBlockAllocationTable()))
|
||||
// get file size for vhdfile, computed size from bat for vhd directory
|
||||
const sizes = await asyncMap(vhds, vhd => vhd.streamSize())
|
||||
return sum(sizes)
|
||||
}
|
||||
)
|
||||
|
||||
@@ -534,11 +524,6 @@ exports.cleanVm = async function cleanVm(
|
||||
const linkedVhds = Object.keys(vhds).map(key => resolve('/', vmDir, vhds[key]))
|
||||
fileSystemSize = await computeVhdsSize(handler, linkedVhds)
|
||||
|
||||
// the size is not computed in some cases (e.g. VhdDirectory)
|
||||
if (fileSystemSize === undefined) {
|
||||
return
|
||||
}
|
||||
|
||||
// don't warn if the size has changed after a merge
|
||||
if (!merged && fileSystemSize !== size) {
|
||||
// FIXME: figure out why it occurs so often and, once fixed, log the real problems with `logWarn`
|
||||
@@ -556,6 +541,8 @@ exports.cleanVm = async function cleanVm(
|
||||
|
||||
// systematically update size after a merge
|
||||
if ((merged || fixMetadata) && size !== fileSystemSize) {
|
||||
// @todo add a cumulatedTransferSize property ?
|
||||
// @todo update writtenSize, compressedSize
|
||||
metadata.size = fileSystemSize
|
||||
mustRegenerateCache = true
|
||||
try {
|
||||
|
||||
@@ -205,7 +205,7 @@ class IncrementalRemoteWriter extends MixinRemoteWriter(AbstractIncrementalWrite
|
||||
// TODO remove when this has been done before the export
|
||||
await checkVhd(handler, parentPath)
|
||||
}
|
||||
|
||||
// @todo : sum per property
|
||||
transferSize += await adapter.writeVhd(path, deltaExport.streams[`${id}.vhd`], {
|
||||
// no checksum for VHDs, because they will be invalidated by
|
||||
// merges and chainings
|
||||
@@ -232,7 +232,7 @@ class IncrementalRemoteWriter extends MixinRemoteWriter(AbstractIncrementalWrite
|
||||
|
||||
return { size: transferSize }
|
||||
})
|
||||
metadataContent.size = size
|
||||
metadataContent.size = size // @todo: transferSize
|
||||
this._metadataFileName = await adapter.writeVmBackupMetadata(vm.uuid, metadataContent)
|
||||
|
||||
// TODO: run cleanup?
|
||||
|
||||
@@ -168,7 +168,12 @@ exports.VhdDirectory = class VhdDirectory extends VhdAbstract {
|
||||
// in case of VhdDirectory, we want to create the file if it does not exists
|
||||
const flags = this._opts?.flags === 'r+' ? 'w' : this._opts?.flags
|
||||
const compressed = await this.#compressor.compress(buffer)
|
||||
return this._handler.outputFile(this.#getChunkPath(partName), compressed, { flags })
|
||||
const writtenSize = await this._handler.outputFile(this.#getChunkPath(partName), compressed, { flags })
|
||||
return {
|
||||
size: buffer.length,
|
||||
compressedSize: compressed.length,
|
||||
writtenSize,
|
||||
}
|
||||
}
|
||||
|
||||
// put block in subdirectories to limit impact when doing directory listing
|
||||
@@ -228,7 +233,7 @@ exports.VhdDirectory = class VhdDirectory extends VhdAbstract {
|
||||
footer.checksum = checksumStruct(rawFooter, fuFooter)
|
||||
debug(`Write footer (checksum=${footer.checksum}). (data=${rawFooter.toString('hex')})`)
|
||||
|
||||
await this._writeChunk('footer', rawFooter)
|
||||
return await this._writeChunk('footer', rawFooter)
|
||||
}
|
||||
|
||||
async writeHeader() {
|
||||
@@ -236,8 +241,9 @@ exports.VhdDirectory = class VhdDirectory extends VhdAbstract {
|
||||
const rawHeader = fuHeader.pack(header)
|
||||
header.checksum = checksumStruct(rawHeader, fuHeader)
|
||||
debug(`Write header (checksum=${header.checksum}). (data=${rawHeader.toString('hex')})`)
|
||||
await this._writeChunk('header', rawHeader)
|
||||
const sizes = await this._writeChunk('header', rawHeader)
|
||||
await this.#writeChunkFilters()
|
||||
return sizes
|
||||
}
|
||||
|
||||
writeBlockAllocationTable() {
|
||||
@@ -285,8 +291,9 @@ exports.VhdDirectory = class VhdDirectory extends VhdAbstract {
|
||||
}
|
||||
|
||||
async writeEntireBlock(block) {
|
||||
await this._writeChunk(this.#getBlockPath(block.id), block.buffer)
|
||||
const sizes = await this._writeChunk(this.#getBlockPath(block.id), block.buffer)
|
||||
setBitmap(this.#blockTable, block.id)
|
||||
return sizes
|
||||
}
|
||||
|
||||
async _readParentLocatorData(id) {
|
||||
@@ -294,8 +301,9 @@ exports.VhdDirectory = class VhdDirectory extends VhdAbstract {
|
||||
}
|
||||
|
||||
async _writeParentLocatorData(id, data) {
|
||||
await this._writeChunk('parentLocatorEntry' + id, data)
|
||||
const sizes = await this._writeChunk('parentLocatorEntry' + id, data)
|
||||
this.header.parentLocatorEntry[id].platformDataOffset = 0
|
||||
return sizes
|
||||
}
|
||||
|
||||
async #writeChunkFilters() {
|
||||
|
||||
@@ -463,7 +463,7 @@ exports.VhdFile = class VhdFile extends VhdAbstract {
|
||||
}
|
||||
}
|
||||
|
||||
async getSize() {
|
||||
async streamSize() {
|
||||
return await this._handler.getSize(this._path)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,6 +10,16 @@ const { warn } = createLogger('vhd-lib:createVhdDirectoryFromStream')
|
||||
|
||||
const buildVhd = Disposable.wrap(async function* (handler, path, inputStream, { concurrency, compression }) {
|
||||
const vhd = yield VhdDirectory.create(handler, path, { compression })
|
||||
const sizes = {
|
||||
compressedSize: 0,
|
||||
sourceSize: 0,
|
||||
writtenSize: 0,
|
||||
}
|
||||
const updateSums = ({ writtenSize, compressedSize, sourceSize }) => {
|
||||
sizes.writtenSize += writtenSize ?? 0
|
||||
sizes.compressedSize += compressedSize ?? 0
|
||||
sizes.sourceSize += sourceSize ?? 0
|
||||
}
|
||||
await asyncEach(
|
||||
parseVhdStream(inputStream),
|
||||
async function (item) {
|
||||
@@ -21,10 +31,10 @@ const buildVhd = Disposable.wrap(async function* (handler, path, inputStream, {
|
||||
vhd.header = item.header
|
||||
break
|
||||
case 'parentLocator':
|
||||
await vhd.writeParentLocator({ ...item, data: item.buffer })
|
||||
updateSums(await vhd.writeParentLocator({ ...item, data: item.buffer }))
|
||||
break
|
||||
case 'block':
|
||||
await vhd.writeEntireBlock(item)
|
||||
updateSums(await vhd.writeEntireBlock(item))
|
||||
break
|
||||
case 'bat':
|
||||
// it exists but I don't care
|
||||
@@ -36,9 +46,18 @@ const buildVhd = Disposable.wrap(async function* (handler, path, inputStream, {
|
||||
{
|
||||
concurrency,
|
||||
}
|
||||
)(await Promise.all([vhd.writeFooter(), vhd.writeHeader(), vhd.writeBlockAllocationTable()])).forEach(
|
||||
([footer, header, bat]) => {
|
||||
updateSums(footer)
|
||||
updateSums(header)
|
||||
updateSums(bat)
|
||||
}
|
||||
)
|
||||
await Promise.all([vhd.writeFooter(), vhd.writeHeader(), vhd.writeBlockAllocationTable()])
|
||||
return vhd.streamSize()
|
||||
const vhdSize = vhd.streamSize()
|
||||
return {
|
||||
...sizes,
|
||||
vhdSize,
|
||||
}
|
||||
})
|
||||
|
||||
exports.createVhdDirectoryFromStream = async function createVhdDirectoryFromStream(
|
||||
@@ -48,11 +67,11 @@ exports.createVhdDirectoryFromStream = async function createVhdDirectoryFromStre
|
||||
{ validator, concurrency = 16, compression } = {}
|
||||
) {
|
||||
try {
|
||||
const size = await buildVhd(handler, path, inputStream, { concurrency, compression })
|
||||
const sizes = await buildVhd(handler, path, inputStream, { concurrency, compression })
|
||||
if (validator !== undefined) {
|
||||
await validator.call(this, path)
|
||||
}
|
||||
return size
|
||||
return sizes
|
||||
} catch (error) {
|
||||
// cleanup on error
|
||||
await handler.rmtree(path).catch(warn)
|
||||
|
||||
@@ -142,9 +142,11 @@ const COLUMNS = [
|
||||
return
|
||||
}
|
||||
if (operationTask.message === 'transfer' && vmTransferSize === undefined) {
|
||||
// @todo handle if size is an object
|
||||
vmTransferSize = operationTask.result?.size
|
||||
}
|
||||
if (operationTask.message === 'merge' && vmMergeSize === undefined) {
|
||||
// @todo handle if size is an object
|
||||
vmMergeSize = operationTask.result?.size
|
||||
}
|
||||
|
||||
|
||||
@@ -330,6 +330,7 @@ const SrTask = ({ children, className, task }) => (
|
||||
)
|
||||
|
||||
const TransferMergeTask = ({ className, task }) => {
|
||||
// @todo : handle case when size is an object
|
||||
const size = defined(() => task.result.size, 0)
|
||||
if (task.status === 'success' && size === 0 && task.warnings?.length === 0) {
|
||||
return null
|
||||
|
||||
Reference in New Issue
Block a user