fix(fs/s3): TimeoutError: Connection timed out after 120000ms (#5397)

Nicolas Raynaud 2021-01-14 23:23:40 +01:00 committed by GitHub
parent 70083c6dca
commit 289b7a3dbe
4 changed files with 93 additions and 81 deletions
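
In short: the S3 handler switches from direct aws-sdk v2 calls to the @sullux/aws-sdk wrapper, which resolves operations to promises directly (no more .promise()) and forwards httpOptions such as the request timeout. A minimal sketch of the calling-convention change, assuming nothing beyond what the diff below shows (opts and params are placeholders):

    import AWS from 'aws-sdk'
    import aws from '@sullux/aws-sdk'

    // before: each operation returns an AWS.Request, resolved via .promise()
    const before = await new AWS.S3(opts).headObject(params).promise()
    // after: the wrapper's .s3 client resolves directly
    const after = await aws(opts).s3.headObject(params)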

@xen-orchestra/fs/package.json

@@ -24,6 +24,7 @@
"dependencies": {
"@marsaud/smb2": "^0.17.2",
"@sindresorhus/df": "^3.1.1",
"@sullux/aws-sdk": "^1.0.5",
"@xen-orchestra/async-map": "^0.0.0",
"aws-sdk": "^2.686.0",
"decorator-synchronized": "^0.5.0",

@xen-orchestra/fs/src/s3.js

@@ -1,4 +1,5 @@
import AWS from 'aws-sdk'
import aws from '@sullux/aws-sdk'
import assert from 'assert'
import { parse } from 'xo-remote-parser'
import RemoteHandlerAbstract from './abstract'
@@ -16,16 +17,19 @@ export default class S3Handler extends RemoteHandlerAbstract {
constructor(remote, _opts) {
super(remote)
const { host, path, username, password } = parse(remote.url)
// https://www.zenko.io/blog/first-things-first-getting-started-scality-s3-server/
this._s3 = new AWS.S3({
this._s3 = aws({
accessKeyId: username,
apiVersion: '2006-03-01',
endpoint: host,
s3ForcePathStyle: true,
secretAccessKey: password,
signatureVersion: 'v4',
})
httpOptions: {
timeout: 600000,
},
}).s3
const splitPath = path.split('/').filter(s => s.length)
this._bucket = splitPath.shift()
this._dir = splitPath.join('/')
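Context for the new httpOptions: aws-sdk v2 defaults httpOptions.timeout to 120000 ms, which is exactly the two-minute TimeoutError in the commit title; slow or large requests now get ten minutes instead. A sketch, reusing the constructor options above:

    const s3 = aws({
      // accessKeyId, endpoint, … as in the constructor above
      httpOptions: { timeout: 600000 }, // 10 minutes instead of the 120 s default
    }).s3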
@@ -50,35 +54,35 @@ export default class S3Handler extends RemoteHandlerAbstract {
input.on('error', forwardError)
inputStream = checksumStream
}
const upload = this._s3.upload(
await this._s3.upload(
{
...this._createParams(path),
Body: inputStream,
},
{ partSize: IDEAL_FRAGMENT_SIZE }
{ partSize: IDEAL_FRAGMENT_SIZE, queueSize: 1 }
)
await upload.promise()
if (checksum) {
const checksum = await inputStream.checksum
const params = {
...this._createParams(path + '.checksum'),
Body: checksum,
}
await this._s3.upload(params).promise()
await this._s3.upload(params)
}
await input.task
}
async _writeFile(file, data, options) {
return this._s3.putObject({ ...this._createParams(file), Body: data }).promise()
return this._s3.putObject({ ...this._createParams(file), Body: data })
}
async _createReadStream(file, options) {
return this._s3.getObject(this._createParams(file)).createReadStream()
// https://github.com/Sullux/aws-sdk/issues/11
return this._s3.getObject.raw(this._createParams(file)).createReadStream()
}
async _unlink(file) {
return this._s3.deleteObject(this._createParams(file)).promise()
return this._s3.deleteObject(this._createParams(file))
}
async _list(dir) {
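Two hedged notes on the hunk above: queueSize: 1 limits the managed uploader to a single in-flight part (aws-sdk's default queue size is 4), presumably to bound memory use while streaming backups; and getObject.raw works around the wrapper consuming the response (see the linked Sullux/aws-sdk#11) by returning the plain aws-sdk request object, so streaming stays possible:

    // promisified form resolves to the whole response – no stream:
    //   const data = await s3.getObject(params)
    // raw form returns the underlying AWS.Request, so createReadStream() works:
    const stream = s3.getObject.raw(params).createReadStream()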
@@ -88,11 +92,10 @@ export default class S3Handler extends RemoteHandlerAbstract {
const prefix = [this._dir, dir].join('/')
const splitPrefix = splitPath(prefix)
const request = this._s3.listObjectsV2({
const result = await this._s3.listObjectsV2({
Bucket: this._bucket,
Prefix: splitPrefix.join('/'),
})
const result = await request.promise()
const uniq = new Set()
for (const entry of result.Contents) {
const line = splitPath(entry.Key)
@@ -104,19 +107,32 @@ export default class S3Handler extends RemoteHandlerAbstract {
}
async _rename(oldPath, newPath) {
const params = {
...this._createParams(newPath),
CopySource: `/${this._bucket}/${this._dir}${oldPath}`,
const size = await this._getSize(oldPath)
const multipartParams = await this._s3.createMultipartUpload({ ...this._createParams(newPath) })
const param2 = { ...multipartParams, CopySource: `/${this._bucket}/${this._dir}${oldPath}` }
try {
const parts = []
let start = 0
while (start < size) {
const range = `bytes=${start}-${Math.min(start + MAX_PART_SIZE, size) - 1}`
const partParams = { ...param2, PartNumber: parts.length + 1, CopySourceRange: range }
const upload = await this._s3.uploadPartCopy(partParams)
parts.push({ ETag: upload.CopyPartResult.ETag, PartNumber: partParams.PartNumber })
start += MAX_PART_SIZE
}
await this._s3.completeMultipartUpload({ ...multipartParams, MultipartUpload: { Parts: parts } })
} catch (e) {
await this._s3.abortMultipartUpload(multipartParams)
throw e
}
await this._s3.copyObject(params).promise()
await this._s3.deleteObject(this._createParams(oldPath)).promise()
await this._s3.deleteObject(this._createParams(oldPath))
}
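The rewritten _rename replaces one long-running copyObject call (which also cannot copy objects larger than 5 GB in a single operation) with a multipart copy in MAX_PART_SIZE slices, so no single request runs anywhere near the timeout. Illustration only, assuming MAX_PART_SIZE is S3's 5 GiB part limit (the constant is defined earlier in this file, outside this hunk): renaming a 12 GiB object issues three uploadPartCopy calls with

    CopySourceRange: 'bytes=0-5368709119'
    CopySourceRange: 'bytes=5368709120-10737418239'
    CopySourceRange: 'bytes=10737418240-12884901887'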
async _getSize(file) {
if (typeof file !== 'string') {
file = file.fd
}
const result = await this._s3.headObject(this._createParams(file)).promise()
const result = await this._s3.headObject(this._createParams(file))
return +result.ContentLength
}
@@ -126,7 +142,7 @@ export default class S3Handler extends RemoteHandlerAbstract {
}
const params = this._createParams(file)
params.Range = `bytes=${position}-${position + buffer.length - 1}`
const result = await this._s3.getObject(params).promise()
const result = await this._s3.getObject(params)
result.Body.copy(buffer)
return { bytesRead: result.Body.length, buffer }
}
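One note on the Range math above: HTTP ranges are inclusive on both ends, so reading buffer.length bytes at position asks for bytes=position-(position + buffer.length - 1). For example:

    params.Range = 'bytes=0-1023' // the first 1024 bytes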
@@ -136,13 +152,13 @@ export default class S3Handler extends RemoteHandlerAbstract {
file = file.fd
}
const uploadParams = this._createParams(file)
const fileSize = +(await this._s3.headObject(uploadParams).promise()).ContentLength
const fileSize = +(await this._s3.headObject(uploadParams)).ContentLength
if (fileSize < MIN_PART_SIZE) {
const resultBuffer = Buffer.alloc(Math.max(fileSize, position + buffer.length))
const fileContent = (await this._s3.getObject(uploadParams).promise()).Body
const fileContent = (await this._s3.getObject(uploadParams)).Body
fileContent.copy(resultBuffer)
buffer.copy(resultBuffer, position)
await this._s3.putObject({ ...uploadParams, Body: resultBuffer }).promise()
await this._s3.putObject({ ...uploadParams, Body: resultBuffer })
return { buffer, bytesWritten: buffer.length }
} else {
// using this trick: https://stackoverflow.com/a/38089437/72637
@@ -151,8 +167,13 @@ export default class S3Handler extends RemoteHandlerAbstract {
// if `prefix` is bigger than 5MB, it will be sourced from uploadPartCopy()
// otherwise it will be downloaded, concatenated to `edit`
// `edit` will always be an upload part
// `suffix` will ways be sourced from uploadPartCopy()
const multipartParams = await this._s3.createMultipartUpload(uploadParams).promise()
// `suffix` will always be sourced from uploadPartCopy()
// Then everything will be sliced in 5GB parts before getting uploaded
const multipartParams = await this._s3.createMultipartUpload(uploadParams)
const copyMultipartParams = {
...multipartParams,
CopySource: `/${this._bucket}/${this._dir + file}`,
}
try {
const parts = []
const prefixSize = position
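// Recap of the plan sketched in the comments above (illustrative; MIN_PART_SIZE
// and MAX_PART_SIZE are defined earlier in this file, presumably 5MB and 5GB to
// match S3's part-size limits): a write of `buffer` at `position` splits the
// object into
//   prefix = bytes [0, position)                         – copied server-side when big enough
//   edit   = the buffer being written                    – always uploaded as a regular part
//   suffix = bytes [position + buffer.length, fileSize)  – copied server-side
// Every part except the last must stay within those limits, hence the slicing below.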
@@ -162,36 +183,32 @@ export default class S3Handler extends RemoteHandlerAbstract {
let editBuffer = buffer
let editBufferOffset = position
let partNumber = 1
if (prefixSize < MIN_PART_SIZE) {
const downloadParams = {
...uploadParams,
Range: `bytes=0-${prefixSize - 1}`,
}
const prefixBuffer =
prefixSize > 0 ? (await this._s3.getObject(downloadParams).promise()).Body : Buffer.alloc(0)
let prefixPosition = 0
// use floor() so that the last fragment is handled in the if below
let fragmentsCount = Math.floor(prefixSize / MAX_PART_SIZE)
const prefixFragmentSize = MAX_PART_SIZE
let prefixLastFragmentSize = prefixSize - prefixFragmentSize * fragmentsCount
if (prefixLastFragmentSize >= MIN_PART_SIZE) {
// the last fragment of the prefix is smaller than MAX_PART_SIZE, but bigger than the minimum
// so we can copy it too
fragmentsCount++
prefixLastFragmentSize = 0
}
for (let i = 0; i < fragmentsCount; i++) {
const fragmentEnd = Math.min(prefixPosition + prefixFragmentSize, prefixSize)
assert.strictEqual(fragmentEnd - prefixPosition <= MAX_PART_SIZE, true)
const range = `bytes=${prefixPosition}-${fragmentEnd - 1}`
const copyPrefixParams = { ...copyMultipartParams, PartNumber: partNumber++, CopySourceRange: range }
const part = await this._s3.uploadPartCopy(copyPrefixParams)
parts.push({ ETag: part.CopyPartResult.ETag, PartNumber: copyPrefixParams.PartNumber })
prefixPosition += prefixFragmentSize
}
if (prefixLastFragmentSize) {
// grab everything from the prefix that was too small to be copied, download and merge to the edit buffer.
const downloadParams = { ...uploadParams, Range: `bytes=${prefixPosition}-${prefixSize - 1}` }
const prefixBuffer = prefixSize > 0 ? (await this._s3.getObject(downloadParams)).Body : Buffer.alloc(0)
editBuffer = Buffer.concat([prefixBuffer, buffer])
editBufferOffset = 0
} else {
const fragmentsCount = Math.ceil(prefixSize / MAX_PART_SIZE)
const prefixFragmentSize = Math.ceil(prefixSize / fragmentsCount)
const lastFragmentSize = prefixFragmentSize * fragmentsCount - prefixSize
let prefixPosition = 0
for (let i = 0; i < fragmentsCount; i++) {
const copyPrefixParams = {
...multipartParams,
PartNumber: partNumber++,
CopySource: `/${this._bucket}/${this._dir + file}`,
CopySourceRange: `bytes=${prefixPosition}-${prefixPosition + prefixFragmentSize - 1}`,
}
const prefixPart = (await this._s3.uploadPartCopy(copyPrefixParams).promise()).CopyPartResult
parts.push({
ETag: prefixPart.ETag,
PartNumber: copyPrefixParams.PartNumber,
})
prefixPosition += prefixFragmentSize
}
if (lastFragmentSize) {
}
editBufferOffset -= prefixLastFragmentSize
}
if (hasSuffix && editBuffer.length < MIN_PART_SIZE) {
// the edit fragment is too short and is not the last fragment
@@ -204,45 +221,32 @@ export default class S3Handler extends RemoteHandlerAbstract {
hasSuffix = suffixSize > 0
const prefixRange = `bytes=${complementOffset}-${complementOffset + complementSize - 1}`
const downloadParams = { ...uploadParams, Range: prefixRange }
const complementBuffer = (await this._s3.getObject(downloadParams).promise()).Body
const complementBuffer = (await this._s3.getObject(downloadParams)).Body
editBuffer = Buffer.concat([editBuffer, complementBuffer])
}
const editParams = {
...multipartParams,
Body: editBuffer,
PartNumber: partNumber++,
}
const editPart = await this._s3.uploadPart(editParams).promise()
const editParams = { ...multipartParams, Body: editBuffer, PartNumber: partNumber++ }
const editPart = await this._s3.uploadPart(editParams)
parts.push({ ETag: editPart.ETag, PartNumber: editParams.PartNumber })
if (hasSuffix) {
// use ceil because the last fragment can be arbitrarily small.
const suffixFragments = Math.ceil(suffixSize / MAX_PART_SIZE)
const suffixFragmentsSize = Math.ceil(suffixSize / suffixFragments)
let suffixFragmentOffset = suffixOffset
for (let i = 0; i < suffixFragments; i++) {
const fragmentEnd = suffixFragmentOffset + suffixFragmentsSize
const fragmentEnd = suffixFragmentOffset + MAX_PART_SIZE
assert.strictEqual(Math.min(fileSize, fragmentEnd) - suffixFragmentOffset <= MAX_PART_SIZE, true)
const suffixRange = `bytes=${suffixFragmentOffset}-${Math.min(fileSize, fragmentEnd) - 1}`
const copySuffixParams = {
...multipartParams,
PartNumber: partNumber++,
CopySource: `/${this._bucket}/${this._dir + file}`,
CopySourceRange: suffixRange,
}
const suffixPart = (await this._s3.uploadPartCopy(copySuffixParams).promise()).CopyPartResult
parts.push({
ETag: suffixPart.ETag,
PartNumber: copySuffixParams.PartNumber,
})
const copySuffixParams = { ...copyMultipartParams, PartNumber: partNumber++, CopySourceRange: suffixRange }
const suffixPart = (await this._s3.uploadPartCopy(copySuffixParams)).CopyPartResult
parts.push({ ETag: suffixPart.ETag, PartNumber: copySuffixParams.PartNumber })
suffixFragmentOffset = fragmentEnd
}
}
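// e.g. a 12 GiB suffix gives Math.ceil(suffixSize / MAX_PART_SIZE) = 3 copy
// ranges of 5, 5 and 2 GiB (assuming MAX_PART_SIZE is 5 GiB); ceil() is used
// because the last fragment may be arbitrarily small, and the assert above
// verifies each range stays within MAX_PART_SIZE.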
await this._s3
.completeMultipartUpload({
...multipartParams,
MultipartUpload: { Parts: parts },
})
.promise()
await this._s3.completeMultipartUpload({
...multipartParams,
MultipartUpload: { Parts: parts },
})
} catch (e) {
await this._s3.abortMultipartUpload(multipartParams).promise()
await this._s3.abortMultipartUpload(multipartParams)
throw e
}
}

CHANGELOG.unreleased.md

@@ -21,6 +21,7 @@
- [Backup reports] Fix malformed sent email in case of multiple VMs (PR [#5479](https://github.com/vatesfr/xen-orchestra/pull/5479))
- [Restore/metadata] Ignore disabled remotes on listing backups (PR [#5504](https://github.com/vatesfr/xen-orchestra/pull/5504))
- [VM/network] Change VIF's locking mode automatically to `network_default` when changing network (PR [#5500](https://github.com/vatesfr/xen-orchestra/pull/5500))
- [Backup/S3] Fix `TimeoutError: Connection timed out after 120000ms` (PR [#5456](https://github.com/vatesfr/xen-orchestra/pull/5456))
### Packages to release
@@ -39,6 +40,7 @@
>
> In case of conflict, the highest (lowest in previous list) `$version` wins.
- @xen-orchestra/fs patch
- xo-server-backup-reports patch
- xo-server minor
- xo-web minor

yarn.lock

@@ -1463,6 +1463,11 @@
dependencies:
"@sinonjs/commons" "^1.7.0"
"@sullux/aws-sdk@^1.0.5":
version "1.0.5"
resolved "https://registry.yarnpkg.com/@sullux/aws-sdk/-/aws-sdk-1.0.5.tgz#6d0377024b286f8732250a832a895ef32f764099"
integrity sha512-5Dq9Wjg7UsLyMz87vyGgW2+Ykcz1mJVSNNEwu096L5G1xS1+37dBzV8ibq91DOLQ22pMs8HgFpmzqTgyZy4cOw==
"@szmarczak/http-timer@^1.1.2":
version "1.1.2"
resolved "https://registry.yarnpkg.com/@szmarczak/http-timer/-/http-timer-1.1.2.tgz#b1665e2c461a2cd92f4c1bbf50d5454de0d4b421"