Compare commits

...

6 Commits

Author SHA1 Message Date
Florent BEAUCHAMP
4d10e261f8 feat(s3): compute sensible chunk size for s3 upload 2023-10-10 14:45:38 +02:00
Florent BEAUCHAMP
84252c3abe feat(fs/s3): compute md5 only when needed 2023-10-10 11:44:55 +02:00
Florent BEAUCHAMP
4fb48e01fa fix(@xen-orchestra/fs): compute md5 only when needed 2023-10-10 11:33:56 +02:00
Florent BEAUCHAMP
516fc3f6ff feat: object lock mode needs content md5
also, putting an object part with a precalculated md5 and size doesn't consume additional memory compared to presigned + raw upload
2023-10-04 14:26:59 +02:00
Florent BEAUCHAMP
676851ea82 feat(s3): test upload without sdk 2023-10-02 14:49:21 +02:00
Florent BEAUCHAMP
a7a64f4281 fix(fs/s3): throw an error if upload >50GB 2023-10-02 14:31:01 +02:00
9 changed files with 336 additions and 15 deletions

View File

@@ -681,11 +681,13 @@ export class RemoteAdapter {
}
}
async outputStream(path, input, { checksum = true, validator = noop } = {}) {
async outputStream(path, input, { checksum = true, maxStreamLength, streamLength, validator = noop } = {}) {
const container = watchStreamSize(input)
await this._handler.outputStream(path, input, {
checksum,
dirMode: this._dirMode,
maxStreamLength,
streamLength,
async validator() {
await input.task
return validator.apply(this, arguments)
@@ -742,8 +744,15 @@ export class RemoteAdapter {
}
}
readFullVmBackup(metadata) {
return this._handler.createReadStream(resolve('/', dirname(metadata._filename), metadata.xva))
async readFullVmBackup(metadata) {
const xvaPath = resolve('/', dirname(metadata._filename), metadata.xva)
const stream = await this._handler.createReadStream(xvaPath)
try {
stream.length = await this._handler.getSize(xvaPath)
} catch (error) {
warn(`Can't compute length of xva file`, { xvaPath, error })
}
return stream
}
async readVmBackupMetadata(path) {

View File

@@ -29,6 +29,8 @@ export const FullRemote = class FullRemoteVmBackupRunner extends AbstractRemote
writer =>
writer.run({
stream: forkStreamUnpipe(stream),
// stream is copied and transformed, it's not safe to attach additional properties to it
streamLength: stream.length,
timestamp: metadata.timestamp,
vm: metadata.vm,
vmSnapshot: metadata.vmSnapshot,

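The comment above is the key detail: forkStreamUnpipe copies the data through new stream objects, so an ad-hoc property like `length` set on the source does not exist on the fork, which is why streamLength is forwarded explicitly. A minimal sketch of the behaviour this relies on (illustrative names, plain PassThrough streams standing in for the real fork):

import { PassThrough } from 'stream'

// a property attached to the source stream...
const source = new PassThrough()
source.length = 1024 * 1024

// ...does not survive piping into a new stream
const fork = source.pipe(new PassThrough())
console.log(fork.length) // undefined
// so the length is passed alongside the fork instead
console.log({ stream: fork, streamLength: source.length })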
View File

@@ -35,13 +35,22 @@ export const FullXapi = class FullXapiVmBackupRunner extends AbstractXapi {
useSnapshot: false,
})
)
const vdis = await exportedVm.$getDisks()
let maxStreamLength = 1024 * 1024 // Ovf file and tar headers are a few KB, let's stay safe
vdis.forEach(vdiRef => {
const vdi = this._xapi.getObject(vdiRef)
maxStreamLength += vdi.physical_utilisation ?? 0 // at most, the xva will take the physical usage of the disk
// it can be smaller thanks to the smaller block size of xva compared to vhd, and to xcp-ng compression
})
const sizeContainer = watchStreamSize(stream)
const timestamp = Date.now()
await this._callWriters(
writer =>
writer.run({
maxStreamLength,
sizeContainer,
stream: forkStreamUnpipe(stream),
timestamp,

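The estimate above is deliberately an upper bound: the XVA cannot exceed the sum of the disks' physical_utilisation plus a small allowance for the OVF file and tar headers, and is usually smaller because of XVA's block size and xcp-ng compression. A rough worked example with made-up disk sizes:

// hypothetical VM with two disks using 10 GiB and 2 GiB of physical space
const physicalUtilisations = [10 * 1024 ** 3, 2 * 1024 ** 3]
let maxStreamLength = 1024 * 1024 // 1 MiB headroom for the OVF file and tar headers
for (const physicalUtilisation of physicalUtilisations) {
  maxStreamLength += physicalUtilisation
}
console.log(maxStreamLength) // 12 GiB + 1 MiB, an upper bound for the exported XVA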
View File

@@ -24,7 +24,7 @@ export class FullRemoteWriter extends MixinRemoteWriter(AbstractFullWriter) {
)
}
async _run({ timestamp, sizeContainer, stream, vm, vmSnapshot }) {
async _run({ maxStreamLength, timestamp, sizeContainer, stream, streamLength, vm, vmSnapshot }) {
const settings = this._settings
const job = this._job
const scheduleId = this._scheduleId
@@ -65,6 +65,8 @@ export class FullRemoteWriter extends MixinRemoteWriter(AbstractFullWriter) {
await Task.run({ name: 'transfer' }, async () => {
await adapter.outputStream(dataFilename, stream, {
maxStreamLength,
streamLength,
validator: tmpPath => adapter.isValidXva(tmpPath),
})
return { size: sizeContainer.size }

View File

@@ -1,9 +1,9 @@
import { AbstractWriter } from './_AbstractWriter.mjs'
export class AbstractFullWriter extends AbstractWriter {
async run({ timestamp, sizeContainer, stream, vm, vmSnapshot }) {
async run({ maxStreamLength, timestamp, sizeContainer, stream, streamLength, vm, vmSnapshot }) {
try {
return await this._run({ timestamp, sizeContainer, stream, vm, vmSnapshot })
return await this._run({ maxStreamLength, timestamp, sizeContainer, stream, streamLength, vm, vmSnapshot })
} finally {
// ensure stream is properly closed
stream.destroy()

View File

@@ -25,6 +25,7 @@
"@aws-sdk/lib-storage": "^3.54.0",
"@aws-sdk/middleware-apply-body-checksum": "^3.58.0",
"@aws-sdk/node-http-handler": "^3.54.0",
"@aws-sdk/s3-request-presigner": "^3.421.0",
"@sindresorhus/df": "^3.1.1",
"@vates/async-each": "^1.0.0",
"@vates/coalesce-calls": "^0.1.0",

View File

@@ -189,7 +189,7 @@ export default class RemoteHandlerAbstract {
* @param {number} [options.dirMode]
* @param {(this: RemoteHandlerAbstract, path: string) => Promise<undefined>} [options.validator] Function that will be called before the data is committed to the remote; if it fails, the file should not exist
*/
async outputStream(path, input, { checksum = true, dirMode, validator } = {}) {
async outputStream(path, input, { checksum = true, dirMode, maxStreamLength, streamLength, validator } = {}) {
path = normalizePath(path)
let checksumStream
@@ -201,6 +201,8 @@ export default class RemoteHandlerAbstract {
}
await this._outputStream(path, input, {
dirMode,
maxStreamLength,
streamLength,
validator,
})
if (checksum) {

View File

@@ -5,6 +5,7 @@ import {
CreateMultipartUploadCommand,
DeleteObjectCommand,
GetObjectCommand,
GetObjectLockConfigurationCommand,
HeadObjectCommand,
ListObjectsV2Command,
PutObjectCommand,
@@ -17,7 +18,7 @@ import { getApplyMd5BodyChecksumPlugin } from '@aws-sdk/middleware-apply-body-ch
import { Agent as HttpAgent } from 'http'
import { Agent as HttpsAgent } from 'https'
import { createLogger } from '@xen-orchestra/log'
import { PassThrough, pipeline } from 'stream'
import { PassThrough, Transform, pipeline } from 'stream'
import { parse } from 'xo-remote-parser'
import copyStreamToBuffer from './_copyStreamToBuffer.js'
import guessAwsRegion from './_guessAwsRegion.js'
@@ -30,6 +31,8 @@ import { pRetry } from 'promise-toolbox'
// limits: https://docs.aws.amazon.com/AmazonS3/latest/dev/qfacts.html
const MAX_PART_SIZE = 1024 * 1024 * 1024 * 5 // 5GB
const MAX_PART_NUMBER = 10000
const MIN_PART_SIZE = 5 * 1024 * 1024
const { warn } = createLogger('xo:fs:s3')
export default class S3Handler extends RemoteHandlerAbstract {
@@ -71,9 +74,6 @@ export default class S3Handler extends RemoteHandlerAbstract {
}),
})
// Workaround for https://github.com/aws/aws-sdk-js-v3/issues/2673
this.#s3.middlewareStack.use(getApplyMd5BodyChecksumPlugin(this.#s3.config))
const parts = split(path)
this.#bucket = parts.shift()
this.#dir = join(...parts)
@@ -223,18 +223,41 @@ export default class S3Handler extends RemoteHandlerAbstract {
}
}
async _outputStream(path, input, { validator }) {
async _outputStream(path, input, { maxStreamLength, streamLength, validator }) {
const maxInputLength = streamLength ?? maxStreamLength
let partSize
if (maxInputLength === undefined) {
warn(`Writing ${path} to an S3 remote without a max size set will cut it off at 50GB`, { path })
partSize = MIN_PART_SIZE // min size for S3
} else {
partSize = Math.min(Math.max(Math.ceil(maxInputLength / MAX_PART_NUMBER), MIN_PART_SIZE), MAX_PART_SIZE)
}
// ensure we don't try to upload a stream too big for this part size
let readCounter = 0
const streamCutter = new Transform({
transform(chunk, encoding, callback) {
const MAX_SIZE = MAX_PART_NUMBER * partSize
readCounter += chunk.length
if (readCounter > MAX_SIZE) {
callback(new Error(`read ${readCounter} bytes, maximum size allowed is ${MAX_SIZE} `))
} else {
callback(null, chunk)
}
},
})
// Workaround for "ReferenceError: ReadableStream is not defined"
// https://github.com/aws/aws-sdk-js-v3/issues/2522
const Body = new PassThrough()
pipeline(input, Body, () => {})
pipeline(input, streamCutter, Body, () => {})
const upload = new Upload({
client: this.#s3,
params: {
...this.#createParams(path),
Body,
},
partSize,
leavePartsOnError: false,
})
await upload.done()
@@ -418,6 +441,21 @@ export default class S3Handler extends RemoteHandlerAbstract {
async _closeFile(fd) {}
async _sync() {
await super._sync()
try {
const res = await this.#s3.send(new GetObjectLockConfigurationCommand({ Bucket: this.#bucket }))
if (res.ObjectLockConfiguration?.ObjectLockEnabled === 'Enabled') {
// Workaround for https://github.com/aws/aws-sdk-js-v3/issues/2673
// increases memory consumption in outputStream as it buffers the streams
this.#s3.middlewareStack.use(getApplyMd5BodyChecksumPlugin(this.#s3.config))
}
} catch (error) {
if (error.Code !== 'ObjectLockConfigurationNotFoundError') {
throw error
}
}
}
useVhdDirectory() {
return true
}

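The part size derivation above follows from the S3 limits declared at the top of the file: at most MAX_PART_NUMBER (10000) parts of between MIN_PART_SIZE (5 MiB) and MAX_PART_SIZE (5 GiB) each. Dividing the expected length by the part count and clamping to that range yields the smallest part size able to hold the whole stream, and the streamCutter then fails fast if the stream turns out bigger than MAX_PART_NUMBER × partSize. When no length hint is given, the 5 MiB minimum caps the upload at about 48.8 GiB, hence the "50GB" warning. A standalone sketch of the arithmetic (not part of the handler):

const MAX_PART_SIZE = 1024 * 1024 * 1024 * 5 // 5 GiB
const MAX_PART_NUMBER = 10000
const MIN_PART_SIZE = 5 * 1024 * 1024 // 5 MiB

function computePartSize(maxInputLength) {
  if (maxInputLength === undefined) {
    return MIN_PART_SIZE // caps the upload at 10000 × 5 MiB ≈ 48.8 GiB
  }
  return Math.min(Math.max(Math.ceil(maxInputLength / MAX_PART_NUMBER), MIN_PART_SIZE), MAX_PART_SIZE)
}

console.log(computePartSize(undefined) / 1024 ** 2) // 5 MiB
console.log(computePartSize(30 * 1024 ** 3) / 1024 ** 2) // still 5 MiB: 30 GiB fits in 10000 parts of 5 MiB
console.log(Math.round(computePartSize(2 * 1024 ** 4) / 1024 ** 2)) // ≈ 210 MiB for a 2 TiB stream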
View File

@@ -0,0 +1,258 @@
import fs from 'fs/promises'
import { getSignedUrl } from "@aws-sdk/s3-request-presigner"
import { createHash } from "crypto";
import {
CompleteMultipartUploadCommand,
CreateMultipartUploadCommand,
GetObjectLockConfigurationCommand,
PutObjectCommand,
S3Client,
UploadPartCommand,
} from '@aws-sdk/client-s3'
import { NodeHttpHandler } from '@aws-sdk/node-http-handler'
import { Agent as HttpAgent } from 'http'
import { Agent as HttpsAgent } from 'https'
import { parse } from 'xo-remote-parser'
import { join, split } from './dist/path.js'
import guessAwsRegion from './dist/_guessAwsRegion.js'
import { PassThrough } from 'stream'
import { readChunk } from '@vates/read-chunk'
import { pFromCallback } from 'promise-toolbox'
async function v2(url, inputStream){
const {
allowUnauthorized,
host,
path,
username,
password,
protocol,
region = guessAwsRegion(host),
} = parse(url)
const client = new S3Client({
apiVersion: '2006-03-01',
endpoint: `${protocol}://s3.us-east-2.amazonaws.com`,
forcePathStyle: true,
credentials: {
accessKeyId: username,
secretAccessKey: password,
},
region,
requestHandler: new NodeHttpHandler({
socketTimeout: 600000,
httpAgent: new HttpAgent({
keepAlive: true,
}),
httpsAgent: new HttpsAgent({
rejectUnauthorized: !allowUnauthorized,
keepAlive: true,
}),
}),
})
const pathParts = split(path)
const bucket = pathParts.shift()
const dir = join(...pathParts)
const command = new CreateMultipartUploadCommand({
Bucket: bucket, Key: join(dir, 'flov2')
})
const multipart = await client.send(command)
console.log({multipart})
const parts = []
// monitor memory usage
const intervalMonitorMemoryUsage = setInterval(()=>console.log(Math.round(process.memoryUsage().rss/1024/1024)), 2000)
const CHUNK_SIZE = Math.ceil(5*1024*1024*1024*1024/10000) // smallest chunk allowing 5TB upload
async function read(inputStream, maxReadSize){
if(maxReadSize === 0){
return null
}
process.stdout.write('+')
const chunk = await readChunk(inputStream, maxReadSize)
process.stdout.write('@')
return chunk
}
async function write(data, chunkStream, remainingBytes){
const ready = chunkStream.write(data)
if(!ready){
process.stdout.write('.')
await pFromCallback(cb=> chunkStream.once('drain', cb))
process.stdout.write('@')
}
remainingBytes -= data.length
process.stdout.write(remainingBytes+' ')
return remainingBytes
}
async function uploadChunk(inputStream){
const PartNumber = parts.length +1
let done = false
let remainingBytes = CHUNK_SIZE
const maxChunkPartSize = Math.round(CHUNK_SIZE / 1000)
const chunkStream = new PassThrough()
console.log({maxChunkPartSize,CHUNK_SIZE})
let data
let chunkBuffer = []
const hash = createHash('md5');
try{
while((data = await read(inputStream, Math.min(remainingBytes, maxChunkPartSize))) !== null){
chunkBuffer.push(data)
hash.update(data)
remainingBytes -= data.length
//remainingBytes = await write(data, chunkStream, remainingBytes)
}
console.log('data put')
const fullBuffer = Buffer.alloc(maxChunkPartSize,0)
done = remainingBytes > 0
// add padding at the end of the file (not a problem for tar-like formats: xva/ova)
// otherwise the content length won't match and we'll get an UND_ERR_REQ_CONTENT_LENGTH_MISMATCH error
console.log('full padding')
while(remainingBytes > maxChunkPartSize){
chunkBuffer.push(fullBuffer)
hash.update(fullBuffer)
remainingBytes -= maxChunkPartSize
//remainingBytes = await write(fullBuffer,chunkStream, remainingBytes)
}
console.log('full padding done ')
chunkBuffer.push(Buffer.alloc(remainingBytes,0))
hash.update(Buffer.alloc(remainingBytes,0))
console.log('md5 ok ')
//await write(Buffer.alloc(remainingBytes,0),chunkStream, remainingBytes)
// wait for the end of the upload
const command = new UploadPartCommand({
...multipart,
PartNumber,
ContentLength:CHUNK_SIZE,
Body: chunkStream,
ContentMD5 : hash.digest('base64')
})
const promise = client.send(command)
for (const buffer of chunkBuffer){
await write(buffer, chunkStream, remainingBytes)
}
chunkStream.on('error', err => console.error(err))
const res = await promise
console.log({res, headers : res.headers })
parts.push({ ETag:/*res.headers.get('etag') */res.ETag, PartNumber })
}catch(err){
console.error(err)
throw err
}
return done
}
while(!await uploadChunk(inputStream)){
console.log('uploaded one chunk', parts.length)
}
// mark the upload as complete and ask s3 to glue the chunks together
const completRes = await client.send(
new CompleteMultipartUploadCommand({
...multipart,
MultipartUpload: { Parts: parts },
})
)
console.log({completRes})
clearInterval(intervalMonitorMemoryUsage)
}
async function simplePut(url , inputStream){
const {
allowUnauthorized,
host,
path,
username,
password,
protocol,
region = guessAwsRegion(host),
} = parse(url)
const client = new S3Client({
apiVersion: '2006-03-01',
endpoint: `${protocol}://s3.us-east-2.amazonaws.com`,
forcePathStyle: true,
credentials: {
accessKeyId: username,
secretAccessKey: password,
},
region,
requestHandler: new NodeHttpHandler({
socketTimeout: 600000,
httpAgent: new HttpAgent({
keepAlive: true,
}),
httpsAgent: new HttpsAgent({
rejectUnauthorized: !allowUnauthorized,
keepAlive: true,
}),
}),
})
const pathParts = split(path)
const bucket = pathParts.shift()
const dir = join(...pathParts)
//const hasObjectLock = await client.send(new GetObjectLockConfigurationCommand({Bucket: bucket}))
//console.log(hasObjectLock.ObjectLockConfiguration?.ObjectLockEnabled === 'Enabled')
const md5 = await createMD5('/tmp/1g')
console.log({md5})
const command = new PutObjectCommand({
Bucket: bucket, Key: join(dir, 'simple'),
ContentMD5: md5,
ContentLength: 1024*1024*1024,
Body: inputStream
})
const intervalMonitorMemoryUsage = setInterval(()=>console.log(Math.round(process.memoryUsage().rss/1024/1024)), 2000)
const res = await client.send(command)
/*
const presignedUrl = await getSignedUrl(client, command,{ expiresIn: 3600 });
const res = await fetch(presignedUrl, {
method: 'PUT',
body:inputStream,
duplex: "half",
headers:{
"x-amz-decoded-content-length": 1024*1024*1024,
"content-md5" : md5
}
})*/
clearInterval(intervalMonitorMemoryUsage)
console.log(res)
}
async function createMD5(filePath) {
const input = await fs.open(filePath) // big ass file
return new Promise((res, rej) => {
const hash = createHash('md5')
const rStream = input.createReadStream()
rStream.on('error', rej)
rStream.on('data', data => {
hash.update(data)
})
rStream.on('end', () => {
res(hash.digest('base64'))
})
})
}
const input = await fs.open('/tmp/1g') // big ass file
const inputStream = input.createReadStream()
const remoteUrl = ""
v2(remoteUrl,inputStream)
//simplePut(remoteUrl,inputStream)
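For context on the test script's constants: CHUNK_SIZE is the smallest part size that still allows a 5 TiB object within the 10000-part limit, and it is also roughly how much memory each buffered chunk holds until its UploadPartCommand completes. A quick check of the numbers (illustrative only):

const MAX_OBJECT_SIZE = 5 * 1024 ** 4 // 5 TiB, the S3 object size limit
const MAX_PART_NUMBER = 10000
const CHUNK_SIZE = Math.ceil(MAX_OBJECT_SIZE / MAX_PART_NUMBER)
console.log(CHUNK_SIZE) // 549755814 bytes ≈ 524 MiB buffered per part
console.log(Math.round(CHUNK_SIZE / 1000 / 1024)) // maxChunkPartSize ≈ 537 KiB read per iteration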