Compare commits

6 Commits: test...feat_s3_st

| Author | SHA1 | Date |
|---|---|---|
| | 4d10e261f8 | |
| | 84252c3abe | |
| | 4fb48e01fa | |
| | 516fc3f6ff | |
| | 676851ea82 | |
| | a7a64f4281 | |
```diff
@@ -681,11 +681,13 @@ export class RemoteAdapter {
     }
   }
 
-  async outputStream(path, input, { checksum = true, validator = noop } = {}) {
+  async outputStream(path, input, { checksum = true, maxStreamLength, streamLength, validator = noop } = {}) {
     const container = watchStreamSize(input)
     await this._handler.outputStream(path, input, {
       checksum,
       dirMode: this._dirMode,
+      maxStreamLength,
+      streamLength,
       async validator() {
         await input.task
         return validator.apply(this, arguments)
@@ -742,8 +744,15 @@ export class RemoteAdapter {
     }
   }
 
-  readFullVmBackup(metadata) {
-    return this._handler.createReadStream(resolve('/', dirname(metadata._filename), metadata.xva))
+  async readFullVmBackup(metadata) {
+    const xvaPath = resolve('/', dirname(metadata._filename), metadata.xva)
+    const stream = await this._handler.createReadStream(xvaPath)
+    try {
+      stream.length = await this._handler.getSize(xvaPath)
+    } catch (error) {
+      warn(`Can't compute length of xva file`, { xvaPath, error })
+    }
+    return stream
   }
 
   async readVmBackupMetadata(path) {
```
```diff
@@ -29,6 +29,8 @@ export const FullRemote = class FullRemoteVmBackupRunner extends AbstractRemote
       writer =>
         writer.run({
           stream: forkStreamUnpipe(stream),
+          // stream is copied and transformed, it's not safe to attach additional properties to it
+          streamLength: stream.length,
           timestamp: metadata.timestamp,
           vm: metadata.vm,
           vmSnapshot: metadata.vmSnapshot,
```
```diff
@@ -35,13 +35,22 @@ export const FullXapi = class FullXapiVmBackupRunner extends AbstractXapi {
         useSnapshot: false,
       })
     )
 
+    const vdis = await exportedVm.$getDisks()
+    let maxStreamLength = 1024 * 1024 // Ovf file and tar headers are a few KB, let's stay safe
+    vdis.forEach(vdiRef => {
+      const vdi = this._xapi.getObject(vdiRef)
+      maxStreamLength += vdi.physical_utilisation ?? 0 // at most the xva will take the physical usage of the disk
+      // it can be smaller due to the smaller block size for xva than vhd, and compression of xcp-ng
+    })
+
     const sizeContainer = watchStreamSize(stream)
 
     const timestamp = Date.now()
 
     await this._callWriters(
       writer =>
         writer.run({
+          maxStreamLength,
           sizeContainer,
           stream: forkStreamUnpipe(stream),
           timestamp,
```
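The estimate above is an upper bound, not an exact size: it sums each VDI's `physical_utilisation` and adds roughly 1 MiB for the OVF descriptor and tar headers, while the real XVA is usually smaller. A minimal sketch of the same arithmetic, with made-up disk sizes (the numbers and names below are illustrative, not from the diff):

```js
// Illustration of the upper-bound estimate; the VDI sizes are hypothetical.
const TAR_AND_OVF_OVERHEAD = 1024 * 1024 // ~1 MiB for the OVF descriptor and tar headers
const vdiPhysicalUtilisations = [20 * 1024 ** 3, 5 * 1024 ** 3] // two disks: 20 GiB and 5 GiB actually used

// The XVA can be at most the sum of the disks' physical usage plus the overhead;
// it is usually smaller because XVA uses smaller blocks than VHD and xcp-ng may compress the export.
const maxStreamLength = vdiPhysicalUtilisations.reduce((sum, size) => sum + size, TAR_AND_OVF_OVERHEAD)
console.log(maxStreamLength) // 26_844_594_176 bytes, safe to pass as an upper bound
```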
```diff
@@ -24,7 +24,7 @@ export class FullRemoteWriter extends MixinRemoteWriter(AbstractFullWriter) {
     )
   }
 
-  async _run({ timestamp, sizeContainer, stream, vm, vmSnapshot }) {
+  async _run({ maxStreamLength, timestamp, sizeContainer, stream, streamLength, vm, vmSnapshot }) {
     const settings = this._settings
     const job = this._job
     const scheduleId = this._scheduleId
@@ -65,6 +65,8 @@ export class FullRemoteWriter extends MixinRemoteWriter(AbstractFullWriter) {
 
     await Task.run({ name: 'transfer' }, async () => {
       await adapter.outputStream(dataFilename, stream, {
+        maxStreamLength,
+        streamLength,
         validator: tmpPath => adapter.isValidXva(tmpPath),
       })
       return { size: sizeContainer.size }
```
```diff
@@ -1,9 +1,9 @@
 import { AbstractWriter } from './_AbstractWriter.mjs'
 
 export class AbstractFullWriter extends AbstractWriter {
-  async run({ timestamp, sizeContainer, stream, vm, vmSnapshot }) {
+  async run({ maxStreamLength, timestamp, sizeContainer, stream, streamLength, vm, vmSnapshot }) {
     try {
-      return await this._run({ timestamp, sizeContainer, stream, vm, vmSnapshot })
+      return await this._run({ maxStreamLength, timestamp, sizeContainer, stream, streamLength, vm, vmSnapshot })
     } finally {
       // ensure stream is properly closed
       stream.destroy()
```
```diff
@@ -25,6 +25,7 @@
     "@aws-sdk/lib-storage": "^3.54.0",
     "@aws-sdk/middleware-apply-body-checksum": "^3.58.0",
     "@aws-sdk/node-http-handler": "^3.54.0",
+    "@aws-sdk/s3-request-presigner": "^3.421.0",
     "@sindresorhus/df": "^3.1.1",
     "@vates/async-each": "^1.0.0",
     "@vates/coalesce-calls": "^0.1.0",
```
```diff
@@ -189,7 +189,7 @@ export default class RemoteHandlerAbstract {
    * @param {number} [options.dirMode]
    * @param {(this: RemoteHandlerAbstract, path: string) => Promise<undefined>} [options.validator] Function that will be called before the data is commited to the remote, if it fails, file should not exist
    */
-  async outputStream(path, input, { checksum = true, dirMode, validator } = {}) {
+  async outputStream(path, input, { checksum = true, dirMode, maxStreamLength, streamLength, validator } = {}) {
     path = normalizePath(path)
     let checksumStream
 
@@ -201,6 +201,8 @@ export default class RemoteHandlerAbstract {
     }
     await this._outputStream(path, input, {
       dirMode,
+      maxStreamLength,
+      streamLength,
       validator,
     })
     if (checksum) {
```
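A hypothetical caller-side sketch of the extended options (the names `handler`, `xvaStream` and `estimatedUpperBound` are placeholders, not identifiers from the diff): pass `streamLength` when the exact size is known, otherwise `maxStreamLength` as an upper bound so the S3 handler can pick a multipart part size large enough for the whole stream.

```js
// Sketch only: assumes a connected RemoteHandlerAbstract subclass instance.
async function uploadXva(handler, xvaStream, estimatedUpperBound) {
  await handler.outputStream('backups/data.xva', xvaStream, {
    maxStreamLength: estimatedUpperBound, // upper bound; the stream may be shorter
    validator: async tmpPath => {
      // throwing here rejects the upload, so the file should not remain on the remote
    },
  })
}
```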
```diff
@@ -5,6 +5,7 @@ import {
   CreateMultipartUploadCommand,
   DeleteObjectCommand,
   GetObjectCommand,
+  GetObjectLockConfigurationCommand,
   HeadObjectCommand,
   ListObjectsV2Command,
   PutObjectCommand,
@@ -17,7 +18,7 @@ import { getApplyMd5BodyChecksumPlugin } from '@aws-sdk/middleware-apply-body-ch
 import { Agent as HttpAgent } from 'http'
 import { Agent as HttpsAgent } from 'https'
 import { createLogger } from '@xen-orchestra/log'
-import { PassThrough, pipeline } from 'stream'
+import { PassThrough, Transform, pipeline } from 'stream'
 import { parse } from 'xo-remote-parser'
 import copyStreamToBuffer from './_copyStreamToBuffer.js'
 import guessAwsRegion from './_guessAwsRegion.js'
@@ -30,6 +31,8 @@ import { pRetry } from 'promise-toolbox'
 
 // limits: https://docs.aws.amazon.com/AmazonS3/latest/dev/qfacts.html
 const MAX_PART_SIZE = 1024 * 1024 * 1024 * 5 // 5GB
+const MAX_PART_NUMBER = 10000
+const MIN_PART_SIZE = 5 * 1024 * 1024
 const { warn } = createLogger('xo:fs:s3')
 
 export default class S3Handler extends RemoteHandlerAbstract {
@@ -71,9 +74,6 @@ export default class S3Handler extends RemoteHandlerAbstract {
       }),
     })
 
-    // Workaround for https://github.com/aws/aws-sdk-js-v3/issues/2673
-    this.#s3.middlewareStack.use(getApplyMd5BodyChecksumPlugin(this.#s3.config))
-
     const parts = split(path)
     this.#bucket = parts.shift()
     this.#dir = join(...parts)
```
```diff
@@ -223,18 +223,41 @@ export default class S3Handler extends RemoteHandlerAbstract {
     }
   }
 
-  async _outputStream(path, input, { validator }) {
+  async _outputStream(path, input, { maxStreamLength, streamLength, validator }) {
+    const maxInputLength = streamLength ?? maxStreamLength
+    let partSize
+    if (maxInputLength === undefined) {
+      warn(`Writing ${path} to a S3 remote without a max size set will cut it to 50GB`, { path })
+      partSize = MIN_PART_SIZE // min size for S3
+    } else {
+      partSize = Math.min(Math.max(Math.ceil(maxInputLength / MAX_PART_NUMBER), MIN_PART_SIZE), MAX_PART_SIZE)
+    }
+
+    // ensure we don't try to upload a stream too big for this part size
+    let readCounter = 0
+    const streamCutter = new Transform({
+      transform(chunk, encoding, callback) {
+        const MAX_SIZE = MAX_PART_NUMBER * partSize
+        readCounter += chunk.length
+        if (readCounter > MAX_SIZE) {
+          callback(new Error(`read ${readCounter} bytes, maximum size allowed is ${MAX_SIZE}`))
+        } else {
+          callback(null, chunk)
+        }
+      },
+    })
+
     // Workaround for "ReferenceError: ReadableStream is not defined"
     // https://github.com/aws/aws-sdk-js-v3/issues/2522
     const Body = new PassThrough()
-    pipeline(input, Body, () => {})
+    pipeline(input, streamCutter, Body, () => {})
 
     const upload = new Upload({
       client: this.#s3,
       params: {
         ...this.#createParams(path),
         Body,
       },
+      partSize,
       leavePartsOnError: false,
     })
 
     await upload.done()
```
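The clamp above sizes multipart parts so the whole stream fits under S3's 10,000-part limit while respecting the 5 MiB minimum and 5 GiB maximum per part. A short, self-contained illustration of that selection with made-up stream sizes:

```js
// Illustration of the part-size selection above; the stream sizes are hypothetical.
const MAX_PART_NUMBER = 10000
const MIN_PART_SIZE = 5 * 1024 * 1024 // 5 MiB, S3 minimum for all parts but the last
const MAX_PART_SIZE = 5 * 1024 * 1024 * 1024 // 5 GiB, S3 maximum per part

const pickPartSize = maxInputLength =>
  Math.min(Math.max(Math.ceil(maxInputLength / MAX_PART_NUMBER), MIN_PART_SIZE), MAX_PART_SIZE)

console.log(pickPartSize(10 * 1024 ** 3)) // 10 GiB stream: 5 MiB parts (the minimum wins)
console.log(pickPartSize(2 * 1024 ** 4)) // 2 TiB stream: ~210 MiB parts (2 TiB / 10000, rounded up)
// Without any size hint the handler falls back to 5 MiB parts, which caps the object at
// 10000 * 5 MiB, about 48.8 GiB, hence the "50GB" warning.
```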
```diff
@@ -418,6 +441,21 @@ export default class S3Handler extends RemoteHandlerAbstract {
 
   async _closeFile(fd) {}
 
+  async _sync() {
+    await super._sync()
+    try {
+      const res = await this.#s3.send(new GetObjectLockConfigurationCommand({ Bucket: this.#bucket }))
+      if (res.ObjectLockConfiguration?.ObjectLockEnabled === 'Enabled') {
+        // Workaround for https://github.com/aws/aws-sdk-js-v3/issues/2673
+        // increases memory consumption in outputStream as it buffers the streams
+        this.#s3.middlewareStack.use(getApplyMd5BodyChecksumPlugin(this.#s3.config))
+      }
+    } catch (error) {
+      if (error.Code !== 'ObjectLockConfigurationNotFoundError') {
+        throw error
+      }
+    }
+  }
+
   useVhdDirectory() {
     return true
   }
```
New file: `@xen-orchestra/fs/testupload.mjs` (258 lines)

```js
import fs from 'fs/promises'
import { getSignedUrl } from "@aws-sdk/s3-request-presigner"
import { createHash } from "crypto";
import {
  CompleteMultipartUploadCommand,
  CreateMultipartUploadCommand,
  GetObjectLockConfigurationCommand,
  PutObjectCommand,
  S3Client,
  UploadPartCommand,
} from '@aws-sdk/client-s3'

import { NodeHttpHandler } from '@aws-sdk/node-http-handler'
import { Agent as HttpAgent } from 'http'
import { Agent as HttpsAgent } from 'https'
import { parse } from 'xo-remote-parser'
import { join, split } from './dist/path.js'

import guessAwsRegion from './dist/_guessAwsRegion.js'
import { PassThrough } from 'stream'
import { readChunk } from '@vates/read-chunk'
import { pFromCallback } from 'promise-toolbox'

async function v2(url, inputStream){
  const {
    allowUnauthorized,
    host,
    path,
    username,
    password,
    protocol,
    region = guessAwsRegion(host),
  } = parse(url)
  const client = new S3Client({
    apiVersion: '2006-03-01',
    endpoint: `${protocol}://s3.us-east-2.amazonaws.com`,
    forcePathStyle: true,
    credentials: {
      accessKeyId: username,
      secretAccessKey: password,
    },
    region,
    requestHandler: new NodeHttpHandler({
      socketTimeout: 600000,
      httpAgent: new HttpAgent({
        keepAlive: true,
      }),
      httpsAgent: new HttpsAgent({
        rejectUnauthorized: !allowUnauthorized,
        keepAlive: true,
      }),
    }),
  })

  const pathParts = split(path)
  const bucket = pathParts.shift()
  const dir = join(...pathParts)

  const command = new CreateMultipartUploadCommand({
    Bucket: bucket, Key: join(dir, 'flov2')
  })
  const multipart = await client.send(command)
  console.log({multipart})

  const parts = []
  // monitor memory usage
  const intervalMonitorMemoryUsage = setInterval(()=>console.log(Math.round(process.memoryUsage().rss/1024/1024)), 2000)

  const CHUNK_SIZE = Math.ceil(5*1024*1024*1024*1024/10000) // smallest chunk allowing 5TB upload

  async function read(inputStream, maxReadSize){
    if(maxReadSize === 0){
      return null
    }
    process.stdout.write('+')
    const chunk = await readChunk(inputStream, maxReadSize)
    process.stdout.write('@')
    return chunk
  }

  async function write(data, chunkStream, remainingBytes){
    const ready = chunkStream.write(data)
    if(!ready){
      process.stdout.write('.')
      await pFromCallback(cb=> chunkStream.once('drain', cb))
      process.stdout.write('@')
    }
    remainingBytes -= data.length
    process.stdout.write(remainingBytes+' ')
    return remainingBytes
  }

  async function uploadChunk(inputStream){
    const PartNumber = parts.length +1
    let done = false
    let remainingBytes = CHUNK_SIZE
    const maxChunkPartSize = Math.round(CHUNK_SIZE / 1000)
    const chunkStream = new PassThrough()
    console.log({maxChunkPartSize,CHUNK_SIZE})

    let data
    let chunkBuffer = []
    const hash = createHash('md5');
    try{
      while((data = await read(inputStream, Math.min(remainingBytes, maxChunkPartSize))) !== null){
        chunkBuffer.push(data)
        hash.update(data)
        remainingBytes -= data.length
        //remainingBytes = await write(data, chunkStream, remainingBytes)
      }
      console.log('data put')
      const fullBuffer = Buffer.alloc(maxChunkPartSize,0)
      done = remainingBytes > 0
      // add padding at the end of the file (not a problem for tar like : xva/ova)
      // if not content length will not match and we'll have UND_ERR_REQ_CONTENT_LENGTH_MISMATCH error
      console.log('full padding')
      while(remainingBytes > maxChunkPartSize){
        chunkBuffer.push(fullBuffer)
        hash.update(fullBuffer)
        remainingBytes -= maxChunkPartSize
        //remainingBytes = await write(fullBuffer,chunkStream, remainingBytes)
      }
      console.log('full padding done ')
      chunkBuffer.push(Buffer.alloc(remainingBytes,0))
      hash.update(Buffer.alloc(remainingBytes,0))
      console.log('md5 ok ')
      //await write(Buffer.alloc(remainingBytes,0),chunkStream, remainingBytes)
      // wait for the end of the upload

      const command = new UploadPartCommand({
        ...multipart,
        PartNumber,
        ContentLength:CHUNK_SIZE,
        Body: chunkStream,
        ContentMD5 : hash.digest('base64')
      })
      const promise = client.send(command)
      for (const buffer of chunkBuffer){
        await write(buffer, chunkStream, remainingBytes)
      }
      chunkStream.on('error', err => console.error(err))
      const res = await promise

      console.log({res, headers : res.headers })
      parts.push({ ETag:/*res.headers.get('etag') */res.ETag, PartNumber })
    }catch(err){
      console.error(err)
      throw err
    }
    return done
  }

  while(!await uploadChunk(inputStream)){
    console.log('uploaded one chunk', parts.length)
  }

  // mark the upload as complete and ask s3 to glue the chunk together
  const completRes = await client.send(
    new CompleteMultipartUploadCommand({
      ...multipart,
      MultipartUpload: { Parts: parts },
    })
  )
  console.log({completRes})
  clearInterval(intervalMonitorMemoryUsage)
}

async function simplePut(url , inputStream){
  const {
    allowUnauthorized,
    host,
    path,
    username,
    password,
    protocol,
    region = guessAwsRegion(host),
  } = parse(url)
  const client = new S3Client({
    apiVersion: '2006-03-01',
    endpoint: `${protocol}://s3.us-east-2.amazonaws.com`,
    forcePathStyle: true,
    credentials: {
      accessKeyId: username,
      secretAccessKey: password,
    },
    region,
    requestHandler: new NodeHttpHandler({
      socketTimeout: 600000,
      httpAgent: new HttpAgent({
        keepAlive: true,
      }),
      httpsAgent: new HttpsAgent({
        rejectUnauthorized: !allowUnauthorized,
        keepAlive: true,
      }),
    }),
  })

  const pathParts = split(path)
  const bucket = pathParts.shift()
  const dir = join(...pathParts)

  //const hasObjectLock = await client.send(new GetObjectLockConfigurationCommand({Bucket: bucket}))
  //console.log(hasObjectLock.ObjectLockConfiguration?.ObjectLockEnabled === 'Enabled')

  const md5 = await createMD5('/tmp/1g')
  console.log({md5})
  const command = new PutObjectCommand({
    Bucket: bucket, Key: join(dir, 'simple'),
    ContentMD5: md5,
    ContentLength: 1024*1024*1024,
    Body: inputStream
  })
  const intervalMonitorMemoryUsage = setInterval(()=>console.log(Math.round(process.memoryUsage().rss/1024/1024)), 2000)

  const res = await client.send(command)
  /*
  const presignedUrl = await getSignedUrl(client, command,{ expiresIn: 3600 });
  const res = await fetch(presignedUrl, {
    method: 'PUT',
    body:inputStream,
    duplex: "half",
    headers:{
      "x-amz-decoded-content-length": 1024*1024*1024,
      "content-md5" : md5
    }
  })*/
  clearInterval(intervalMonitorMemoryUsage)

  console.log(res)
}

async function createMD5(filePath) {
  const input = await fs.open(filePath) // big ass file
  return new Promise((res, rej) => {
    const hash = createHash('md5');

    const rStream = input.createReadStream(filePath);
    rStream.on('data', (data) => {
      hash.update(data);
    });
    rStream.on('end', () => {
      res(hash.digest('base64'));
    });
  })
}
const input = await fs.open('/tmp/1g') // big ass file
const inputStream = input.createReadStream()
const remoteUrl = ""

v2(remoteUrl,inputStream)

//simplePut(remoteUrl,inputStream)
```
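For context, the `CHUNK_SIZE` in this test script comes from the same S3 limits the handler uses: at most 10,000 parts per multipart upload and 5 TiB per object, so a fixed part size of `ceil(5 TiB / 10000)` is the smallest one that can still reach the maximum object size. A quick check of that arithmetic (illustrative only, not part of the diff):

```js
// Arithmetic behind CHUNK_SIZE in testupload.mjs
const FIVE_TIB = 5 * 1024 ** 4 // maximum S3 object size
const MAX_PARTS = 10000 // maximum number of parts in a multipart upload
const CHUNK_SIZE = Math.ceil(FIVE_TIB / MAX_PARTS)
console.log(CHUNK_SIZE) // 549_755_814 bytes, roughly 524 MiB per part
```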