Compare commits

...

32 Commits

Author SHA1 Message Date
Julien Fontanet
7c486f1159 Merge branch 'master' into nr-delete-fs-createOutputStream 2021-02-17 16:48:14 +01:00
Julien Fontanet
819c798e99 chore: format mardown files with Prettier 2021-02-17 16:04:22 +01:00
Julien Fontanet
8560ca0661 chore(vhd-lib/Vhd#_getBatEntry): name param blockId
Because it's not a full block but an identifier.
2021-02-17 14:39:02 +01:00
Julien Fontanet
82cdfe7014 chore(vhd-lib): fix a test
From #5481
2021-02-17 14:30:40 +01:00
Julien Fontanet
52642f5854 chore(vhd-lib): move tests into src
From #5481

Fix Jest module mapping.
2021-02-17 14:29:40 +01:00
Julien Fontanet
6c6f9f5a44 feat(backups-cli/clean-vms): rename force flag to remove 2021-02-17 14:01:14 +01:00
Julien Fontanet
039ce15253 chore(xo-server/backups-ng): use invalidParameter.is() 2021-02-17 14:01:14 +01:00
badrAZ
695a4c785c feat(xo-server/backups-ng): handle proxy VM restoration logs (#5576) 2021-02-17 11:03:52 +01:00
Rajaa.BARHTAOUI
7d7f160159 fix(xo-web/migrateVm): don't automatically select migration network (#5564)
See xoa-support#3355
2021-02-17 10:06:03 +01:00
Julien Fontanet
b454b4dff1 feat(xo-server/backups): add proxy id to logs 2021-02-17 09:53:00 +01:00
Nicolas Raynaud
3079e1689c update CHANGELOG.unreleased.md 2021-02-15 09:26:45 +01:00
Nicolas Raynaud
90b8eed038 Merge branch 'master' into nr-delete-fs-createOutputStream 2021-02-15 08:57:29 +01:00
Nicolas Raynaud
6f43d4f091 fix tests 2021-02-06 13:14:07 +01:00
Nicolas Raynaud
a33fc27313 fix tests 2021-02-05 19:50:26 +01:00
Nicolas Raynaud
cad5f74d45 fix tests 2021-02-05 16:50:40 +01:00
Nicolas Raynaud
7787f39505 fix tests 2021-02-05 13:43:22 +01:00
Nicolas Raynaud
548a15214b try to fix travis tests 2021-02-04 17:20:13 +01:00
Nicolas Raynaud
de6efe182b fix AbstractHandler.outputStream() argument order to be compatible with createPrefixWrapperMethods() 2021-02-04 16:43:53 +01:00
Nicolas Raynaud
834fd5dd07 fix s3._rmtree() 2021-02-03 17:38:31 +01:00
Nicolas Raynaud
c1e72697b0 restore S3 ssl 2021-02-03 15:49:12 +01:00
Nicolas Raynaud
78dc03e23e make cancelToken an keyword optional argument. 2021-02-03 15:43:47 +01:00
Nicolas Raynaud
2f7af5c05a restore docs files 2021-02-03 14:31:42 +01:00
Nicolas Raynaud
877d27a433 propagate the cancelToken to fs 2021-02-03 14:25:24 +01:00
Nicolas Raynaud
37d1b48c1b Merge branch 'master' into nr-delete-fs-createOutputStream 2021-02-02 17:13:23 +01:00
Nicolas Raynaud
7d6a689542 merge master 2021-01-15 15:29:14 +01:00
Nicolas Raynaud
b75c06f7fe convert #createOutputStream() test to #outputStream() test 2020-11-17 14:30:33 +01:00
Nicolas Raynaud
e8bd2ae1e0 fix parameter orders, add S3 rmtree() 2020-11-16 18:28:41 +01:00
Nicolas Raynaud
6e2396e5f4 fix parameter orders, add S3 rmtree() 2020-11-16 18:26:28 +01:00
Nicolas Raynaud
01ceed9e99 Merge branch 'master' into nr-delete-fs-createOutputStream 2020-11-16 17:00:05 +01:00
Nicolas Raynaud
edd3628a67 delete fs.createOutputStream() 2020-11-13 09:04:24 +01:00
Nicolas Raynaud
4a3b2a2a5a Apply suggestions from code review
Co-authored-by: Julien Fontanet <julien.fontanet@isonoe.net>
2020-11-13 08:39:21 +01:00
Nicolas Raynaud
1a7b49ff39 delete fs.createOutputStream() 2020-11-12 16:38:30 +01:00
20 changed files with 250 additions and 176 deletions

View File

@@ -1,7 +1,7 @@
#!/usr/bin/env node
// assigned when options are parsed by the main function
let force, merge
let merge, remove
// -----------------------------------------------------------------------------
@@ -80,12 +80,12 @@ const mergeVhdChain = limitConcurrency(1)(async function mergeVhdChain(chain) {
}
await Promise.all([
force && fs.rename(parent, child),
remove && fs.rename(parent, child),
asyncMap(children.slice(0, -1), child => {
console.warn('Unused VHD', child)
force && console.warn(' deleting…')
remove && console.warn(' deleting…')
console.warn('')
return force && handler.unlink(child)
return remove && handler.unlink(child)
}),
])
})
@@ -127,9 +127,9 @@ async function handleVm(vmDir) {
console.warn('Error while checking VHD', path)
console.warn(' ', error)
if (error != null && error.code === 'ERR_ASSERTION') {
force && console.warn(' deleting…')
remove && console.warn(' deleting…')
console.warn('')
force && (await handler.unlink(path))
remove && (await handler.unlink(path))
}
}
})
@@ -155,9 +155,9 @@ async function handleVm(vmDir) {
console.warn('Error while checking VHD', vhd)
console.warn(' missing parent', parent)
force && console.warn(' deleting…')
remove && console.warn(' deleting…')
console.warn('')
force && deletions.push(handler.unlink(vhd))
remove && deletions.push(handler.unlink(vhd))
}
}
@@ -205,9 +205,9 @@ async function handleVm(vmDir) {
} else {
console.warn('Error while checking backup', json)
console.warn(' missing file', linkedXva)
force && console.warn(' deleting…')
remove && console.warn(' deleting…')
console.warn('')
force && (await handler.unlink(json))
remove && (await handler.unlink(json))
}
} else if (mode === 'delta') {
const linkedVhds = (() => {
@@ -226,9 +226,9 @@ async function handleVm(vmDir) {
missingVhds.forEach(vhd => {
console.warn(' ', vhd)
})
force && console.warn(' deleting…')
remove && console.warn(' deleting…')
console.warn('')
force && (await handler.unlink(json))
remove && (await handler.unlink(json))
}
}
})
@@ -266,9 +266,9 @@ async function handleVm(vmDir) {
}
console.warn('Unused VHD', vhd)
force && console.warn(' deleting…')
remove && console.warn(' deleting…')
console.warn('')
force && unusedVhdsDeletion.push(handler.unlink(vhd))
remove && unusedVhdsDeletion.push(handler.unlink(vhd))
}
toCheck.forEach(vhd => {
@@ -287,17 +287,17 @@ async function handleVm(vmDir) {
unusedVhdsDeletion,
asyncMap(unusedXvas, path => {
console.warn('Unused XVA', path)
force && console.warn(' deleting…')
remove && console.warn(' deleting…')
console.warn('')
return force && handler.unlink(path)
return remove && handler.unlink(path)
}),
asyncMap(xvaSums, path => {
// no need to handle checksums for XVAs deleted by the script, they will be handled by `unlink()`
if (!xvas.has(path.slice(0, -'.checksum'.length))) {
console.warn('Unused XVA checksum', path)
force && console.warn(' deleting…')
remove && console.warn(' deleting…')
console.warn('')
return force && handler.unlink(path)
return remove && handler.unlink(path)
}
}),
])
@@ -308,17 +308,17 @@ async function handleVm(vmDir) {
module.exports = async function main(args) {
const opts = getopts(args, {
alias: {
force: 'f',
remove: 'r',
merge: 'm',
},
boolean: ['force', 'merge'],
boolean: ['merge', 'remove'],
default: {
force: false,
merge: false,
remove: false,
},
})
;({ force, merge } = opts)
;({ remove, merge } = opts)
await asyncMap(opts._, async vmDir => {
vmDir = resolve(vmDir)

View File

@@ -4,6 +4,7 @@
import getStream from 'get-stream'
import asyncMap from '@xen-orchestra/async-map'
import CancelToken from 'promise-toolbox/CancelToken'
import limit from 'limit-concurrency-decorator'
import path, { basename } from 'path'
import synchronized from 'decorator-synchronized'
@@ -119,42 +120,6 @@ export default class RemoteHandlerAbstract {
await this.__closeFile(fd)
}
// TODO: remove method
async createOutputStream(file: File, { checksum = false, dirMode, ...options }: Object = {}): Promise<LaxWritable> {
if (typeof file === 'string') {
file = normalizePath(file)
}
const path = typeof file === 'string' ? file : file.path
const streamP = timeout.call(
this._createOutputStream(file, {
dirMode,
flags: 'wx',
...options,
}),
this._timeout
)
if (!checksum) {
return streamP
}
const checksumStream = createChecksumStream()
const forwardError = error => {
checksumStream.emit('error', error)
}
const stream = await streamP
stream.on('error', forwardError)
checksumStream.pipe(stream)
// $FlowFixMe
checksumStream.checksumWritten = checksumStream.checksum
.then(value => this._outputFile(checksumFile(path), value, { flags: 'wx' }))
.catch(forwardError)
return checksumStream
}
createReadStream(
file: File,
{ checksum = false, ignoreMissingChecksum = false, ...options }: Object = {}
@@ -209,14 +174,15 @@ export default class RemoteHandlerAbstract {
// write a stream to a file using a temporary file
async outputStream(
input: Readable | Promise<Readable>,
path: string,
{ checksum = true, dirMode }: { checksum?: boolean, dirMode?: number } = {}
input: Readable | Promise<Readable>,
{ checksum = true, dirMode, cancelToken = CancelToken.none }: { checksum?: boolean, dirMode?: number } = {}
): Promise<void> {
path = normalizePath(path)
return this._outputStream(await input, normalizePath(path), {
checksum,
dirMode,
cancelToken,
})
}
@@ -477,13 +443,51 @@ export default class RemoteHandlerAbstract {
return this._outputFile(file, data, { flags })
}
async _outputStream(input: Readable, path: string, { checksum, dirMode }: { checksum?: boolean, dirMode?: number }) {
async _createOutputStreamChecksum(file: File, { checksum = false, ...options }: Object = {}): Promise<LaxWritable> {
if (typeof file === 'string') {
file = normalizePath(file)
}
const path = typeof file === 'string' ? file : file.path
const streamP = timeout.call(
this._createOutputStream(file, {
flags: 'wx',
...options,
}),
this._timeout
)
if (!checksum) {
return streamP
}
const checksumStream = createChecksumStream()
const forwardError = error => {
checksumStream.emit('error', error)
}
const stream = await streamP
stream.on('error', forwardError)
checksumStream.pipe(stream)
// $FlowFixMe
checksumStream.checksumWritten = checksumStream.checksum
.then(value => this._outputFile(checksumFile(path), value, { flags: 'wx' }))
.catch(forwardError)
return checksumStream
}
async _outputStream(
input: Readable,
path: string,
{ checksum, dirMode, cancelToken = CancelToken.none }: { checksum?: boolean, dirMode?: number }
) {
const tmpPath = `${dirname(path)}/.${basename(path)}`
const output = await this.createOutputStream(tmpPath, {
checksum,
dirMode,
})
const output = await this._createOutputStreamChecksum(tmpPath, { checksum })
try {
cancelToken.promise.then(reason => {
input.destroy(reason)
})
input.pipe(output)
await fromEvent(output, 'finish')
await output.checksumWritten

View File

@@ -30,18 +30,6 @@ describe('closeFile()', () => {
})
})
describe('createOutputStream()', () => {
it(`throws in case of timeout`, async () => {
const testHandler = new TestHandler({
createOutputStream: () => new Promise(() => {}),
})
const promise = testHandler.createOutputStream('File')
jest.advanceTimersByTime(TIMEOUT)
await expect(promise).rejects.toThrowError(TimeoutError)
})
})
describe('getInfo()', () => {
it('throws in case of timeout', async () => {
const testHandler = new TestHandler({

View File

@@ -3,11 +3,9 @@
import 'dotenv/config'
import asyncIteratorToStream from 'async-iterator-to-stream'
import { forOwn, random } from 'lodash'
import { fromCallback } from 'promise-toolbox'
import { pipeline } from 'readable-stream'
import { tmpdir } from 'os'
import { getHandler } from '.'
import { getHandler } from './'
// https://gist.github.com/julien-f/3228c3f34fdac01ade09
const unsecureRandomBytes = n => {
@@ -82,10 +80,9 @@ handlers.forEach(url => {
})
})
describe('#createOutputStream()', () => {
describe('#outputStream()', () => {
it('creates parent dir if missing', async () => {
const stream = await handler.createOutputStream('dir/file')
await fromCallback(pipeline, createTestDataStream(), stream)
await handler.outputStream('dir/file', createTestDataStream())
await expect(await handler.readFile('dir/file')).toEqual(TEST_DATA)
})
})

View File

@@ -1,9 +1,11 @@
import aws from '@sullux/aws-sdk'
import assert from 'assert'
import http from 'http'
import { parse } from 'xo-remote-parser'
import RemoteHandlerAbstract from './abstract'
import { createChecksumStream } from './checksum'
import CancelToken from 'promise-toolbox/CancelToken'
// endpoints https://docs.aws.amazon.com/general/latest/gr/s3.html
@@ -13,12 +15,14 @@ const MAX_PART_SIZE = 1024 * 1024 * 1024 * 5 // 5GB
const MAX_PARTS_COUNT = 10000
const MAX_OBJECT_SIZE = 1024 * 1024 * 1024 * 1024 * 5 // 5TB
const IDEAL_FRAGMENT_SIZE = Math.ceil(MAX_OBJECT_SIZE / MAX_PARTS_COUNT) // the smallest fragment size that still allows a 5TB upload in 10000 fragments, about 524MB
const USE_SSL = true
export default class S3Handler extends RemoteHandlerAbstract {
constructor(remote, _opts) {
super(remote)
const { host, path, username, password } = parse(remote.url)
// https://www.zenko.io/blog/first-things-first-getting-started-scality-s3-server/
this._s3 = aws({
const params = {
accessKeyId: username,
apiVersion: '2006-03-01',
endpoint: host,
@@ -28,7 +32,12 @@ export default class S3Handler extends RemoteHandlerAbstract {
httpOptions: {
timeout: 600000,
},
}).s3
}
if (!USE_SSL) {
params.httpOptions.agent = new http.Agent()
params.sslEnabled = false
}
this._s3 = aws(params).s3
const splitPath = path.split('/').filter(s => s.length)
this._bucket = splitPath.shift()
@@ -43,7 +52,10 @@ export default class S3Handler extends RemoteHandlerAbstract {
return { Bucket: this._bucket, Key: this._dir + file }
}
async _outputStream(input, path, { checksum }) {
async _outputStream(input, path, { checksum, cancelToken = CancelToken.none }) {
cancelToken.promise.then(reason => {
input.destroy(reason)
})
let inputStream = input
if (checksum) {
const checksumStream = createChecksumStream()
@@ -266,4 +278,26 @@ export default class S3Handler extends RemoteHandlerAbstract {
}
async _closeFile(fd) {}
// https://stackoverflow.com/a/48955582/72637
async _rmtree(dir) {
const listParams = {
Bucket: this._bucket,
Prefix: this._dir + dir,
}
let listedObjects = {}
do {
listedObjects = await this._s3.listObjectsV2({
...listParams,
ContinuationToken: listedObjects.NextContinuationToken,
})
if (listedObjects.Contents.length === 0) {
return
}
await this._s3.deleteObjects({
Bucket: this._bucket,
Delete: { Objects: listedObjects.Contents.map(({ Key }) => ({ Key })) },
})
} while (listedObjects.IsTruncated)
}
}

View File

@@ -12,6 +12,8 @@
- [Backup/restore] Allow backup restore to any licence even if XOA isn't registered (PR [#5547](https://github.com/vatesfr/xen-orchestra/pull/5547))
- [Import] Ignore case when detecting file type (PR [#5574](https://github.com/vatesfr/xen-orchestra/pull/5574))
- [Backup] Ability to set a specific schedule to always run full backups [#5541](https://github.com/vatesfr/xen-orchestra/issues/5541) (PR [#5546](https://github.com/vatesfr/xen-orchestra/pull/5546))
- [Proxy] Log VM backup restoration (PR [#5576](https://github.com/vatesfr/xen-orchestra/pull/5576))
- [Backup/S3] Allow backup of metadata to Amazon Web Services S3 (PR [#5373](https://github.com/vatesfr/xen-orchestra/pull/5373))
### Bug fixes
@@ -20,6 +22,7 @@
- [VM/Snapshot export] Fix `Error: no available place in queue` on canceling an export via browser then starting a new one when the concurrency threshold is reached [#5535](https://github.com/vatesfr/xen-orchestra/issues/5535) (PR [#5538](https://github.com/vatesfr/xen-orchestra/pull/5538))
- [Servers] Hide pool's objects if its master is unreachable [#5475](https://github.com/vatesfr/xen-orchestra/issues/5475) (PR [#5526](https://github.com/vatesfr/xen-orchestra/pull/5526))
- [Host] Restart toolstack: fix `ECONNREFUSED` error (PR [#5553](https://github.com/vatesfr/xen-orchestra/pull/5553))
- [VM migration] Intra-pool: don't automatically select migration network if no default migration network is defined on the pool (PR [#5564](https://github.com/vatesfr/xen-orchestra/pull/5564))
### Packages to release
@@ -38,7 +41,8 @@
>
> In case of conflict, the highest (lowest in previous list) `$version` wins.
- @xen-orchestra/fs minor
- @xen-orchestra/fs major
- vhd-lib minor
- xen-api patch
- xo-common minor
- xo-server minor

View File

@@ -436,8 +436,8 @@ If you are using HA, maintenance mode will be required before doing any reboot o
Maintenance mode will trigger two actions internally:
* disabling the host (no new VMs could start on it)
* evacuate VMs that can be evacuated ("agile" VMs, which could be live migrated)
- disabling the host (no new VMs could start on it)
- evacuate VMs that can be evacuated ("agile" VMs, which could be live migrated)
It's perfect if you want to shutdown the host for hardware replacement, or if you want to do some other operations without disrupting your production.

View File

@@ -82,8 +82,8 @@ member: 348
The plugin needs to know that Bruce Wayne belongs to the heroes group. To do so, you need to set 2 entries in the configuration:
- **Group attribute**, which is the name of the *group* attribute that is used to list users within a group. In this example, it would be `member`.
- **User attribute**, which is the name of the *user* attribute that is used to reference users in groups. In this example, it would be `uid` since `347`, `348`, etc. are user `uid`s.
- **Group attribute**, which is the name of the _group_ attribute that is used to list users within a group. In this example, it would be `member`.
- **User attribute**, which is the name of the _user_ attribute that is used to reference users in groups. In this example, it would be `uid` since `347`, `348`, etc. are user `uid`s.
Save the configuration and you're good to go. From now on, every time an LDAP user logs into XO, the plugin will automatically create or update that user's groups and add them to those groups. If you need to import all the groups at once, you can do so from Settings > Groups > Synchronize LDAP Groups. This can be useful if you want to assign ACLs on groups without having to wait for a member of the group to log in.

View File

@@ -11,9 +11,9 @@ import { pFromCallback } from 'promise-toolbox'
import { pipeline } from 'readable-stream'
import { randomBytes } from 'crypto'
import Vhd, { chainVhd, createSyntheticStream, mergeVhd as vhdMerge } from './'
import Vhd, { chainVhd, createSyntheticStream, mergeVhd as vhdMerge } from './index'
import { SECTOR_SIZE } from './src/_constants'
import { SECTOR_SIZE } from './_constants'
let tempDir = null

View File

@@ -8,7 +8,7 @@ import { pipeline } from 'readable-stream'
import { createReadableRawStream, createReadableSparseStream } from './'
import { createFooter } from './src/_createFooterHeader'
import { createFooter } from './_createFooterHeader'
let tempDir = null
@@ -105,7 +105,7 @@ test('ReadableSparseVHDStream can handle a sparse file', async () => {
const stream = await createReadableSparseStream(
fileSize,
blockSize,
blocks.map(b => b.logicalAddressBytes),
blocks.map(b => b.logicalAddressBytes / blockSize),
blocks
)
expect(stream.length).toEqual(4197888)

View File

@@ -178,8 +178,8 @@ export default class Vhd {
}
// return the first sector (bitmap) of a block
_getBatEntry(block) {
const i = block * 4
_getBatEntry(blockId) {
const i = blockId * 4
const { blockTable } = this
return i < blockTable.length ? blockTable.readUInt32BE(i) : BLOCK_UNUSED
}

View File

@@ -85,7 +85,7 @@ When logical volume no longer necessary:
## Mount block device
> Tip: `offset` and `sizelimit` are only required on a partionned disk
> Tip: `offset` and `sizelimit` are only required on a partionned disk
```
> mkdir /tmp/block-mount

View File

@@ -12,6 +12,7 @@ import { PassThrough } from 'stream'
import { AssertionError } from 'assert'
import { basename, dirname } from 'path'
import { decorateWith } from '@vates/decorate-with'
import { invalidParameters } from 'xo-common/api-errors'
import { isValidXva } from '@xen-orchestra/backups/isValidXva'
import { parseDuration } from '@vates/parse-duration'
import {
@@ -29,7 +30,7 @@ import {
sum,
values,
} from 'lodash'
import { CancelToken, ignoreErrors, pFinally, timeout } from 'promise-toolbox'
import { CancelToken, ignoreErrors, timeout } from 'promise-toolbox'
import Vhd, { chainVhd, checkVhdChain, createSyntheticStream as createVhdReadStream } from 'vhd-lib'
import type Logger from '../logs/loggers/abstract'
@@ -631,8 +632,7 @@ export default class BackupNg {
}
return
} catch (error) {
// XO API invalid parameters error
if (error.code === 10) {
if (invalidParameters.is(error)) {
delete params.streamLogs
return app.callProxyMethod(proxyId, 'backup.run', params)
}
@@ -820,53 +820,102 @@ export default class BackupNg {
const { metadataFilename, remoteId } = parseVmBackupId(id)
const { proxy, url, options } = await app.getRemoteWithCredentials(remoteId)
if (proxy !== undefined) {
const { allowUnauthorized, host, password, username } = await app.getXenServer(app.getXenServerIdByObject(sr.$id))
return app.callProxyMethod(proxy, 'backup.importVmBackup', {
backupId: metadataFilename,
remote: {
url,
options,
},
srUuid: sr.uuid,
xapi: {
allowUnauthorized,
credentials: {
username,
password,
},
url: host,
},
})
}
const handler = await app.getRemoteHandler(remoteId)
const metadata: Metadata = JSON.parse(String(await handler.readFile(metadataFilename)))
const importer = importers[metadata.mode]
if (importer === undefined) {
throw new Error(`no importer for backup mode ${metadata.mode}`)
}
const { jobId, timestamp: time } = metadata
let rootTaskId
const logger = this._logger
return wrapTaskFn(
{
data: {
jobId,
srId,
time,
},
logger,
message: 'restore',
},
taskId => {
this._runningRestores.add(taskId)
return importer(handler, metadataFilename, metadata, xapi, sr, taskId, logger)::pFinally(() => {
this._runningRestores.delete(taskId)
})
try {
if (proxy !== undefined) {
const { allowUnauthorized, host, password, username } = await app.getXenServer(
app.getXenServerIdByObject(sr.$id)
)
const params = {
backupId: metadataFilename,
remote: {
url,
options,
},
srUuid: sr.uuid,
streamLogs: true,
xapi: {
allowUnauthorized,
credentials: {
username,
password,
},
url: host,
},
}
try {
const logsStream = await app.callProxyMethod(proxy, 'backup.importVmBackup', params, {
assertType: 'iterator',
})
const localTaskIds = { __proto__: null }
for await (const log of logsStream) {
const { event, message, taskId } = log
const common = {
data: log.data,
event: 'task.' + event,
result: log.result,
status: log.status,
}
if (event === 'start') {
const { parentId } = log
if (parentId === undefined) {
rootTaskId = localTaskIds[taskId] = logger.notice(message, common)
this._runningRestores.add(rootTaskId)
} else {
common.parentId = localTaskIds[parentId]
localTaskIds[taskId] = logger.notice(message, common)
}
} else {
common.taskId = localTaskIds[taskId]
logger.notice(message, common)
}
}
return
} catch (error) {
if (invalidParameters.is(error)) {
delete params.streamLogs
return app.callProxyMethod(proxy, 'backup.importVmBackup', params)
}
throw error
}
}
)()
const handler = await app.getRemoteHandler(remoteId)
const metadata: Metadata = JSON.parse(String(await handler.readFile(metadataFilename)))
const importer = importers[metadata.mode]
if (importer === undefined) {
throw new Error(`no importer for backup mode ${metadata.mode}`)
}
const { jobId, timestamp: time } = metadata
return wrapTaskFn(
{
data: {
jobId,
srId,
time,
},
logger,
message: 'restore',
},
taskId => {
rootTaskId = taskId
this._runningRestores.add(taskId)
return importer(handler, metadataFilename, metadata, xapi, sr, taskId, logger)
}
)()
} finally {
this._runningRestores.delete(rootTaskId)
}
}
@decorateWith(
@@ -1325,7 +1374,7 @@ export default class BackupNg {
parentId: taskId,
result: () => ({ size: xva.size }),
},
handler.outputStream(fork, dataFilename, {
handler.outputStream(dataFilename, fork, {
dirMode,
})
)
@@ -1663,7 +1712,7 @@ export default class BackupNg {
}
// FIXME: should only be renamed after the metadata file has been written
await handler.outputStream(fork.streams[`${id}.vhd`](), path, {
await handler.outputStream(path, fork.streams[`${id}.vhd`](), {
// no checksum for VHDs, because they will be invalidated by
// merges and chainings
checksum: false,

View File

@@ -548,11 +548,7 @@ export default class {
const sizeStream = createSizeStream()
try {
const targetStream = await handler.createOutputStream(backupFullPath)
stream.on('error', error => targetStream.emit('error', error))
await Promise.all([fromEvent(stream.pipe(sizeStream).pipe(targetStream), 'finish'), stream.task])
await Promise.all([handler.outputStream(backupFullPath, sizeStream), stream.task])
} catch (error) {
// Remove new backup. (corrupt).
await handler.unlink(backupFullPath)::ignoreErrors()
@@ -782,9 +778,7 @@ export default class {
@deferrable
async _backupVm($defer, vm, handler, file, { compress }) {
const targetStream = await handler.createOutputStream(file)
$defer.onFailure.call(handler, 'unlink', file)
$defer.onFailure.call(targetStream, 'close')
const sourceStream = await this._xo.getXapi(vm).exportVm(vm._xapiId, {
compress,
@@ -792,9 +786,9 @@ export default class {
const sizeStream = createSizeStream()
sourceStream.pipe(sizeStream).pipe(targetStream)
sourceStream.pipe(sizeStream)
await Promise.all([sourceStream.task, fromEvent(targetStream, 'finish')])
await Promise.all([sourceStream.task, handler.outputStream(file, sizeStream)])
return {
transferSize: sizeStream.size,

View File

@@ -278,6 +278,7 @@ export default class Jobs {
method: 'backupNg.runJob',
params: {
id: job.id,
proxy: job.proxy,
schedule: schedule?.id,
settings: job.settings,
vms: job.vms,

View File

@@ -1,7 +1,7 @@
// @flow
import asyncMap from '@xen-orchestra/async-map'
import createLogger from '@xen-orchestra/log'
import { fromEvent, ignoreErrors, timeout } from 'promise-toolbox'
import { ignoreErrors, timeout } from 'promise-toolbox'
import { parseDuration } from '@vates/parse-duration'
import { debounceWithKey, REMOVE_CACHE_ENTRY } from '../_pDebounceWithKey'
@@ -304,20 +304,13 @@ export default class metadataBackup {
}
)
let outputStream
try {
const { dirMode } = this._backupOptions
await waitAll([
(async () => {
outputStream = await handler.createOutputStream(fileName, {
dirMode,
})
// 'readable-stream/pipeline' not call the callback when an error throws
// from the readable stream
stream.pipe(outputStream)
return timeout.call(
fromEvent(stream, 'end').catch(error => {
handler.outputStream(fileName, stream, { cancelToken }).catch(error => {
stream.destroy()
if (error.message !== 'aborted') {
throw error
}
@@ -353,9 +346,6 @@ export default class metadataBackup {
this._listPoolMetadataBackups(REMOVE_CACHE_ENTRY, remoteId)
} catch (error) {
if (outputStream !== undefined) {
outputStream.destroy()
}
await handler.rmtree(dir).catch(error => {
logger.warning(`unable to delete the folder ${dir}`, {
event: 'task.warning',

View File

@@ -24,6 +24,15 @@ afterEach(async () => {
await pFromCallback(cb => rimraf(tmpDir, cb))
})
function bufferToArray(buffer) {
const view = new DataView(buffer)
const res = []
for (let i = 0; i < buffer.byteLength; i += 4) {
res.push(view.getUint32(i, true))
}
return res
}
function createFileAccessor(file) {
return async (start, end) => {
if (start < 0 || end < 0) {
@@ -52,7 +61,11 @@ test('VMDK to VHD can convert a random data file with VMDKDirectParser', async (
})
const result = await readVmdkGrainTable(createFileAccessor(vmdkFileName))
const pipe = (
await vmdkToVhd(createReadStream(vmdkFileName), result.grainLogicalAddressList, result.grainFileOffsetList)
await vmdkToVhd(
createReadStream(vmdkFileName),
bufferToArray(result.grainLogicalAddressList),
bufferToArray(result.grainFileOffsetList)
)
).pipe(createWriteStream(vhdFileName))
await eventToPromise(pipe, 'finish')
await execa('vhd-util', ['check', '-p', '-b', '-t', '-n', vhdFileName])

View File

@@ -163,7 +163,7 @@ export default class MigrateVmModalBody extends BaseComponent {
host,
intraPool,
mapVifsNetworks: undefined,
migrationNetworkId: getDefaultMigrationNetwork(host, pools, pifs),
migrationNetworkId: getDefaultMigrationNetwork(intraPool, host, pools, pifs),
targetSrs: {},
})
return
@@ -190,7 +190,7 @@ export default class MigrateVmModalBody extends BaseComponent {
host,
intraPool,
mapVifsNetworks: defaultNetworksForVif,
migrationNetworkId: getDefaultMigrationNetwork(host, pools, pifs),
migrationNetworkId: getDefaultMigrationNetwork(intraPool, host, pools, pifs),
targetSrs: { mainSr: pools[host.$pool].default_SR },
})
}

View File

@@ -180,11 +180,11 @@ export default class MigrateVmsModalBody extends BaseComponent {
return
}
const { pools, pifs } = this.props
const defaultMigrationNetworkId = getDefaultMigrationNetwork(host, pools, pifs)
const intraPool = every(this.props.vms, vm => vm.$pool === host.$pool)
const defaultMigrationNetworkId = getDefaultMigrationNetwork(intraPool, host, pools, pifs)
const defaultSrId = pools[host.$pool].default_SR
const defaultSrConnectedToHost = some(host.$PBDs, pbd => this._getObject(pbd).SR === defaultSrId)
const intraPool = every(this.props.vms, vm => vm.$pool === host.$pool)
const doNotMigrateVmVdis = {}
const doNotMigrateVdi = {}
let noVdisMigration = false

View File

@@ -24,7 +24,7 @@ export const getDefaultNetworkForVif = (vif, destHost, pifs, networks) => {
return destNetworkId
}
export const getDefaultMigrationNetwork = (destHost, pools, pifs) => {
export const getDefaultMigrationNetwork = (intraPool, destHost, pools, pifs) => {
const migrationNetwork = pools[destHost.$pool].otherConfig['xo:migrationNetwork']
let defaultPif
return defined(
@@ -38,6 +38,6 @@ export const getDefaultMigrationNetwork = (destHost, pools, pifs) => {
}
}
}),
defaultPif
intraPool ? {} : defaultPif
).$network
}