Compare commits
23 Commits
florent-we
...
nr-delete-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7c486f1159 | ||
|
|
3079e1689c | ||
|
|
90b8eed038 | ||
|
|
6f43d4f091 | ||
|
|
a33fc27313 | ||
|
|
cad5f74d45 | ||
|
|
7787f39505 | ||
|
|
548a15214b | ||
|
|
de6efe182b | ||
|
|
834fd5dd07 | ||
|
|
c1e72697b0 | ||
|
|
78dc03e23e | ||
|
|
2f7af5c05a | ||
|
|
877d27a433 | ||
|
|
37d1b48c1b | ||
|
|
7d6a689542 | ||
|
|
b75c06f7fe | ||
|
|
e8bd2ae1e0 | ||
|
|
6e2396e5f4 | ||
|
|
01ceed9e99 | ||
|
|
edd3628a67 | ||
|
|
4a3b2a2a5a | ||
|
|
1a7b49ff39 |
@@ -4,6 +4,7 @@
|
||||
import getStream from 'get-stream'
|
||||
|
||||
import asyncMap from '@xen-orchestra/async-map'
|
||||
import CancelToken from 'promise-toolbox/CancelToken'
|
||||
import limit from 'limit-concurrency-decorator'
|
||||
import path, { basename } from 'path'
|
||||
import synchronized from 'decorator-synchronized'
|
||||
@@ -119,42 +120,6 @@ export default class RemoteHandlerAbstract {
|
||||
await this.__closeFile(fd)
|
||||
}
|
||||
|
||||
// TODO: remove method
|
||||
async createOutputStream(file: File, { checksum = false, dirMode, ...options }: Object = {}): Promise<LaxWritable> {
|
||||
if (typeof file === 'string') {
|
||||
file = normalizePath(file)
|
||||
}
|
||||
const path = typeof file === 'string' ? file : file.path
|
||||
const streamP = timeout.call(
|
||||
this._createOutputStream(file, {
|
||||
dirMode,
|
||||
flags: 'wx',
|
||||
...options,
|
||||
}),
|
||||
this._timeout
|
||||
)
|
||||
|
||||
if (!checksum) {
|
||||
return streamP
|
||||
}
|
||||
|
||||
const checksumStream = createChecksumStream()
|
||||
const forwardError = error => {
|
||||
checksumStream.emit('error', error)
|
||||
}
|
||||
|
||||
const stream = await streamP
|
||||
stream.on('error', forwardError)
|
||||
checksumStream.pipe(stream)
|
||||
|
||||
// $FlowFixMe
|
||||
checksumStream.checksumWritten = checksumStream.checksum
|
||||
.then(value => this._outputFile(checksumFile(path), value, { flags: 'wx' }))
|
||||
.catch(forwardError)
|
||||
|
||||
return checksumStream
|
||||
}
|
||||
|
||||
createReadStream(
|
||||
file: File,
|
||||
{ checksum = false, ignoreMissingChecksum = false, ...options }: Object = {}
|
||||
@@ -209,14 +174,15 @@ export default class RemoteHandlerAbstract {
|
||||
|
||||
// write a stream to a file using a temporary file
|
||||
async outputStream(
|
||||
input: Readable | Promise<Readable>,
|
||||
path: string,
|
||||
{ checksum = true, dirMode }: { checksum?: boolean, dirMode?: number } = {}
|
||||
input: Readable | Promise<Readable>,
|
||||
{ checksum = true, dirMode, cancelToken = CancelToken.none }: { checksum?: boolean, dirMode?: number } = {}
|
||||
): Promise<void> {
|
||||
path = normalizePath(path)
|
||||
return this._outputStream(await input, normalizePath(path), {
|
||||
checksum,
|
||||
dirMode,
|
||||
cancelToken,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -477,13 +443,51 @@ export default class RemoteHandlerAbstract {
|
||||
return this._outputFile(file, data, { flags })
|
||||
}
|
||||
|
||||
async _outputStream(input: Readable, path: string, { checksum, dirMode }: { checksum?: boolean, dirMode?: number }) {
|
||||
async _createOutputStreamChecksum(file: File, { checksum = false, ...options }: Object = {}): Promise<LaxWritable> {
|
||||
if (typeof file === 'string') {
|
||||
file = normalizePath(file)
|
||||
}
|
||||
const path = typeof file === 'string' ? file : file.path
|
||||
const streamP = timeout.call(
|
||||
this._createOutputStream(file, {
|
||||
flags: 'wx',
|
||||
...options,
|
||||
}),
|
||||
this._timeout
|
||||
)
|
||||
|
||||
if (!checksum) {
|
||||
return streamP
|
||||
}
|
||||
|
||||
const checksumStream = createChecksumStream()
|
||||
const forwardError = error => {
|
||||
checksumStream.emit('error', error)
|
||||
}
|
||||
|
||||
const stream = await streamP
|
||||
stream.on('error', forwardError)
|
||||
checksumStream.pipe(stream)
|
||||
|
||||
// $FlowFixMe
|
||||
checksumStream.checksumWritten = checksumStream.checksum
|
||||
.then(value => this._outputFile(checksumFile(path), value, { flags: 'wx' }))
|
||||
.catch(forwardError)
|
||||
|
||||
return checksumStream
|
||||
}
|
||||
|
||||
async _outputStream(
|
||||
input: Readable,
|
||||
path: string,
|
||||
{ checksum, dirMode, cancelToken = CancelToken.none }: { checksum?: boolean, dirMode?: number }
|
||||
) {
|
||||
const tmpPath = `${dirname(path)}/.${basename(path)}`
|
||||
const output = await this.createOutputStream(tmpPath, {
|
||||
checksum,
|
||||
dirMode,
|
||||
})
|
||||
const output = await this._createOutputStreamChecksum(tmpPath, { checksum })
|
||||
try {
|
||||
cancelToken.promise.then(reason => {
|
||||
input.destroy(reason)
|
||||
})
|
||||
input.pipe(output)
|
||||
await fromEvent(output, 'finish')
|
||||
await output.checksumWritten
|
||||
|
||||
@@ -30,18 +30,6 @@ describe('closeFile()', () => {
|
||||
})
|
||||
})
|
||||
|
||||
describe('createOutputStream()', () => {
|
||||
it(`throws in case of timeout`, async () => {
|
||||
const testHandler = new TestHandler({
|
||||
createOutputStream: () => new Promise(() => {}),
|
||||
})
|
||||
|
||||
const promise = testHandler.createOutputStream('File')
|
||||
jest.advanceTimersByTime(TIMEOUT)
|
||||
await expect(promise).rejects.toThrowError(TimeoutError)
|
||||
})
|
||||
})
|
||||
|
||||
describe('getInfo()', () => {
|
||||
it('throws in case of timeout', async () => {
|
||||
const testHandler = new TestHandler({
|
||||
|
||||
@@ -3,11 +3,9 @@
|
||||
import 'dotenv/config'
|
||||
import asyncIteratorToStream from 'async-iterator-to-stream'
|
||||
import { forOwn, random } from 'lodash'
|
||||
import { fromCallback } from 'promise-toolbox'
|
||||
import { pipeline } from 'readable-stream'
|
||||
import { tmpdir } from 'os'
|
||||
|
||||
import { getHandler } from '.'
|
||||
import { getHandler } from './'
|
||||
|
||||
// https://gist.github.com/julien-f/3228c3f34fdac01ade09
|
||||
const unsecureRandomBytes = n => {
|
||||
@@ -82,10 +80,9 @@ handlers.forEach(url => {
|
||||
})
|
||||
})
|
||||
|
||||
describe('#createOutputStream()', () => {
|
||||
describe('#outputStream()', () => {
|
||||
it('creates parent dir if missing', async () => {
|
||||
const stream = await handler.createOutputStream('dir/file')
|
||||
await fromCallback(pipeline, createTestDataStream(), stream)
|
||||
await handler.outputStream('dir/file', createTestDataStream())
|
||||
await expect(await handler.readFile('dir/file')).toEqual(TEST_DATA)
|
||||
})
|
||||
})
|
||||
|
||||
@@ -1,9 +1,11 @@
|
||||
import aws from '@sullux/aws-sdk'
|
||||
import assert from 'assert'
|
||||
import http from 'http'
|
||||
import { parse } from 'xo-remote-parser'
|
||||
|
||||
import RemoteHandlerAbstract from './abstract'
|
||||
import { createChecksumStream } from './checksum'
|
||||
import CancelToken from 'promise-toolbox/CancelToken'
|
||||
|
||||
// endpoints https://docs.aws.amazon.com/general/latest/gr/s3.html
|
||||
|
||||
@@ -13,12 +15,14 @@ const MAX_PART_SIZE = 1024 * 1024 * 1024 * 5 // 5GB
|
||||
const MAX_PARTS_COUNT = 10000
|
||||
const MAX_OBJECT_SIZE = 1024 * 1024 * 1024 * 1024 * 5 // 5TB
|
||||
const IDEAL_FRAGMENT_SIZE = Math.ceil(MAX_OBJECT_SIZE / MAX_PARTS_COUNT) // the smallest fragment size that still allows a 5TB upload in 10000 fragments, about 524MB
|
||||
|
||||
const USE_SSL = true
|
||||
export default class S3Handler extends RemoteHandlerAbstract {
|
||||
constructor(remote, _opts) {
|
||||
super(remote)
|
||||
const { host, path, username, password } = parse(remote.url)
|
||||
// https://www.zenko.io/blog/first-things-first-getting-started-scality-s3-server/
|
||||
this._s3 = aws({
|
||||
const params = {
|
||||
accessKeyId: username,
|
||||
apiVersion: '2006-03-01',
|
||||
endpoint: host,
|
||||
@@ -28,7 +32,12 @@ export default class S3Handler extends RemoteHandlerAbstract {
|
||||
httpOptions: {
|
||||
timeout: 600000,
|
||||
},
|
||||
}).s3
|
||||
}
|
||||
if (!USE_SSL) {
|
||||
params.httpOptions.agent = new http.Agent()
|
||||
params.sslEnabled = false
|
||||
}
|
||||
this._s3 = aws(params).s3
|
||||
|
||||
const splitPath = path.split('/').filter(s => s.length)
|
||||
this._bucket = splitPath.shift()
|
||||
@@ -43,7 +52,10 @@ export default class S3Handler extends RemoteHandlerAbstract {
|
||||
return { Bucket: this._bucket, Key: this._dir + file }
|
||||
}
|
||||
|
||||
async _outputStream(input, path, { checksum }) {
|
||||
async _outputStream(input, path, { checksum, cancelToken = CancelToken.none }) {
|
||||
cancelToken.promise.then(reason => {
|
||||
input.destroy(reason)
|
||||
})
|
||||
let inputStream = input
|
||||
if (checksum) {
|
||||
const checksumStream = createChecksumStream()
|
||||
@@ -266,4 +278,26 @@ export default class S3Handler extends RemoteHandlerAbstract {
|
||||
}
|
||||
|
||||
async _closeFile(fd) {}
|
||||
|
||||
// https://stackoverflow.com/a/48955582/72637
|
||||
async _rmtree(dir) {
|
||||
const listParams = {
|
||||
Bucket: this._bucket,
|
||||
Prefix: this._dir + dir,
|
||||
}
|
||||
let listedObjects = {}
|
||||
do {
|
||||
listedObjects = await this._s3.listObjectsV2({
|
||||
...listParams,
|
||||
ContinuationToken: listedObjects.NextContinuationToken,
|
||||
})
|
||||
if (listedObjects.Contents.length === 0) {
|
||||
return
|
||||
}
|
||||
await this._s3.deleteObjects({
|
||||
Bucket: this._bucket,
|
||||
Delete: { Objects: listedObjects.Contents.map(({ Key }) => ({ Key })) },
|
||||
})
|
||||
} while (listedObjects.IsTruncated)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
- [Import] Ignore case when detecting file type (PR [#5574](https://github.com/vatesfr/xen-orchestra/pull/5574))
|
||||
- [Backup] Ability to set a specific schedule to always run full backups [#5541](https://github.com/vatesfr/xen-orchestra/issues/5541) (PR [#5546](https://github.com/vatesfr/xen-orchestra/pull/5546))
|
||||
- [Proxy] Log VM backup restoration (PR [#5576](https://github.com/vatesfr/xen-orchestra/pull/5576))
|
||||
- [Backup/S3] Allow backup of metadata to Amazon Web Services S3 (PR [#5373](https://github.com/vatesfr/xen-orchestra/pull/5373))
|
||||
|
||||
### Bug fixes
|
||||
|
||||
@@ -40,7 +41,8 @@
|
||||
>
|
||||
> In case of conflict, the highest (lowest in previous list) `$version` wins.
|
||||
|
||||
- @xen-orchestra/fs minor
|
||||
- @xen-orchestra/fs major
|
||||
- vhd-lib minor
|
||||
- xen-api patch
|
||||
- xo-common minor
|
||||
- xo-server minor
|
||||
|
||||
@@ -1374,7 +1374,7 @@ export default class BackupNg {
|
||||
parentId: taskId,
|
||||
result: () => ({ size: xva.size }),
|
||||
},
|
||||
handler.outputStream(fork, dataFilename, {
|
||||
handler.outputStream(dataFilename, fork, {
|
||||
dirMode,
|
||||
})
|
||||
)
|
||||
@@ -1712,7 +1712,7 @@ export default class BackupNg {
|
||||
}
|
||||
|
||||
// FIXME: should only be renamed after the metadata file has been written
|
||||
await handler.outputStream(fork.streams[`${id}.vhd`](), path, {
|
||||
await handler.outputStream(path, fork.streams[`${id}.vhd`](), {
|
||||
// no checksum for VHDs, because they will be invalidated by
|
||||
// merges and chainings
|
||||
checksum: false,
|
||||
|
||||
@@ -548,11 +548,7 @@ export default class {
|
||||
const sizeStream = createSizeStream()
|
||||
|
||||
try {
|
||||
const targetStream = await handler.createOutputStream(backupFullPath)
|
||||
|
||||
stream.on('error', error => targetStream.emit('error', error))
|
||||
|
||||
await Promise.all([fromEvent(stream.pipe(sizeStream).pipe(targetStream), 'finish'), stream.task])
|
||||
await Promise.all([handler.outputStream(backupFullPath, sizeStream), stream.task])
|
||||
} catch (error) {
|
||||
// Remove new backup. (corrupt).
|
||||
await handler.unlink(backupFullPath)::ignoreErrors()
|
||||
@@ -782,9 +778,7 @@ export default class {
|
||||
|
||||
@deferrable
|
||||
async _backupVm($defer, vm, handler, file, { compress }) {
|
||||
const targetStream = await handler.createOutputStream(file)
|
||||
$defer.onFailure.call(handler, 'unlink', file)
|
||||
$defer.onFailure.call(targetStream, 'close')
|
||||
|
||||
const sourceStream = await this._xo.getXapi(vm).exportVm(vm._xapiId, {
|
||||
compress,
|
||||
@@ -792,9 +786,9 @@ export default class {
|
||||
|
||||
const sizeStream = createSizeStream()
|
||||
|
||||
sourceStream.pipe(sizeStream).pipe(targetStream)
|
||||
sourceStream.pipe(sizeStream)
|
||||
|
||||
await Promise.all([sourceStream.task, fromEvent(targetStream, 'finish')])
|
||||
await Promise.all([sourceStream.task, handler.outputStream(file, sizeStream)])
|
||||
|
||||
return {
|
||||
transferSize: sizeStream.size,
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
// @flow
|
||||
import asyncMap from '@xen-orchestra/async-map'
|
||||
import createLogger from '@xen-orchestra/log'
|
||||
import { fromEvent, ignoreErrors, timeout } from 'promise-toolbox'
|
||||
import { ignoreErrors, timeout } from 'promise-toolbox'
|
||||
import { parseDuration } from '@vates/parse-duration'
|
||||
|
||||
import { debounceWithKey, REMOVE_CACHE_ENTRY } from '../_pDebounceWithKey'
|
||||
@@ -304,20 +304,13 @@ export default class metadataBackup {
|
||||
}
|
||||
)
|
||||
|
||||
let outputStream
|
||||
try {
|
||||
const { dirMode } = this._backupOptions
|
||||
await waitAll([
|
||||
(async () => {
|
||||
outputStream = await handler.createOutputStream(fileName, {
|
||||
dirMode,
|
||||
})
|
||||
|
||||
// 'readable-stream/pipeline' not call the callback when an error throws
|
||||
// from the readable stream
|
||||
stream.pipe(outputStream)
|
||||
return timeout.call(
|
||||
fromEvent(stream, 'end').catch(error => {
|
||||
handler.outputStream(fileName, stream, { cancelToken }).catch(error => {
|
||||
stream.destroy()
|
||||
if (error.message !== 'aborted') {
|
||||
throw error
|
||||
}
|
||||
@@ -353,9 +346,6 @@ export default class metadataBackup {
|
||||
|
||||
this._listPoolMetadataBackups(REMOVE_CACHE_ENTRY, remoteId)
|
||||
} catch (error) {
|
||||
if (outputStream !== undefined) {
|
||||
outputStream.destroy()
|
||||
}
|
||||
await handler.rmtree(dir).catch(error => {
|
||||
logger.warning(`unable to delete the folder ${dir}`, {
|
||||
event: 'task.warning',
|
||||
|
||||
@@ -24,6 +24,15 @@ afterEach(async () => {
|
||||
await pFromCallback(cb => rimraf(tmpDir, cb))
|
||||
})
|
||||
|
||||
function bufferToArray(buffer) {
|
||||
const view = new DataView(buffer)
|
||||
const res = []
|
||||
for (let i = 0; i < buffer.byteLength; i += 4) {
|
||||
res.push(view.getUint32(i, true))
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
||||
function createFileAccessor(file) {
|
||||
return async (start, end) => {
|
||||
if (start < 0 || end < 0) {
|
||||
@@ -52,7 +61,11 @@ test('VMDK to VHD can convert a random data file with VMDKDirectParser', async (
|
||||
})
|
||||
const result = await readVmdkGrainTable(createFileAccessor(vmdkFileName))
|
||||
const pipe = (
|
||||
await vmdkToVhd(createReadStream(vmdkFileName), result.grainLogicalAddressList, result.grainFileOffsetList)
|
||||
await vmdkToVhd(
|
||||
createReadStream(vmdkFileName),
|
||||
bufferToArray(result.grainLogicalAddressList),
|
||||
bufferToArray(result.grainFileOffsetList)
|
||||
)
|
||||
).pipe(createWriteStream(vhdFileName))
|
||||
await eventToPromise(pipe, 'finish')
|
||||
await execa('vhd-util', ['check', '-p', '-b', '-t', '-n', vhdFileName])
|
||||
|
||||
Reference in New Issue
Block a user