Compare commits
2 Commits
improveFor
...
feat_sizes
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d4ca0da8b9 | ||
|
|
486d50f2f1 |
@@ -669,7 +669,7 @@ class RemoteAdapter {
|
|||||||
const handler = this._handler
|
const handler = this._handler
|
||||||
if (this.useVhdDirectory()) {
|
if (this.useVhdDirectory()) {
|
||||||
const dataPath = `${dirname(path)}/data/${uuidv4()}.vhd`
|
const dataPath = `${dirname(path)}/data/${uuidv4()}.vhd`
|
||||||
const size = await createVhdDirectoryFromStream(handler, dataPath, input, {
|
const sizes = await createVhdDirectoryFromStream(handler, dataPath, input, {
|
||||||
concurrency: writeBlockConcurrency,
|
concurrency: writeBlockConcurrency,
|
||||||
compression: this.#getCompressionType(),
|
compression: this.#getCompressionType(),
|
||||||
async validator() {
|
async validator() {
|
||||||
@@ -678,9 +678,14 @@ class RemoteAdapter {
|
|||||||
},
|
},
|
||||||
})
|
})
|
||||||
await VhdAbstract.createAlias(handler, path, dataPath)
|
await VhdAbstract.createAlias(handler, path, dataPath)
|
||||||
return size
|
return sizes
|
||||||
} else {
|
} else {
|
||||||
return this.outputStream(path, input, { checksum, validator })
|
const size = this.outputStream(path, input, { checksum, validator })
|
||||||
|
return {
|
||||||
|
compressedSize: size,
|
||||||
|
sourceSize: size,
|
||||||
|
writtenSize: size,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -3,7 +3,7 @@
|
|||||||
const sum = require('lodash/sum')
|
const sum = require('lodash/sum')
|
||||||
const UUID = require('uuid')
|
const UUID = require('uuid')
|
||||||
const { asyncMap } = require('@xen-orchestra/async-map')
|
const { asyncMap } = require('@xen-orchestra/async-map')
|
||||||
const { Constants, openVhd, VhdAbstract, VhdFile } = require('vhd-lib')
|
const { Constants, openVhd, VhdAbstract } = require('vhd-lib')
|
||||||
const { isVhdAlias, resolveVhdAlias } = require('vhd-lib/aliases')
|
const { isVhdAlias, resolveVhdAlias } = require('vhd-lib/aliases')
|
||||||
const { dirname, resolve } = require('path')
|
const { dirname, resolve } = require('path')
|
||||||
const { DISK_TYPES } = Constants
|
const { DISK_TYPES } = Constants
|
||||||
@@ -15,24 +15,14 @@ const { Task } = require('./Task.js')
|
|||||||
const { Disposable } = require('promise-toolbox')
|
const { Disposable } = require('promise-toolbox')
|
||||||
const handlerPath = require('@xen-orchestra/fs/path')
|
const handlerPath = require('@xen-orchestra/fs/path')
|
||||||
|
|
||||||
// checking the size of a vhd directory is costly
|
|
||||||
// 1 Http Query per 1000 blocks
|
|
||||||
// we only check size of all the vhd are VhdFiles
|
|
||||||
function shouldComputeVhdsSize(handler, vhds) {
|
|
||||||
if (handler.isEncrypted) {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
return vhds.every(vhd => vhd instanceof VhdFile)
|
|
||||||
}
|
|
||||||
|
|
||||||
const computeVhdsSize = (handler, vhdPaths) =>
|
const computeVhdsSize = (handler, vhdPaths) =>
|
||||||
Disposable.use(
|
Disposable.use(
|
||||||
vhdPaths.map(vhdPath => openVhd(handler, vhdPath)),
|
vhdPaths.map(vhdPath => openVhd(handler, vhdPath)),
|
||||||
async vhds => {
|
async vhds => {
|
||||||
if (shouldComputeVhdsSize(handler, vhds)) {
|
await Promise.all(vhds.map(vhd => vhd.readBlockAllocationTable()))
|
||||||
const sizes = await asyncMap(vhds, vhd => vhd.getSize())
|
// get file size for vhdfile, computed size from bat for vhd directory
|
||||||
return sum(sizes)
|
const sizes = await asyncMap(vhds, vhd => vhd.streamSize())
|
||||||
}
|
return sum(sizes)
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -534,11 +524,6 @@ exports.cleanVm = async function cleanVm(
|
|||||||
const linkedVhds = Object.keys(vhds).map(key => resolve('/', vmDir, vhds[key]))
|
const linkedVhds = Object.keys(vhds).map(key => resolve('/', vmDir, vhds[key]))
|
||||||
fileSystemSize = await computeVhdsSize(handler, linkedVhds)
|
fileSystemSize = await computeVhdsSize(handler, linkedVhds)
|
||||||
|
|
||||||
// the size is not computed in some cases (e.g. VhdDirectory)
|
|
||||||
if (fileSystemSize === undefined) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// don't warn if the size has changed after a merge
|
// don't warn if the size has changed after a merge
|
||||||
if (!merged && fileSystemSize !== size) {
|
if (!merged && fileSystemSize !== size) {
|
||||||
// FIXME: figure out why it occurs so often and, once fixed, log the real problems with `logWarn`
|
// FIXME: figure out why it occurs so often and, once fixed, log the real problems with `logWarn`
|
||||||
@@ -556,6 +541,8 @@ exports.cleanVm = async function cleanVm(
|
|||||||
|
|
||||||
// systematically update size after a merge
|
// systematically update size after a merge
|
||||||
if ((merged || fixMetadata) && size !== fileSystemSize) {
|
if ((merged || fixMetadata) && size !== fileSystemSize) {
|
||||||
|
// @todo add a cumulatedTransferSize property ?
|
||||||
|
// @todo update writtenSize, compressedSize
|
||||||
metadata.size = fileSystemSize
|
metadata.size = fileSystemSize
|
||||||
mustRegenerateCache = true
|
mustRegenerateCache = true
|
||||||
try {
|
try {
|
||||||
|
|||||||
@@ -205,7 +205,7 @@ class IncrementalRemoteWriter extends MixinRemoteWriter(AbstractIncrementalWrite
|
|||||||
// TODO remove when this has been done before the export
|
// TODO remove when this has been done before the export
|
||||||
await checkVhd(handler, parentPath)
|
await checkVhd(handler, parentPath)
|
||||||
}
|
}
|
||||||
|
// @todo : sum per property
|
||||||
transferSize += await adapter.writeVhd(path, deltaExport.streams[`${id}.vhd`], {
|
transferSize += await adapter.writeVhd(path, deltaExport.streams[`${id}.vhd`], {
|
||||||
// no checksum for VHDs, because they will be invalidated by
|
// no checksum for VHDs, because they will be invalidated by
|
||||||
// merges and chainings
|
// merges and chainings
|
||||||
@@ -232,7 +232,7 @@ class IncrementalRemoteWriter extends MixinRemoteWriter(AbstractIncrementalWrite
|
|||||||
|
|
||||||
return { size: transferSize }
|
return { size: transferSize }
|
||||||
})
|
})
|
||||||
metadataContent.size = size
|
metadataContent.size = size // @todo: transferSize
|
||||||
this._metadataFileName = await adapter.writeVmBackupMetadata(vm.uuid, metadataContent)
|
this._metadataFileName = await adapter.writeVmBackupMetadata(vm.uuid, metadataContent)
|
||||||
|
|
||||||
// TODO: run cleanup?
|
// TODO: run cleanup?
|
||||||
|
|||||||
@@ -168,7 +168,12 @@ exports.VhdDirectory = class VhdDirectory extends VhdAbstract {
|
|||||||
// in case of VhdDirectory, we want to create the file if it does not exists
|
// in case of VhdDirectory, we want to create the file if it does not exists
|
||||||
const flags = this._opts?.flags === 'r+' ? 'w' : this._opts?.flags
|
const flags = this._opts?.flags === 'r+' ? 'w' : this._opts?.flags
|
||||||
const compressed = await this.#compressor.compress(buffer)
|
const compressed = await this.#compressor.compress(buffer)
|
||||||
return this._handler.outputFile(this.#getChunkPath(partName), compressed, { flags })
|
const writtenSize = await this._handler.outputFile(this.#getChunkPath(partName), compressed, { flags })
|
||||||
|
return {
|
||||||
|
size: buffer.length,
|
||||||
|
compressedSize: compressed.length,
|
||||||
|
writtenSize,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// put block in subdirectories to limit impact when doing directory listing
|
// put block in subdirectories to limit impact when doing directory listing
|
||||||
@@ -228,7 +233,7 @@ exports.VhdDirectory = class VhdDirectory extends VhdAbstract {
|
|||||||
footer.checksum = checksumStruct(rawFooter, fuFooter)
|
footer.checksum = checksumStruct(rawFooter, fuFooter)
|
||||||
debug(`Write footer (checksum=${footer.checksum}). (data=${rawFooter.toString('hex')})`)
|
debug(`Write footer (checksum=${footer.checksum}). (data=${rawFooter.toString('hex')})`)
|
||||||
|
|
||||||
await this._writeChunk('footer', rawFooter)
|
return await this._writeChunk('footer', rawFooter)
|
||||||
}
|
}
|
||||||
|
|
||||||
async writeHeader() {
|
async writeHeader() {
|
||||||
@@ -236,8 +241,9 @@ exports.VhdDirectory = class VhdDirectory extends VhdAbstract {
|
|||||||
const rawHeader = fuHeader.pack(header)
|
const rawHeader = fuHeader.pack(header)
|
||||||
header.checksum = checksumStruct(rawHeader, fuHeader)
|
header.checksum = checksumStruct(rawHeader, fuHeader)
|
||||||
debug(`Write header (checksum=${header.checksum}). (data=${rawHeader.toString('hex')})`)
|
debug(`Write header (checksum=${header.checksum}). (data=${rawHeader.toString('hex')})`)
|
||||||
await this._writeChunk('header', rawHeader)
|
const sizes = await this._writeChunk('header', rawHeader)
|
||||||
await this.#writeChunkFilters()
|
await this.#writeChunkFilters()
|
||||||
|
return sizes
|
||||||
}
|
}
|
||||||
|
|
||||||
writeBlockAllocationTable() {
|
writeBlockAllocationTable() {
|
||||||
@@ -285,8 +291,9 @@ exports.VhdDirectory = class VhdDirectory extends VhdAbstract {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async writeEntireBlock(block) {
|
async writeEntireBlock(block) {
|
||||||
await this._writeChunk(this.#getBlockPath(block.id), block.buffer)
|
const sizes = await this._writeChunk(this.#getBlockPath(block.id), block.buffer)
|
||||||
setBitmap(this.#blockTable, block.id)
|
setBitmap(this.#blockTable, block.id)
|
||||||
|
return sizes
|
||||||
}
|
}
|
||||||
|
|
||||||
async _readParentLocatorData(id) {
|
async _readParentLocatorData(id) {
|
||||||
@@ -294,8 +301,9 @@ exports.VhdDirectory = class VhdDirectory extends VhdAbstract {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async _writeParentLocatorData(id, data) {
|
async _writeParentLocatorData(id, data) {
|
||||||
await this._writeChunk('parentLocatorEntry' + id, data)
|
const sizes = await this._writeChunk('parentLocatorEntry' + id, data)
|
||||||
this.header.parentLocatorEntry[id].platformDataOffset = 0
|
this.header.parentLocatorEntry[id].platformDataOffset = 0
|
||||||
|
return sizes
|
||||||
}
|
}
|
||||||
|
|
||||||
async #writeChunkFilters() {
|
async #writeChunkFilters() {
|
||||||
|
|||||||
@@ -463,7 +463,7 @@ exports.VhdFile = class VhdFile extends VhdAbstract {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async getSize() {
|
async streamSize() {
|
||||||
return await this._handler.getSize(this._path)
|
return await this._handler.getSize(this._path)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -10,6 +10,16 @@ const { warn } = createLogger('vhd-lib:createVhdDirectoryFromStream')
|
|||||||
|
|
||||||
const buildVhd = Disposable.wrap(async function* (handler, path, inputStream, { concurrency, compression }) {
|
const buildVhd = Disposable.wrap(async function* (handler, path, inputStream, { concurrency, compression }) {
|
||||||
const vhd = yield VhdDirectory.create(handler, path, { compression })
|
const vhd = yield VhdDirectory.create(handler, path, { compression })
|
||||||
|
const sizes = {
|
||||||
|
compressedSize: 0,
|
||||||
|
sourceSize: 0,
|
||||||
|
writtenSize: 0,
|
||||||
|
}
|
||||||
|
const updateSums = ({ writtenSize, compressedSize, sourceSize }) => {
|
||||||
|
sizes.writtenSize += writtenSize ?? 0
|
||||||
|
sizes.compressedSize += compressedSize ?? 0
|
||||||
|
sizes.sourceSize += sourceSize ?? 0
|
||||||
|
}
|
||||||
await asyncEach(
|
await asyncEach(
|
||||||
parseVhdStream(inputStream),
|
parseVhdStream(inputStream),
|
||||||
async function (item) {
|
async function (item) {
|
||||||
@@ -21,10 +31,10 @@ const buildVhd = Disposable.wrap(async function* (handler, path, inputStream, {
|
|||||||
vhd.header = item.header
|
vhd.header = item.header
|
||||||
break
|
break
|
||||||
case 'parentLocator':
|
case 'parentLocator':
|
||||||
await vhd.writeParentLocator({ ...item, data: item.buffer })
|
updateSums(await vhd.writeParentLocator({ ...item, data: item.buffer }))
|
||||||
break
|
break
|
||||||
case 'block':
|
case 'block':
|
||||||
await vhd.writeEntireBlock(item)
|
updateSums(await vhd.writeEntireBlock(item))
|
||||||
break
|
break
|
||||||
case 'bat':
|
case 'bat':
|
||||||
// it exists but I don't care
|
// it exists but I don't care
|
||||||
@@ -36,9 +46,18 @@ const buildVhd = Disposable.wrap(async function* (handler, path, inputStream, {
|
|||||||
{
|
{
|
||||||
concurrency,
|
concurrency,
|
||||||
}
|
}
|
||||||
|
)(await Promise.all([vhd.writeFooter(), vhd.writeHeader(), vhd.writeBlockAllocationTable()])).forEach(
|
||||||
|
([footer, header, bat]) => {
|
||||||
|
updateSums(footer)
|
||||||
|
updateSums(header)
|
||||||
|
updateSums(bat)
|
||||||
|
}
|
||||||
)
|
)
|
||||||
await Promise.all([vhd.writeFooter(), vhd.writeHeader(), vhd.writeBlockAllocationTable()])
|
const vhdSize = vhd.streamSize()
|
||||||
return vhd.streamSize()
|
return {
|
||||||
|
...sizes,
|
||||||
|
vhdSize,
|
||||||
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
exports.createVhdDirectoryFromStream = async function createVhdDirectoryFromStream(
|
exports.createVhdDirectoryFromStream = async function createVhdDirectoryFromStream(
|
||||||
@@ -48,11 +67,11 @@ exports.createVhdDirectoryFromStream = async function createVhdDirectoryFromStre
|
|||||||
{ validator, concurrency = 16, compression } = {}
|
{ validator, concurrency = 16, compression } = {}
|
||||||
) {
|
) {
|
||||||
try {
|
try {
|
||||||
const size = await buildVhd(handler, path, inputStream, { concurrency, compression })
|
const sizes = await buildVhd(handler, path, inputStream, { concurrency, compression })
|
||||||
if (validator !== undefined) {
|
if (validator !== undefined) {
|
||||||
await validator.call(this, path)
|
await validator.call(this, path)
|
||||||
}
|
}
|
||||||
return size
|
return sizes
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
// cleanup on error
|
// cleanup on error
|
||||||
await handler.rmtree(path).catch(warn)
|
await handler.rmtree(path).catch(warn)
|
||||||
|
|||||||
@@ -142,9 +142,11 @@ const COLUMNS = [
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
if (operationTask.message === 'transfer' && vmTransferSize === undefined) {
|
if (operationTask.message === 'transfer' && vmTransferSize === undefined) {
|
||||||
|
// @todo handle if size is an object
|
||||||
vmTransferSize = operationTask.result?.size
|
vmTransferSize = operationTask.result?.size
|
||||||
}
|
}
|
||||||
if (operationTask.message === 'merge' && vmMergeSize === undefined) {
|
if (operationTask.message === 'merge' && vmMergeSize === undefined) {
|
||||||
|
// @todo handle if size is an object
|
||||||
vmMergeSize = operationTask.result?.size
|
vmMergeSize = operationTask.result?.size
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -330,6 +330,7 @@ const SrTask = ({ children, className, task }) => (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const TransferMergeTask = ({ className, task }) => {
|
const TransferMergeTask = ({ className, task }) => {
|
||||||
|
// @todo : handle case when size is an object
|
||||||
const size = defined(() => task.result.size, 0)
|
const size = defined(() => task.result.size, 0)
|
||||||
if (task.status === 'success' && size === 0 && task.warnings?.length === 0) {
|
if (task.status === 'success' && size === 0 && task.warnings?.length === 0) {
|
||||||
return null
|
return null
|
||||||
|
|||||||
Reference in New Issue
Block a user