feat(fs/s3): compute sensible chunk size for uploads
commit 37b2113763
parent 5048485a85
@@ -681,11 +681,13 @@ export class RemoteAdapter {
     }
   }
 
-  async outputStream(path, input, { checksum = true, validator = noop } = {}) {
+  async outputStream(path, input, { checksum = true, maxStreamLength, streamLength, validator = noop } = {}) {
     const container = watchStreamSize(input)
     await this._handler.outputStream(path, input, {
       checksum,
       dirMode: this._dirMode,
+      maxStreamLength,
+      streamLength,
       async validator() {
         await input.task
         return validator.apply(this, arguments)
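`RemoteAdapter#outputStream` only forwards the two new size hints to the underlying handler: `streamLength` when the exact size is known, `maxStreamLength` when only an upper bound is available. A hypothetical call site (the path and variable names are illustrative, not part of the commit):

// illustrative only – exercises the new options of RemoteAdapter#outputStream
await adapter.outputStream('xo-vm-backups/<vm-uuid>/data.xva', xvaStream, {
  streamLength,    // exact size, when the source advertises one
  maxStreamLength, // upper bound, when only an estimate exists (e.g. an XVA export)
  validator: tmpPath => adapter.isValidXva(tmpPath),
})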
@@ -29,6 +29,8 @@ export const FullRemote = class FullRemoteVmBackupRunner extends AbstractRemote
       writer =>
         writer.run({
           stream: forkStreamUnpipe(stream),
+          // stream will be forked and transformed, it's not safe to attach additionnal properties to it
+          streamLength: stream.length,
           timestamp: metadata.timestamp,
           vm: metadata.vm,
           vmSnapshot: metadata.vmSnapshot,
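The added comment is the reason `streamLength` is passed as an explicit parameter: once the stream is forked and transformed, ad-hoc properties set on the source do not travel with it. A minimal Node.js sketch of that behaviour (this is not the `forkStreamUnpipe` implementation, only the underlying stream semantics):

import { PassThrough, Readable } from 'node:stream'

// the source stream carries its size as an ad-hoc property
const source = Readable.from(['some', 'data'])
source.length = 1234

// forking through a PassThrough, conceptually what a stream fork does
const fork = source.pipe(new PassThrough())

console.log(source.length) // 1234
console.log(fork.length)   // undefined – the property is not inherited,
                           // hence streamLength is handed to writer.run() explicitly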
@@ -35,13 +35,23 @@ export const FullXapi = class FullXapiVmBackupRunner extends AbstractXapi {
         useSnapshot: false,
       })
     )
 
+    const vdis = await exportedVm.$getDisks()
+    let maxStreamLength = 1024 * 1024 // Ovf file and tar headers are a few KB, let's stay safe
+    for (const vdiRef of vdis) {
+      const vdi = await this._xapi.getRecord(vdiRef)
+      // at most the xva will take the physical usage of the disk
+      // the resulting stream can be smaller due to the smaller block size for xva than vhd, and compression of xcp-ng
+      maxStreamLength += vdi.physical_utilisation
+    }
+
     const sizeContainer = watchStreamSize(stream)
 
     const timestamp = Date.now()
 
     await this._callWriters(
       writer =>
         writer.run({
+          maxStreamLength,
           sizeContainer,
           stream: forkStreamUnpipe(stream),
           timestamp,
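Since the size of an XVA export cannot be known up front, the runner computes an upper bound instead: 1 MiB of headroom for the OVF file and tar headers, plus each disk's physical utilisation (the XVA cannot exceed what is physically allocated, and is usually smaller thanks to its block size and XCP-ng compression). For a hypothetical VM with a 10 GiB and a 2 GiB disk, the bound works out as follows (figures are illustrative):

// illustrative figures – physical_utilisation as reported by XAPI, in bytes
const physicalUtilisations = [10 * 1024 ** 3, 2 * 1024 ** 3] // 10 GiB + 2 GiB disks

let maxStreamLength = 1024 * 1024 // 1 MiB of headroom for OVF file and tar headers
for (const physicalUtilisation of physicalUtilisations) {
  maxStreamLength += physicalUtilisation
}

console.log(maxStreamLength) // 12885950464 bytes ≈ 12 GiB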
@@ -24,7 +24,7 @@ export class FullRemoteWriter extends MixinRemoteWriter(AbstractFullWriter) {
     )
   }
 
-  async _run({ timestamp, sizeContainer, stream, vm, vmSnapshot }) {
+  async _run({ maxStreamLength, timestamp, sizeContainer, stream, streamLength, vm, vmSnapshot }) {
     const settings = this._settings
     const job = this._job
     const scheduleId = this._scheduleId
@@ -65,6 +65,8 @@ export class FullRemoteWriter extends MixinRemoteWriter(AbstractFullWriter) {
 
     await Task.run({ name: 'transfer' }, async () => {
       await adapter.outputStream(dataFilename, stream, {
+        maxStreamLength,
+        streamLength,
         validator: tmpPath => adapter.isValidXva(tmpPath),
       })
       return { size: sizeContainer.size }
@ -1,9 +1,9 @@
|
|||||||
import { AbstractWriter } from './_AbstractWriter.mjs'
|
import { AbstractWriter } from './_AbstractWriter.mjs'
|
||||||
|
|
||||||
export class AbstractFullWriter extends AbstractWriter {
|
export class AbstractFullWriter extends AbstractWriter {
|
||||||
async run({ timestamp, sizeContainer, stream, vm, vmSnapshot }) {
|
async run({ maxStreamLength, timestamp, sizeContainer, stream, streamLength, vm, vmSnapshot }) {
|
||||||
try {
|
try {
|
||||||
return await this._run({ timestamp, sizeContainer, stream, vm, vmSnapshot })
|
return await this._run({ maxStreamLength, timestamp, sizeContainer, stream, streamLength, vm, vmSnapshot })
|
||||||
} finally {
|
} finally {
|
||||||
// ensure stream is properly closed
|
// ensure stream is properly closed
|
||||||
stream.destroy()
|
stream.destroy()
|
||||||
|
@@ -189,7 +189,7 @@ export default class RemoteHandlerAbstract {
    * @param {number} [options.dirMode]
    * @param {(this: RemoteHandlerAbstract, path: string) => Promise<undefined>} [options.validator] Function that will be called before the data is commited to the remote, if it fails, file should not exist
    */
-  async outputStream(path, input, { checksum = true, dirMode, validator } = {}) {
+  async outputStream(path, input, { checksum = true, dirMode, maxStreamLength, streamLength, validator } = {}) {
     path = normalizePath(path)
     let checksumStream
 
@@ -201,6 +201,8 @@ export default class RemoteHandlerAbstract {
     }
     await this._outputStream(path, input, {
       dirMode,
+      maxStreamLength,
+      streamLength,
       validator,
     })
     if (checksum) {
@@ -31,6 +31,8 @@ import { pRetry } from 'promise-toolbox'
 
 // limits: https://docs.aws.amazon.com/AmazonS3/latest/dev/qfacts.html
 const MAX_PART_SIZE = 1024 * 1024 * 1024 * 5 // 5GB
+const MAX_PART_NUMBER = 10000
+const MIN_PART_SIZE = 5 * 1024 * 1024
 const { warn } = createLogger('xo:fs:s3')
 
 export default class S3Handler extends RemoteHandlerAbstract {
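These constants mirror the documented S3 multipart limits: at most 10,000 parts per upload, each part between 5 MiB and 5 GiB, and at most 5 TB per object. They also explain the 50GB figure in the warning below: with the minimum part size and the maximum part count, an upload is cut at roughly 50 GB. A quick check of the arithmetic (illustrative, not part of the commit):

const MAX_PART_NUMBER = 10000
const MIN_PART_SIZE = 5 * 1024 * 1024 // 5 MiB
const MAX_PART_SIZE = 1024 * 1024 * 1024 * 5 // 5 GiB

// smallest possible parts: the upload is capped around 50 GB
console.log(MAX_PART_NUMBER * MIN_PART_SIZE) // 52428800000 bytes ≈ 48.8 GiB

// largest possible parts: ≈ 53.7 TB, far above the 5 TB object limit,
// so the part count, not the part size, is the binding constraint
console.log(MAX_PART_NUMBER * MAX_PART_SIZE) // 53687091200000 bytes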
@@ -221,15 +223,20 @@ export default class S3Handler extends RemoteHandlerAbstract {
     }
   }
 
-  async _outputStream(path, input, { validator }) {
+  async _outputStream(path, input, { streamLength, maxStreamLength = streamLength, validator }) {
     // S3 storage is limited to 10K part, each part is limited to 5GB. And the total upload must be smaller than 5TB
     // a bigger partSize increase the memory consumption of aws/lib-storage exponentially
-    const MAX_PART = 10000
-    const PART_SIZE = 5 * 1024 * 1024
-    const MAX_SIZE = MAX_PART * PART_SIZE
+    let partSize
+    if (maxStreamLength === undefined) {
+      warn(`Writing ${path} to a S3 remote without a max size set will cut it to 50GB`, { path })
+      partSize = MIN_PART_SIZE // min size for S3
+    } else {
+      partSize = Math.min(Math.max(Math.ceil(maxStreamLength / MAX_PART_NUMBER), MIN_PART_SIZE), MAX_PART_SIZE)
+    }
 
     // ensure we don't try to upload a stream to big for this partSize
     let readCounter = 0
+    const MAX_SIZE = MAX_PART_NUMBER * partSize
     const streamCutter = new Transform({
       transform(chunk, encoding, callback) {
         readCounter += chunk.length
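The part size is now derived from the expected upload size: divide the estimated stream length by the maximum number of parts, then clamp between the 5 MiB minimum and the 5 GiB maximum. Without an estimate the handler falls back to 5 MiB parts, which is why such uploads are cut at about 50 GB. A standalone sketch of the same computation with illustrative inputs (the helper name is made up; only the formula comes from the commit):

const MAX_PART_NUMBER = 10000
const MIN_PART_SIZE = 5 * 1024 * 1024 // 5 MiB
const MAX_PART_SIZE = 1024 * 1024 * 1024 * 5 // 5 GiB

// hypothetical helper wrapping the formula used in _outputStream
function computePartSize(maxStreamLength) {
  if (maxStreamLength === undefined) {
    return MIN_PART_SIZE // fallback: upload capped around 50 GB
  }
  return Math.min(Math.max(Math.ceil(maxStreamLength / MAX_PART_NUMBER), MIN_PART_SIZE), MAX_PART_SIZE)
}

console.log(computePartSize(12 * 1024 ** 3))  // 12 GiB export → 5 MiB parts (minimum wins)
console.log(computePartSize(2 * 1024 ** 4))   // 2 TiB stream → ~210 MiB parts
console.log(computePartSize(100 * 1024 ** 4)) // 100 TiB (theoretical) → clamped to 5 GiB parts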
@@ -252,7 +259,7 @@ export default class S3Handler extends RemoteHandlerAbstract {
         ...this.#createParams(path),
         Body,
       },
-      partSize: PART_SIZE,
+      partSize,
       leavePartsOnError: false,
     })
 