Compare commits

...

23 Commits

Author SHA1 Message Date
Julien Fontanet
7c486f1159 Merge branch 'master' into nr-delete-fs-createOutputStream 2021-02-17 16:48:14 +01:00
Nicolas Raynaud
3079e1689c update CHANGELOG.unreleased.md 2021-02-15 09:26:45 +01:00
Nicolas Raynaud
90b8eed038 Merge branch 'master' into nr-delete-fs-createOutputStream 2021-02-15 08:57:29 +01:00
Nicolas Raynaud
6f43d4f091 fix tests 2021-02-06 13:14:07 +01:00
Nicolas Raynaud
a33fc27313 fix tests 2021-02-05 19:50:26 +01:00
Nicolas Raynaud
cad5f74d45 fix tests 2021-02-05 16:50:40 +01:00
Nicolas Raynaud
7787f39505 fix tests 2021-02-05 13:43:22 +01:00
Nicolas Raynaud
548a15214b try to fix travis tests 2021-02-04 17:20:13 +01:00
Nicolas Raynaud
de6efe182b fix AbstractHandler.outputStream() argument order to be compatible with createPrefixWrapperMethods() 2021-02-04 16:43:53 +01:00
Nicolas Raynaud
834fd5dd07 fix s3._rmtree() 2021-02-03 17:38:31 +01:00
Nicolas Raynaud
c1e72697b0 restore S3 ssl 2021-02-03 15:49:12 +01:00
Nicolas Raynaud
78dc03e23e make cancelToken an keyword optional argument. 2021-02-03 15:43:47 +01:00
Nicolas Raynaud
2f7af5c05a restore docs files 2021-02-03 14:31:42 +01:00
Nicolas Raynaud
877d27a433 propagate the cancelToken to fs 2021-02-03 14:25:24 +01:00
Nicolas Raynaud
37d1b48c1b Merge branch 'master' into nr-delete-fs-createOutputStream 2021-02-02 17:13:23 +01:00
Nicolas Raynaud
7d6a689542 merge master 2021-01-15 15:29:14 +01:00
Nicolas Raynaud
b75c06f7fe convert #createOutputStream() test to #outputStream() test 2020-11-17 14:30:33 +01:00
Nicolas Raynaud
e8bd2ae1e0 fix parameter orders, add S3 rmtree() 2020-11-16 18:28:41 +01:00
Nicolas Raynaud
6e2396e5f4 fix parameter orders, add S3 rmtree() 2020-11-16 18:26:28 +01:00
Nicolas Raynaud
01ceed9e99 Merge branch 'master' into nr-delete-fs-createOutputStream 2020-11-16 17:00:05 +01:00
Nicolas Raynaud
edd3628a67 delete fs.createOutputStream() 2020-11-13 09:04:24 +01:00
Nicolas Raynaud
4a3b2a2a5a Apply suggestions from code review
Co-authored-by: Julien Fontanet <julien.fontanet@isonoe.net>
2020-11-13 08:39:21 +01:00
Nicolas Raynaud
1a7b49ff39 delete fs.createOutputStream() 2020-11-12 16:38:30 +01:00
9 changed files with 112 additions and 90 deletions

View File

@@ -4,6 +4,7 @@
import getStream from 'get-stream'
import asyncMap from '@xen-orchestra/async-map'
import CancelToken from 'promise-toolbox/CancelToken'
import limit from 'limit-concurrency-decorator'
import path, { basename } from 'path'
import synchronized from 'decorator-synchronized'
@@ -119,42 +120,6 @@ export default class RemoteHandlerAbstract {
await this.__closeFile(fd)
}
// TODO: remove method
async createOutputStream(file: File, { checksum = false, dirMode, ...options }: Object = {}): Promise<LaxWritable> {
if (typeof file === 'string') {
file = normalizePath(file)
}
const path = typeof file === 'string' ? file : file.path
const streamP = timeout.call(
this._createOutputStream(file, {
dirMode,
flags: 'wx',
...options,
}),
this._timeout
)
if (!checksum) {
return streamP
}
const checksumStream = createChecksumStream()
const forwardError = error => {
checksumStream.emit('error', error)
}
const stream = await streamP
stream.on('error', forwardError)
checksumStream.pipe(stream)
// $FlowFixMe
checksumStream.checksumWritten = checksumStream.checksum
.then(value => this._outputFile(checksumFile(path), value, { flags: 'wx' }))
.catch(forwardError)
return checksumStream
}
createReadStream(
file: File,
{ checksum = false, ignoreMissingChecksum = false, ...options }: Object = {}
@@ -209,14 +174,15 @@ export default class RemoteHandlerAbstract {
// write a stream to a file using a temporary file
async outputStream(
input: Readable | Promise<Readable>,
path: string,
{ checksum = true, dirMode }: { checksum?: boolean, dirMode?: number } = {}
input: Readable | Promise<Readable>,
{ checksum = true, dirMode, cancelToken = CancelToken.none }: { checksum?: boolean, dirMode?: number } = {}
): Promise<void> {
path = normalizePath(path)
return this._outputStream(await input, normalizePath(path), {
checksum,
dirMode,
cancelToken,
})
}
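
To make the hunk above concrete: outputStream() now takes the destination path first, then the readable stream (or a promise of one), then the options, which gain a cancelToken defaulting to CancelToken.none. A minimal caller sketch, assuming a local file:// remote; the remote URL and file names are illustrative:

import { createReadStream } from 'fs'
import { getHandler } from '@xen-orchestra/fs'

async function upload() {
  const handler = getHandler({ url: 'file:///tmp/test-remote' }) // illustrative remote
  await handler.sync()

  // path first, then the input stream, then the options
  await handler.outputStream('dir/file.bin', createReadStream('./file.bin'), {
    checksum: true, // default: also writes a sibling checksum file
    dirMode: 0o700, // mode intended for parent directories created along the way
  })

  await handler.forget()
}
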
@@ -477,13 +443,51 @@ export default class RemoteHandlerAbstract {
return this._outputFile(file, data, { flags })
}
async _outputStream(input: Readable, path: string, { checksum, dirMode }: { checksum?: boolean, dirMode?: number }) {
async _createOutputStreamChecksum(file: File, { checksum = false, ...options }: Object = {}): Promise<LaxWritable> {
if (typeof file === 'string') {
file = normalizePath(file)
}
const path = typeof file === 'string' ? file : file.path
const streamP = timeout.call(
this._createOutputStream(file, {
flags: 'wx',
...options,
}),
this._timeout
)
if (!checksum) {
return streamP
}
const checksumStream = createChecksumStream()
const forwardError = error => {
checksumStream.emit('error', error)
}
const stream = await streamP
stream.on('error', forwardError)
checksumStream.pipe(stream)
// $FlowFixMe
checksumStream.checksumWritten = checksumStream.checksum
.then(value => this._outputFile(checksumFile(path), value, { flags: 'wx' }))
.catch(forwardError)
return checksumStream
}
async _outputStream(
input: Readable,
path: string,
{ checksum, dirMode, cancelToken = CancelToken.none }: { checksum?: boolean, dirMode?: number }
) {
const tmpPath = `${dirname(path)}/.${basename(path)}`
const output = await this.createOutputStream(tmpPath, {
checksum,
dirMode,
})
const output = await this._createOutputStreamChecksum(tmpPath, { checksum })
try {
cancelToken.promise.then(reason => {
input.destroy(reason)
})
input.pipe(output)
await fromEvent(output, 'finish')
await output.checksumWritten
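
The cancelToken.promise.then(...) hook above is the whole cancellation mechanism: cancelling the token resolves its promise with the cancel reason, and the input stream is destroyed with that reason, aborting the transfer. A standalone sketch of that wiring, assuming promise-toolbox's CancelToken.source(); the PassThrough stands in for a real input stream:

import { PassThrough } from 'stream'
import CancelToken from 'promise-toolbox/CancelToken'

const { cancel, token } = CancelToken.source()
const input = new PassThrough()

// same wiring as in _outputStream above
token.promise.then(reason => {
  input.destroy(reason)
})

input.on('error', reason => {
  console.log('input destroyed with:', reason)
})

cancel('backup aborted') // the input stream now emits the cancel reason as an error
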

View File

@@ -30,18 +30,6 @@ describe('closeFile()', () => {
})
})
describe('createOutputStream()', () => {
it(`throws in case of timeout`, async () => {
const testHandler = new TestHandler({
createOutputStream: () => new Promise(() => {}),
})
const promise = testHandler.createOutputStream('File')
jest.advanceTimersByTime(TIMEOUT)
await expect(promise).rejects.toThrowError(TimeoutError)
})
})
describe('getInfo()', () => {
it('throws in case of timeout', async () => {
const testHandler = new TestHandler({

View File

@@ -3,11 +3,9 @@
import 'dotenv/config'
import asyncIteratorToStream from 'async-iterator-to-stream'
import { forOwn, random } from 'lodash'
import { fromCallback } from 'promise-toolbox'
import { pipeline } from 'readable-stream'
import { tmpdir } from 'os'
import { getHandler } from '.'
import { getHandler } from './'
// https://gist.github.com/julien-f/3228c3f34fdac01ade09
const unsecureRandomBytes = n => {
@@ -82,10 +80,9 @@ handlers.forEach(url => {
})
})
describe('#createOutputStream()', () => {
describe('#outputStream()', () => {
it('creates parent dir if missing', async () => {
const stream = await handler.createOutputStream('dir/file')
await fromCallback(pipeline, createTestDataStream(), stream)
await handler.outputStream('dir/file', createTestDataStream())
await expect(await handler.readFile('dir/file')).toEqual(TEST_DATA)
})
})

View File

@@ -1,9 +1,11 @@
import aws from '@sullux/aws-sdk'
import assert from 'assert'
import http from 'http'
import { parse } from 'xo-remote-parser'
import RemoteHandlerAbstract from './abstract'
import { createChecksumStream } from './checksum'
import CancelToken from 'promise-toolbox/CancelToken'
// endpoints https://docs.aws.amazon.com/general/latest/gr/s3.html
@@ -13,12 +15,14 @@ const MAX_PART_SIZE = 1024 * 1024 * 1024 * 5 // 5GB
const MAX_PARTS_COUNT = 10000
const MAX_OBJECT_SIZE = 1024 * 1024 * 1024 * 1024 * 5 // 5TB
const IDEAL_FRAGMENT_SIZE = Math.ceil(MAX_OBJECT_SIZE / MAX_PARTS_COUNT) // the smallest fragment size that still allows a 5TB upload in 10000 fragments, about 524MB
const USE_SSL = true
export default class S3Handler extends RemoteHandlerAbstract {
constructor(remote, _opts) {
super(remote)
const { host, path, username, password } = parse(remote.url)
// https://www.zenko.io/blog/first-things-first-getting-started-scality-s3-server/
this._s3 = aws({
const params = {
accessKeyId: username,
apiVersion: '2006-03-01',
endpoint: host,
@@ -28,7 +32,12 @@ export default class S3Handler extends RemoteHandlerAbstract {
httpOptions: {
timeout: 600000,
},
}).s3
}
if (!USE_SSL) {
params.httpOptions.agent = new http.Agent()
params.sslEnabled = false
}
this._s3 = aws(params).s3
const splitPath = path.split('/').filter(s => s.length)
this._bucket = splitPath.shift()
@@ -43,7 +52,10 @@ export default class S3Handler extends RemoteHandlerAbstract {
return { Bucket: this._bucket, Key: this._dir + file }
}
async _outputStream(input, path, { checksum }) {
async _outputStream(input, path, { checksum, cancelToken = CancelToken.none }) {
cancelToken.promise.then(reason => {
input.destroy(reason)
})
let inputStream = input
if (checksum) {
const checksumStream = createChecksumStream()
@@ -266,4 +278,26 @@ export default class S3Handler extends RemoteHandlerAbstract {
}
async _closeFile(fd) {}
// https://stackoverflow.com/a/48955582/72637
async _rmtree(dir) {
const listParams = {
Bucket: this._bucket,
Prefix: this._dir + dir,
}
let listedObjects = {}
do {
listedObjects = await this._s3.listObjectsV2({
...listParams,
ContinuationToken: listedObjects.NextContinuationToken,
})
if (listedObjects.Contents.length === 0) {
return
}
await this._s3.deleteObjects({
Bucket: this._bucket,
Delete: { Objects: listedObjects.Contents.map(({ Key }) => ({ Key })) },
})
} while (listedObjects.IsTruncated)
}
}
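
For context on the _rmtree() loop above: S3 has no recursive delete, and both listObjectsV2 and deleteObjects are capped at 1000 keys per call, so the prefix has to be emptied page by page until IsTruncated is false. A minimal sketch of the same pattern against the plain AWS SDK v2 promise API; bucket and prefix names are illustrative:

import AWS from 'aws-sdk'

async function rmtree(s3, Bucket, Prefix) {
  let ContinuationToken
  let truncated
  do {
    // listObjectsV2 returns at most 1000 keys per page, hence the pagination loop
    const page = await s3.listObjectsV2({ Bucket, Prefix, ContinuationToken }).promise()
    if (page.Contents.length === 0) {
      return
    }
    // deleteObjects also accepts at most 1000 keys, so one listed page maps to one delete call
    await s3
      .deleteObjects({
        Bucket,
        Delete: { Objects: page.Contents.map(({ Key }) => ({ Key })) },
      })
      .promise()
    ContinuationToken = page.NextContinuationToken
    truncated = page.IsTruncated
  } while (truncated)
}

// usage: await rmtree(new AWS.S3(), 'my-bucket', 'xo-vm-backups/')
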

View File

@@ -13,6 +13,7 @@
- [Import] Ignore case when detecting file type (PR [#5574](https://github.com/vatesfr/xen-orchestra/pull/5574))
- [Backup] Ability to set a specific schedule to always run full backups [#5541](https://github.com/vatesfr/xen-orchestra/issues/5541) (PR [#5546](https://github.com/vatesfr/xen-orchestra/pull/5546))
- [Proxy] Log VM backup restoration (PR [#5576](https://github.com/vatesfr/xen-orchestra/pull/5576))
- [Backup/S3] Allow backup of metadata to Amazon Web Services S3 (PR [#5373](https://github.com/vatesfr/xen-orchestra/pull/5373))
### Bug fixes
@@ -40,7 +41,8 @@
>
> In case of conflict, the highest (lowest in previous list) `$version` wins.
- @xen-orchestra/fs minor
- @xen-orchestra/fs major
- vhd-lib minor
- xen-api patch
- xo-common minor
- xo-server minor

View File

@@ -1374,7 +1374,7 @@ export default class BackupNg {
parentId: taskId,
result: () => ({ size: xva.size }),
},
handler.outputStream(fork, dataFilename, {
handler.outputStream(dataFilename, fork, {
dirMode,
})
)
@@ -1712,7 +1712,7 @@ export default class BackupNg {
}
// FIXME: should only be renamed after the metadata file has been written
await handler.outputStream(fork.streams[`${id}.vhd`](), path, {
await handler.outputStream(path, fork.streams[`${id}.vhd`](), {
// no checksum for VHDs, because they will be invalidated by
// merges and chainings
checksum: false,

View File

@@ -548,11 +548,7 @@ export default class {
const sizeStream = createSizeStream()
try {
const targetStream = await handler.createOutputStream(backupFullPath)
stream.on('error', error => targetStream.emit('error', error))
await Promise.all([fromEvent(stream.pipe(sizeStream).pipe(targetStream), 'finish'), stream.task])
await Promise.all([handler.outputStream(backupFullPath, sizeStream), stream.task])
} catch (error) {
// Remove new backup. (corrupt).
await handler.unlink(backupFullPath)::ignoreErrors()
@@ -782,9 +778,7 @@ export default class {
@deferrable
async _backupVm($defer, vm, handler, file, { compress }) {
const targetStream = await handler.createOutputStream(file)
$defer.onFailure.call(handler, 'unlink', file)
$defer.onFailure.call(targetStream, 'close')
const sourceStream = await this._xo.getXapi(vm).exportVm(vm._xapiId, {
compress,
@@ -792,9 +786,9 @@ export default class {
const sizeStream = createSizeStream()
sourceStream.pipe(sizeStream).pipe(targetStream)
sourceStream.pipe(sizeStream)
await Promise.all([sourceStream.task, fromEvent(targetStream, 'finish')])
await Promise.all([sourceStream.task, handler.outputStream(file, sizeStream)])
return {
transferSize: sizeStream.size,
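
The legacy backup path keeps its byte counting by piping the XAPI export through a size-counting pass-through and handing that stream to outputStream(), which now owns the actual write and the 'finish' handling. A self-contained sketch of the idea; createSizeStream here is a hypothetical re-implementation, not the helper used by xo-server:

import { Transform } from 'stream'

// hypothetical helper: a pass-through stream that counts the bytes flowing through it
function createSizeStream() {
  const stream = new Transform({
    transform(chunk, encoding, callback) {
      this.size += chunk.length
      callback(null, chunk)
    },
  })
  stream.size = 0
  return stream
}

// usage sketch: sourceStream is the VM export, handler and file are illustrative
async function backupVm(handler, file, sourceStream) {
  const sizeStream = createSizeStream()
  sourceStream.pipe(sizeStream)
  await handler.outputStream(file, sizeStream)
  return { transferSize: sizeStream.size }
}
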

View File

@@ -1,7 +1,7 @@
// @flow
import asyncMap from '@xen-orchestra/async-map'
import createLogger from '@xen-orchestra/log'
import { fromEvent, ignoreErrors, timeout } from 'promise-toolbox'
import { ignoreErrors, timeout } from 'promise-toolbox'
import { parseDuration } from '@vates/parse-duration'
import { debounceWithKey, REMOVE_CACHE_ENTRY } from '../_pDebounceWithKey'
@@ -304,20 +304,13 @@ export default class metadataBackup {
}
)
let outputStream
try {
const { dirMode } = this._backupOptions
await waitAll([
(async () => {
outputStream = await handler.createOutputStream(fileName, {
dirMode,
})
// 'readable-stream/pipeline' not call the callback when an error throws
// from the readable stream
stream.pipe(outputStream)
return timeout.call(
fromEvent(stream, 'end').catch(error => {
handler.outputStream(fileName, stream, { cancelToken }).catch(error => {
stream.destroy()
if (error.message !== 'aborted') {
throw error
}
@@ -353,9 +346,6 @@ export default class metadataBackup {
this._listPoolMetadataBackups(REMOVE_CACHE_ENTRY, remoteId)
} catch (error) {
if (outputStream !== undefined) {
outputStream.destroy()
}
await handler.rmtree(dir).catch(error => {
logger.warning(`unable to delete the folder ${dir}`, {
event: 'task.warning',
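
The net effect of this hunk is that outputStream() owns the temporary file and the piping, and the caller only keeps a small catch that tolerates the deliberate abort while re-throwing real failures. A sketch of that pattern in isolation; the function and parameter names are illustrative, and the 'aborted' message is the one checked in the hunk above:

async function writePoolMetadata(handler, fileName, stream, cancelToken) {
  try {
    await handler.outputStream(fileName, stream, { cancelToken })
  } catch (error) {
    stream.destroy()
    if (error.message !== 'aborted') {
      throw error
    }
  }
}
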

View File

@@ -24,6 +24,15 @@ afterEach(async () => {
await pFromCallback(cb => rimraf(tmpDir, cb))
})
function bufferToArray(buffer) {
const view = new DataView(buffer)
const res = []
for (let i = 0; i < buffer.byteLength; i += 4) {
res.push(view.getUint32(i, true))
}
return res
}
function createFileAccessor(file) {
return async (start, end) => {
if (start < 0 || end < 0) {
@@ -52,7 +61,11 @@ test('VMDK to VHD can convert a random data file with VMDKDirectParser', async (
})
const result = await readVmdkGrainTable(createFileAccessor(vmdkFileName))
const pipe = (
await vmdkToVhd(createReadStream(vmdkFileName), result.grainLogicalAddressList, result.grainFileOffsetList)
await vmdkToVhd(
createReadStream(vmdkFileName),
bufferToArray(result.grainLogicalAddressList),
bufferToArray(result.grainFileOffsetList)
)
).pipe(createWriteStream(vhdFileName))
await eventToPromise(pipe, 'finish')
await execa('vhd-util', ['check', '-p', '-b', '-t', '-n', vhdFileName])
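
For clarity on the new bufferToArray() helper introduced above: it decodes a raw buffer of little-endian 32-bit unsigned integers (here, the grain tables returned by readVmdkGrainTable()) into a plain array of numbers. A tiny self-contained check of that behaviour; the sample values are made up:

// bufferToArray as defined in the test above
function bufferToArray(buffer) {
  const view = new DataView(buffer)
  const res = []
  for (let i = 0; i < buffer.byteLength; i += 4) {
    res.push(view.getUint32(i, true))
  }
  return res
}

// two 32-bit values written little-endian into an 8-byte buffer
const buf = new ArrayBuffer(8)
const view = new DataView(buf)
view.setUint32(0, 16, true)
view.setUint32(4, 4096, true)

console.log(bufferToArray(buf)) // [ 16, 4096 ]
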