Compare commits
32 Commits
nr-fix-s3-… ... nr-delete-…
| Author | SHA1 | Date |
| --- | --- | --- |
|  | 7c486f1159 |  |
|  | 819c798e99 |  |
|  | 8560ca0661 |  |
|  | 82cdfe7014 |  |
|  | 52642f5854 |  |
|  | 6c6f9f5a44 |  |
|  | 039ce15253 |  |
|  | 695a4c785c |  |
|  | 7d7f160159 |  |
|  | b454b4dff1 |  |
|  | 3079e1689c |  |
|  | 90b8eed038 |  |
|  | 6f43d4f091 |  |
|  | a33fc27313 |  |
|  | cad5f74d45 |  |
|  | 7787f39505 |  |
|  | 548a15214b |  |
|  | de6efe182b |  |
|  | 834fd5dd07 |  |
|  | c1e72697b0 |  |
|  | 78dc03e23e |  |
|  | 2f7af5c05a |  |
|  | 877d27a433 |  |
|  | 37d1b48c1b |  |
|  | 7d6a689542 |  |
|  | b75c06f7fe |  |
|  | e8bd2ae1e0 |  |
|  | 6e2396e5f4 |  |
|  | 01ceed9e99 |  |
|  | edd3628a67 |  |
|  | 4a3b2a2a5a |  |
|  | 1a7b49ff39 |  |
@@ -1,7 +1,7 @@
 #!/usr/bin/env node
 
 // assigned when options are parsed by the main function
-let force, merge
+let merge, remove
 
 // -----------------------------------------------------------------------------
 
@@ -80,12 +80,12 @@ const mergeVhdChain = limitConcurrency(1)(async function mergeVhdChain(chain) {
   }
 
   await Promise.all([
-    force && fs.rename(parent, child),
+    remove && fs.rename(parent, child),
     asyncMap(children.slice(0, -1), child => {
       console.warn('Unused VHD', child)
-      force && console.warn(' deleting…')
+      remove && console.warn(' deleting…')
       console.warn('')
-      return force && handler.unlink(child)
+      return remove && handler.unlink(child)
     }),
   ])
 })

@@ -127,9 +127,9 @@ async function handleVm(vmDir) {
     console.warn('Error while checking VHD', path)
     console.warn(' ', error)
     if (error != null && error.code === 'ERR_ASSERTION') {
-      force && console.warn(' deleting…')
+      remove && console.warn(' deleting…')
       console.warn('')
-      force && (await handler.unlink(path))
+      remove && (await handler.unlink(path))
     }
   }
 })

@@ -155,9 +155,9 @@ async function handleVm(vmDir) {
 
     console.warn('Error while checking VHD', vhd)
     console.warn(' missing parent', parent)
-    force && console.warn(' deleting…')
+    remove && console.warn(' deleting…')
     console.warn('')
-    force && deletions.push(handler.unlink(vhd))
+    remove && deletions.push(handler.unlink(vhd))
   }
 }

@@ -205,9 +205,9 @@ async function handleVm(vmDir) {
   } else {
     console.warn('Error while checking backup', json)
     console.warn(' missing file', linkedXva)
-    force && console.warn(' deleting…')
+    remove && console.warn(' deleting…')
     console.warn('')
-    force && (await handler.unlink(json))
+    remove && (await handler.unlink(json))
   }
 } else if (mode === 'delta') {
   const linkedVhds = (() => {

@@ -226,9 +226,9 @@ async function handleVm(vmDir) {
     missingVhds.forEach(vhd => {
       console.warn(' ', vhd)
     })
-    force && console.warn(' deleting…')
+    remove && console.warn(' deleting…')
     console.warn('')
-    force && (await handler.unlink(json))
+    remove && (await handler.unlink(json))
   }
 }
 })

@@ -266,9 +266,9 @@ async function handleVm(vmDir) {
   }
 
   console.warn('Unused VHD', vhd)
-  force && console.warn(' deleting…')
+  remove && console.warn(' deleting…')
   console.warn('')
-  force && unusedVhdsDeletion.push(handler.unlink(vhd))
+  remove && unusedVhdsDeletion.push(handler.unlink(vhd))
 }
 
 toCheck.forEach(vhd => {

@@ -287,17 +287,17 @@ async function handleVm(vmDir) {
   unusedVhdsDeletion,
   asyncMap(unusedXvas, path => {
     console.warn('Unused XVA', path)
-    force && console.warn(' deleting…')
+    remove && console.warn(' deleting…')
     console.warn('')
-    return force && handler.unlink(path)
+    return remove && handler.unlink(path)
   }),
   asyncMap(xvaSums, path => {
     // no need to handle checksums for XVAs deleted by the script, they will be handled by `unlink()`
     if (!xvas.has(path.slice(0, -'.checksum'.length))) {
       console.warn('Unused XVA checksum', path)
-      force && console.warn(' deleting…')
+      remove && console.warn(' deleting…')
       console.warn('')
-      return force && handler.unlink(path)
+      return remove && handler.unlink(path)
     }
   }),
 ])

@@ -308,17 +308,17 @@ async function handleVm(vmDir) {
 module.exports = async function main(args) {
   const opts = getopts(args, {
     alias: {
-      force: 'f',
+      remove: 'r',
       merge: 'm',
     },
-    boolean: ['force', 'merge'],
+    boolean: ['merge', 'remove'],
     default: {
-      force: false,
       merge: false,
+      remove: false,
     },
   })
 
-  ;({ force, merge } = opts)
+  ;({ remove, merge } = opts)
   await asyncMap(opts._, async vmDir => {
     vmDir = resolve(vmDir)
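The whole first file is a mechanical rename of the `force` flag to `remove` (shorthand `-r`). As a sanity check of how the `getopts` configuration above parses, here is a minimal sketch; the sample argv is an invented illustration, not taken from this diff:

```js
// Minimal sketch of the new option parsing, using the `getopts` npm package
// exactly as configured in the hunk above. The argv values are hypothetical.
const getopts = require('getopts')

const opts = getopts(['--remove', '-m', '/path/to/vm'], {
  alias: { remove: 'r', merge: 'm' },
  boolean: ['merge', 'remove'],
  default: { merge: false, remove: false },
})

console.log(opts.remove) // true
console.log(opts.merge) // true
console.log(opts._) // ['/path/to/vm'] — positional args (the VM directories)
```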
@@ -4,6 +4,7 @@
 import getStream from 'get-stream'
 
 import asyncMap from '@xen-orchestra/async-map'
+import CancelToken from 'promise-toolbox/CancelToken'
 import limit from 'limit-concurrency-decorator'
 import path, { basename } from 'path'
 import synchronized from 'decorator-synchronized'

@@ -119,42 +120,6 @@ export default class RemoteHandlerAbstract {
     await this.__closeFile(fd)
   }
 
-  // TODO: remove method
-  async createOutputStream(file: File, { checksum = false, dirMode, ...options }: Object = {}): Promise<LaxWritable> {
-    if (typeof file === 'string') {
-      file = normalizePath(file)
-    }
-    const path = typeof file === 'string' ? file : file.path
-    const streamP = timeout.call(
-      this._createOutputStream(file, {
-        dirMode,
-        flags: 'wx',
-        ...options,
-      }),
-      this._timeout
-    )
-
-    if (!checksum) {
-      return streamP
-    }
-
-    const checksumStream = createChecksumStream()
-    const forwardError = error => {
-      checksumStream.emit('error', error)
-    }
-
-    const stream = await streamP
-    stream.on('error', forwardError)
-    checksumStream.pipe(stream)
-
-    // $FlowFixMe
-    checksumStream.checksumWritten = checksumStream.checksum
-      .then(value => this._outputFile(checksumFile(path), value, { flags: 'wx' }))
-      .catch(forwardError)
-
-    return checksumStream
-  }
-
   createReadStream(
     file: File,
     { checksum = false, ignoreMissingChecksum = false, ...options }: Object = {}

@@ -209,14 +174,15 @@ export default class RemoteHandlerAbstract {
 
   // write a stream to a file using a temporary file
   async outputStream(
-    input: Readable | Promise<Readable>,
     path: string,
+    input: Readable | Promise<Readable>,
-    { checksum = true, dirMode }: { checksum?: boolean, dirMode?: number } = {}
+    { checksum = true, dirMode, cancelToken = CancelToken.none }: { checksum?: boolean, dirMode?: number } = {}
   ): Promise<void> {
-    path = normalizePath(path)
     return this._outputStream(await input, normalizePath(path), {
       checksum,
       dirMode,
+      cancelToken,
     })
   }

@@ -477,13 +443,51 @@ export default class RemoteHandlerAbstract {
     return this._outputFile(file, data, { flags })
   }
 
-  async _outputStream(input: Readable, path: string, { checksum, dirMode }: { checksum?: boolean, dirMode?: number }) {
+  async _createOutputStreamChecksum(file: File, { checksum = false, ...options }: Object = {}): Promise<LaxWritable> {
+    if (typeof file === 'string') {
+      file = normalizePath(file)
+    }
+    const path = typeof file === 'string' ? file : file.path
+    const streamP = timeout.call(
+      this._createOutputStream(file, {
+        flags: 'wx',
+        ...options,
+      }),
+      this._timeout
+    )
+
+    if (!checksum) {
+      return streamP
+    }
+
+    const checksumStream = createChecksumStream()
+    const forwardError = error => {
+      checksumStream.emit('error', error)
+    }
+
+    const stream = await streamP
+    stream.on('error', forwardError)
+    checksumStream.pipe(stream)
+
+    // $FlowFixMe
+    checksumStream.checksumWritten = checksumStream.checksum
+      .then(value => this._outputFile(checksumFile(path), value, { flags: 'wx' }))
+      .catch(forwardError)
+
+    return checksumStream
+  }
+
+  async _outputStream(
+    input: Readable,
+    path: string,
+    { checksum, dirMode, cancelToken = CancelToken.none }: { checksum?: boolean, dirMode?: number }
+  ) {
     const tmpPath = `${dirname(path)}/.${basename(path)}`
-    const output = await this.createOutputStream(tmpPath, {
-      checksum,
-      dirMode,
-    })
+    const output = await this._createOutputStreamChecksum(tmpPath, { checksum })
    try {
+      cancelToken.promise.then(reason => {
+        input.destroy(reason)
+      })
      input.pipe(output)
      await fromEvent(output, 'finish')
      await output.checksumWritten
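`outputStream()` now takes the destination path first, the input stream second, and accepts a `cancelToken` that destroys the input when cancelled. A hedged usage sketch follows; the handler construction, remote URL, and file name are assumptions, while `CancelToken.source()` is the real `promise-toolbox` API:

```js
// Sketch: cancelling an in-flight outputStream() write via promise-toolbox's
// CancelToken. The remote URL and paths below are illustrative only.
import CancelToken from 'promise-toolbox/CancelToken'
import { getHandler } from '@xen-orchestra/fs'

async function writeWithCancel(input) {
  const handler = getHandler({ url: 'file:///tmp/remote' }) // hypothetical remote
  await handler.sync()

  const { cancel, token } = CancelToken.source()

  // abort the upload after 30 s, for example
  const timer = setTimeout(() => cancel('took too long'), 30e3)
  try {
    // new signature: path first, then the input stream
    await handler.outputStream('backups/data.bin', input, {
      checksum: true,
      cancelToken: token, // destroys `input` if the token is cancelled
    })
  } finally {
    clearTimeout(timer)
  }
}
```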
@@ -30,18 +30,6 @@ describe('closeFile()', () => {
   })
 })
 
-describe('createOutputStream()', () => {
-  it(`throws in case of timeout`, async () => {
-    const testHandler = new TestHandler({
-      createOutputStream: () => new Promise(() => {}),
-    })
-
-    const promise = testHandler.createOutputStream('File')
-    jest.advanceTimersByTime(TIMEOUT)
-    await expect(promise).rejects.toThrowError(TimeoutError)
-  })
-})
-
 describe('getInfo()', () => {
   it('throws in case of timeout', async () => {
     const testHandler = new TestHandler({
@@ -3,11 +3,9 @@
 import 'dotenv/config'
 import asyncIteratorToStream from 'async-iterator-to-stream'
 import { forOwn, random } from 'lodash'
-import { fromCallback } from 'promise-toolbox'
-import { pipeline } from 'readable-stream'
 import { tmpdir } from 'os'
 
-import { getHandler } from '.'
+import { getHandler } from './'
 
 // https://gist.github.com/julien-f/3228c3f34fdac01ade09
 const unsecureRandomBytes = n => {

@@ -82,10 +80,9 @@ handlers.forEach(url => {
   })
 })
 
-describe('#createOutputStream()', () => {
+describe('#outputStream()', () => {
   it('creates parent dir if missing', async () => {
-    const stream = await handler.createOutputStream('dir/file')
-    await fromCallback(pipeline, createTestDataStream(), stream)
+    await handler.outputStream('dir/file', createTestDataStream())
     await expect(await handler.readFile('dir/file')).toEqual(TEST_DATA)
   })
 })
@@ -1,9 +1,11 @@
 import aws from '@sullux/aws-sdk'
 import assert from 'assert'
+import http from 'http'
 import { parse } from 'xo-remote-parser'
 
 import RemoteHandlerAbstract from './abstract'
 import { createChecksumStream } from './checksum'
+import CancelToken from 'promise-toolbox/CancelToken'
 
 // endpoints https://docs.aws.amazon.com/general/latest/gr/s3.html
 
@@ -13,12 +15,14 @@ const MAX_PART_SIZE = 1024 * 1024 * 1024 * 5 // 5GB
 const MAX_PARTS_COUNT = 10000
 const MAX_OBJECT_SIZE = 1024 * 1024 * 1024 * 1024 * 5 // 5TB
 const IDEAL_FRAGMENT_SIZE = Math.ceil(MAX_OBJECT_SIZE / MAX_PARTS_COUNT) // the smallest fragment size that still allows a 5TB upload in 10000 fragments, about 524MB
 
+const USE_SSL = true
+
 export default class S3Handler extends RemoteHandlerAbstract {
   constructor(remote, _opts) {
     super(remote)
     const { host, path, username, password } = parse(remote.url)
     // https://www.zenko.io/blog/first-things-first-getting-started-scality-s3-server/
-    this._s3 = aws({
+    const params = {
       accessKeyId: username,
       apiVersion: '2006-03-01',
       endpoint: host,

@@ -28,7 +32,12 @@ export default class S3Handler extends RemoteHandlerAbstract {
       httpOptions: {
         timeout: 600000,
       },
-    }).s3
+    }
+    if (!USE_SSL) {
+      params.httpOptions.agent = new http.Agent()
+      params.sslEnabled = false
+    }
+    this._s3 = aws(params).s3
 
     const splitPath = path.split('/').filter(s => s.length)
     this._bucket = splitPath.shift()

@@ -43,7 +52,10 @@ export default class S3Handler extends RemoteHandlerAbstract {
     return { Bucket: this._bucket, Key: this._dir + file }
   }
 
-  async _outputStream(input, path, { checksum }) {
+  async _outputStream(input, path, { checksum, cancelToken = CancelToken.none }) {
+    cancelToken.promise.then(reason => {
+      input.destroy(reason)
+    })
     let inputStream = input
     if (checksum) {
       const checksumStream = createChecksumStream()

@@ -266,4 +278,26 @@ export default class S3Handler extends RemoteHandlerAbstract {
   }
 
   async _closeFile(fd) {}
+
+  // https://stackoverflow.com/a/48955582/72637
+  async _rmtree(dir) {
+    const listParams = {
+      Bucket: this._bucket,
+      Prefix: this._dir + dir,
+    }
+    let listedObjects = {}
+    do {
+      listedObjects = await this._s3.listObjectsV2({
+        ...listParams,
+        ContinuationToken: listedObjects.NextContinuationToken,
+      })
+      if (listedObjects.Contents.length === 0) {
+        return
+      }
+      await this._s3.deleteObjects({
+        Bucket: this._bucket,
+        Delete: { Objects: listedObjects.Contents.map(({ Key }) => ({ Key })) },
+      })
+    } while (listedObjects.IsTruncated)
+  }
 }
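The new `_rmtree` deletes every object under a prefix in batches, following the standard paginated list-then-delete pattern (S3 has no recursive delete). A standalone sketch of the same pattern against the stock `aws-sdk` v2 client; the bucket name, prefix, and region are placeholders:

```js
// Sketch of the paginated delete loop used by _rmtree above, written against
// the plain aws-sdk v2 S3 client. Bucket, prefix and region are placeholders.
const S3 = require('aws-sdk/clients/s3')

async function rmtree(s3, Bucket, Prefix) {
  let listed = {}
  do {
    // list up to 1000 keys per page, resuming from the previous page
    listed = await s3
      .listObjectsV2({ Bucket, Prefix, ContinuationToken: listed.NextContinuationToken })
      .promise()
    if (listed.Contents.length === 0) {
      return
    }
    // deleteObjects accepts at most 1000 keys, which matches the page size
    await s3
      .deleteObjects({ Bucket, Delete: { Objects: listed.Contents.map(({ Key }) => ({ Key })) } })
      .promise()
  } while (listed.IsTruncated)
}

// usage: rmtree(new S3({ region: 'us-east-1' }), 'my-bucket', 'backups/vm-1/')
```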
@@ -12,6 +12,8 @@
 - [Backup/restore] Allow backup restore to any licence even if XOA isn't registered (PR [#5547](https://github.com/vatesfr/xen-orchestra/pull/5547))
 - [Import] Ignore case when detecting file type (PR [#5574](https://github.com/vatesfr/xen-orchestra/pull/5574))
 - [Backup] Ability to set a specific schedule to always run full backups [#5541](https://github.com/vatesfr/xen-orchestra/issues/5541) (PR [#5546](https://github.com/vatesfr/xen-orchestra/pull/5546))
+- [Proxy] Log VM backup restoration (PR [#5576](https://github.com/vatesfr/xen-orchestra/pull/5576))
+- [Backup/S3] Allow backup of metadata to Amazon Web Services S3 (PR [#5373](https://github.com/vatesfr/xen-orchestra/pull/5373))
 
 ### Bug fixes
 
@@ -20,6 +22,7 @@
 - [VM/Snapshot export] Fix `Error: no available place in queue` on canceling an export via browser then starting a new one when the concurrency threshold is reached [#5535](https://github.com/vatesfr/xen-orchestra/issues/5535) (PR [#5538](https://github.com/vatesfr/xen-orchestra/pull/5538))
 - [Servers] Hide pool's objects if its master is unreachable [#5475](https://github.com/vatesfr/xen-orchestra/issues/5475) (PR [#5526](https://github.com/vatesfr/xen-orchestra/pull/5526))
 - [Host] Restart toolstack: fix `ECONNREFUSED` error (PR [#5553](https://github.com/vatesfr/xen-orchestra/pull/5553))
+- [VM migration] Intra-pool: don't automatically select migration network if no default migration network is defined on the pool (PR [#5564](https://github.com/vatesfr/xen-orchestra/pull/5564))
 
 ### Packages to release
 
@@ -38,7 +41,8 @@
 >
 > In case of conflict, the highest (lowest in previous list) `$version` wins.
 
-- @xen-orchestra/fs minor
+- @xen-orchestra/fs major
+- vhd-lib minor
 - xen-api patch
 - xo-common minor
 - xo-server minor
|
||||
|
||||
Maintenance mode will trigger two actions internally:
|
||||
|
||||
* disabling the host (no new VMs could start on it)
|
||||
* evacuate VMs that can be evacuated ("agile" VMs, which could be live migrated)
|
||||
- disabling the host (no new VMs could start on it)
|
||||
- evacuate VMs that can be evacuated ("agile" VMs, which could be live migrated)
|
||||
|
||||
It's perfect if you want to shutdown the host for hardware replacement, or if you want to do some other operations without disrupting your production.
|
||||
|
||||
|
||||
@@ -82,8 +82,8 @@ member: 348
 
 The plugin needs to know that Bruce Wayne belongs to the heroes group. To do so, you need to set 2 entries in the configuration:
 
-- **Group attribute**, which is the name of the *group* attribute that is used to list users within a group. In this example, it would be `member`.
-- **User attribute**, which is the name of the *user* attribute that is used to reference users in groups. In this example, it would be `uid` since `347`, `348`, etc. are user `uid`s.
+- **Group attribute**, which is the name of the _group_ attribute that is used to list users within a group. In this example, it would be `member`.
+- **User attribute**, which is the name of the _user_ attribute that is used to reference users in groups. In this example, it would be `uid` since `347`, `348`, etc. are user `uid`s.
 
 Save the configuration and you're good to go. From now on, every time an LDAP user logs into XO, the plugin will automatically create or update that user's groups and add them to those groups. If you need to import all the groups at once, you can do so from Settings > Groups > Synchronize LDAP Groups. This can be useful if you want to assign ACLs on groups without having to wait for a member of the group to log in.
@@ -11,9 +11,9 @@ import { pFromCallback } from 'promise-toolbox'
 import { pipeline } from 'readable-stream'
 import { randomBytes } from 'crypto'
 
-import Vhd, { chainVhd, createSyntheticStream, mergeVhd as vhdMerge } from './'
+import Vhd, { chainVhd, createSyntheticStream, mergeVhd as vhdMerge } from './index'
 
-import { SECTOR_SIZE } from './src/_constants'
+import { SECTOR_SIZE } from './_constants'
 
 let tempDir = null
@@ -8,7 +8,7 @@ import { pipeline } from 'readable-stream'
 
 import { createReadableRawStream, createReadableSparseStream } from './'
 
-import { createFooter } from './src/_createFooterHeader'
+import { createFooter } from './_createFooterHeader'
 
 let tempDir = null
@@ -105,7 +105,7 @@ test('ReadableSparseVHDStream can handle a sparse file', async () => {
   const stream = await createReadableSparseStream(
     fileSize,
     blockSize,
-    blocks.map(b => b.logicalAddressBytes),
+    blocks.map(b => b.logicalAddressBytes / blockSize),
     blocks
   )
   expect(stream.length).toEqual(4197888)
@@ -178,8 +178,8 @@ export default class Vhd {
   }
 
   // return the first sector (bitmap) of a block
-  _getBatEntry(block) {
-    const i = block * 4
+  _getBatEntry(blockId) {
+    const i = blockId * 4
     const { blockTable } = this
     return i < blockTable.length ? blockTable.readUInt32BE(i) : BLOCK_UNUSED
   }
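Each VHD Block Allocation Table entry is a 4-byte big-endian sector address, so entry `blockId` lives at byte offset `blockId * 4`. A toy sketch of the same lookup on a raw buffer; the sample table contents are invented:

```js
// Toy illustration of the BAT lookup above: each entry is a big-endian
// uint32 giving the block's first sector, with 0xffffffff meaning "unused".
const BLOCK_UNUSED = 0xffffffff

const blockTable = Buffer.alloc(8) // hypothetical 2-entry table
blockTable.writeUInt32BE(0x600, 0) // block 0 starts at sector 0x600
blockTable.writeUInt32BE(BLOCK_UNUSED, 4) // block 1 is unallocated

function getBatEntry(blockTable, blockId) {
  const i = blockId * 4
  return i < blockTable.length ? blockTable.readUInt32BE(i) : BLOCK_UNUSED
}

console.log(getBatEntry(blockTable, 0).toString(16)) // '600'
console.log(getBatEntry(blockTable, 1) === BLOCK_UNUSED) // true
```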
@@ -85,7 +85,7 @@ When logical volume no longer necessary:
 
 ## Mount block device
 
-> Tip: `offset` and `sizelimit` are only required on a partionned disk
+> Tip: `offset` and `sizelimit` are only required on a partionned disk
 
 ```
 > mkdir /tmp/block-mount
@@ -12,6 +12,7 @@ import { PassThrough } from 'stream'
 import { AssertionError } from 'assert'
 import { basename, dirname } from 'path'
 import { decorateWith } from '@vates/decorate-with'
+import { invalidParameters } from 'xo-common/api-errors'
 import { isValidXva } from '@xen-orchestra/backups/isValidXva'
 import { parseDuration } from '@vates/parse-duration'
 import {

@@ -29,7 +30,7 @@ import {
   sum,
   values,
 } from 'lodash'
-import { CancelToken, ignoreErrors, pFinally, timeout } from 'promise-toolbox'
+import { CancelToken, ignoreErrors, timeout } from 'promise-toolbox'
 import Vhd, { chainVhd, checkVhdChain, createSyntheticStream as createVhdReadStream } from 'vhd-lib'
 
 import type Logger from '../logs/loggers/abstract'

@@ -631,8 +632,7 @@ export default class BackupNg {
       }
       return
     } catch (error) {
-      // XO API invalid parameters error
-      if (error.code === 10) {
+      if (invalidParameters.is(error)) {
        delete params.streamLogs
        return app.callProxyMethod(proxyId, 'backup.run', params)
      }
@@ -820,53 +820,102 @@ export default class BackupNg {
 
     const { metadataFilename, remoteId } = parseVmBackupId(id)
     const { proxy, url, options } = await app.getRemoteWithCredentials(remoteId)
-    if (proxy !== undefined) {
-      const { allowUnauthorized, host, password, username } = await app.getXenServer(app.getXenServerIdByObject(sr.$id))
-      return app.callProxyMethod(proxy, 'backup.importVmBackup', {
-        backupId: metadataFilename,
-        remote: {
-          url,
-          options,
-        },
-        srUuid: sr.uuid,
-        xapi: {
-          allowUnauthorized,
-          credentials: {
-            username,
-            password,
-          },
-          url: host,
-        },
-      })
-    }
-
-    const handler = await app.getRemoteHandler(remoteId)
-    const metadata: Metadata = JSON.parse(String(await handler.readFile(metadataFilename)))
-
-    const importer = importers[metadata.mode]
-    if (importer === undefined) {
-      throw new Error(`no importer for backup mode ${metadata.mode}`)
-    }
-
-    const { jobId, timestamp: time } = metadata
+    let rootTaskId
     const logger = this._logger
-    return wrapTaskFn(
-      {
-        data: {
-          jobId,
-          srId,
-          time,
-        },
-        logger,
-        message: 'restore',
-      },
-      taskId => {
-        this._runningRestores.add(taskId)
-        return importer(handler, metadataFilename, metadata, xapi, sr, taskId, logger)::pFinally(() => {
-          this._runningRestores.delete(taskId)
-        })
-      }
-    )()
+    try {
+      if (proxy !== undefined) {
+        const { allowUnauthorized, host, password, username } = await app.getXenServer(
+          app.getXenServerIdByObject(sr.$id)
+        )
+
+        const params = {
+          backupId: metadataFilename,
+          remote: {
+            url,
+            options,
+          },
+          srUuid: sr.uuid,
+          streamLogs: true,
+          xapi: {
+            allowUnauthorized,
+            credentials: {
+              username,
+              password,
+            },
+            url: host,
+          },
+        }
+
+        try {
+          const logsStream = await app.callProxyMethod(proxy, 'backup.importVmBackup', params, {
+            assertType: 'iterator',
+          })
+
+          const localTaskIds = { __proto__: null }
+          for await (const log of logsStream) {
+            const { event, message, taskId } = log
+
+            const common = {
+              data: log.data,
+              event: 'task.' + event,
+              result: log.result,
+              status: log.status,
+            }
+
+            if (event === 'start') {
+              const { parentId } = log
+              if (parentId === undefined) {
+                rootTaskId = localTaskIds[taskId] = logger.notice(message, common)
+                this._runningRestores.add(rootTaskId)
+              } else {
+                common.parentId = localTaskIds[parentId]
+                localTaskIds[taskId] = logger.notice(message, common)
+              }
+            } else {
+              common.taskId = localTaskIds[taskId]
+              logger.notice(message, common)
+            }
+          }
+
+          return
+        } catch (error) {
+          if (invalidParameters.is(error)) {
+            delete params.streamLogs
+            return app.callProxyMethod(proxy, 'backup.importVmBackup', params)
+          }
+          throw error
+        }
+      }
+
+      const handler = await app.getRemoteHandler(remoteId)
+      const metadata: Metadata = JSON.parse(String(await handler.readFile(metadataFilename)))
+
+      const importer = importers[metadata.mode]
+      if (importer === undefined) {
+        throw new Error(`no importer for backup mode ${metadata.mode}`)
+      }
+
+      const { jobId, timestamp: time } = metadata
+      return wrapTaskFn(
+        {
+          data: {
+            jobId,
+            srId,
+            time,
+          },
+          logger,
+          message: 'restore',
+        },
+        taskId => {
+          rootTaskId = taskId
+          this._runningRestores.add(taskId)
+          return importer(handler, metadataFilename, metadata, xapi, sr, taskId, logger)
+        }
+      )()
+    } finally {
+      this._runningRestores.delete(rootTaskId)
+    }
   }
 
   @decorateWith(
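The restore now streams the proxy's task logs and replays them into the local logger, remapping remote task ids to locally generated log ids so parent/child links survive the transfer. A reduced sketch of that remapping; the log shape follows the diff above, but the fake stream and the stand-in logger are invented:

```js
// Reduced sketch of the task-log remapping above: remote task ids are keys,
// locally generated log ids are values. The sample logs are invented.
async function* fakeLogsStream() {
  yield { event: 'start', taskId: 'r1', message: 'restore' }
  yield { event: 'start', taskId: 'r2', parentId: 'r1', message: 'transfer' }
  yield { event: 'end', taskId: 'r2', status: 'success' }
  yield { event: 'end', taskId: 'r1', status: 'success' }
}

let nextLocalId = 0
const logger = {
  // stand-in for xo-server's logger.notice(), which returns the new log's id
  notice(message, data) {
    const id = `local-${nextLocalId++}`
    console.log(id, message, data.event, data.parentId ?? '')
    return id
  },
}

const localTaskIds = { __proto__: null }
for await (const log of fakeLogsStream()) {
  const common = { event: 'task.' + log.event, status: log.status }
  if (log.event === 'start') {
    if (log.parentId === undefined) {
      localTaskIds[log.taskId] = logger.notice(log.message, common) // root task
    } else {
      common.parentId = localTaskIds[log.parentId] // re-link to the local parent
      localTaskIds[log.taskId] = logger.notice(log.message, common)
    }
  } else {
    common.taskId = localTaskIds[log.taskId]
    logger.notice(log.message, common)
  }
}
```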
@@ -1325,7 +1374,7 @@ export default class BackupNg {
         parentId: taskId,
         result: () => ({ size: xva.size }),
       },
-      handler.outputStream(fork, dataFilename, {
+      handler.outputStream(dataFilename, fork, {
         dirMode,
       })
     )

@@ -1663,7 +1712,7 @@ export default class BackupNg {
     }
 
     // FIXME: should only be renamed after the metadata file has been written
-    await handler.outputStream(fork.streams[`${id}.vhd`](), path, {
+    await handler.outputStream(path, fork.streams[`${id}.vhd`](), {
       // no checksum for VHDs, because they will be invalidated by
       // merges and chainings
      checksum: false,
@@ -548,11 +548,7 @@ export default class {
     const sizeStream = createSizeStream()
 
     try {
-      const targetStream = await handler.createOutputStream(backupFullPath)
-
-      stream.on('error', error => targetStream.emit('error', error))
-
-      await Promise.all([fromEvent(stream.pipe(sizeStream).pipe(targetStream), 'finish'), stream.task])
+      await Promise.all([handler.outputStream(backupFullPath, sizeStream), stream.task])
     } catch (error) {
       // Remove new backup. (corrupt).
       await handler.unlink(backupFullPath)::ignoreErrors()

@@ -782,9 +778,7 @@ export default class {
 
   @deferrable
   async _backupVm($defer, vm, handler, file, { compress }) {
-    const targetStream = await handler.createOutputStream(file)
     $defer.onFailure.call(handler, 'unlink', file)
-    $defer.onFailure.call(targetStream, 'close')
 
     const sourceStream = await this._xo.getXapi(vm).exportVm(vm._xapiId, {
       compress,

@@ -792,9 +786,9 @@ export default class {
 
     const sizeStream = createSizeStream()
 
-    sourceStream.pipe(sizeStream).pipe(targetStream)
+    sourceStream.pipe(sizeStream)
 
-    await Promise.all([sourceStream.task, fromEvent(targetStream, 'finish')])
+    await Promise.all([sourceStream.task, handler.outputStream(file, sizeStream)])
 
     return {
       transferSize: sizeStream.size,
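In both call sites the VM export is now piped only through the size-counting stream, and the handler drains it via `outputStream()`. A generic sketch of the byte-counting PassThrough this relies on; `createSizeStream` itself is internal to xo-server, so this is an assumed equivalent, not its actual source:

```js
// Assumed equivalent of xo-server's createSizeStream(): a PassThrough that
// counts the bytes flowing through it and exposes the total as `size`.
const { PassThrough } = require('stream')

function createSizeStream() {
  const stream = new PassThrough()
  stream.size = 0
  stream.on('data', chunk => {
    stream.size += chunk.length
  })
  return stream
}

// usage sketch, mirroring the diff above:
//   sourceStream.pipe(sizeStream)
//   await handler.outputStream(file, sizeStream)
//   console.log(sizeStream.size) // bytes actually transferred
```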
@@ -278,6 +278,7 @@ export default class Jobs {
       method: 'backupNg.runJob',
       params: {
         id: job.id,
+        proxy: job.proxy,
         schedule: schedule?.id,
         settings: job.settings,
         vms: job.vms,
@@ -1,7 +1,7 @@
 // @flow
 import asyncMap from '@xen-orchestra/async-map'
 import createLogger from '@xen-orchestra/log'
-import { fromEvent, ignoreErrors, timeout } from 'promise-toolbox'
+import { ignoreErrors, timeout } from 'promise-toolbox'
 import { parseDuration } from '@vates/parse-duration'
 
 import { debounceWithKey, REMOVE_CACHE_ENTRY } from '../_pDebounceWithKey'

@@ -304,20 +304,13 @@ export default class metadataBackup {
       }
     )
 
-    let outputStream
     try {
       const { dirMode } = this._backupOptions
       await waitAll([
         (async () => {
-          outputStream = await handler.createOutputStream(fileName, {
-            dirMode,
-          })
-
-          // 'readable-stream/pipeline' not call the callback when an error throws
-          // from the readable stream
-          stream.pipe(outputStream)
-          return timeout.call(
-            fromEvent(stream, 'end').catch(error => {
+          handler.outputStream(fileName, stream, { cancelToken }).catch(error => {
             stream.destroy()
             if (error.message !== 'aborted') {
              throw error
            }

@@ -353,9 +346,6 @@ export default class metadataBackup {
 
       this._listPoolMetadataBackups(REMOVE_CACHE_ENTRY, remoteId)
     } catch (error) {
-      if (outputStream !== undefined) {
-        outputStream.destroy()
-      }
       await handler.rmtree(dir).catch(error => {
         logger.warning(`unable to delete the folder ${dir}`, {
           event: 'task.warning',
@@ -24,6 +24,15 @@ afterEach(async () => {
   await pFromCallback(cb => rimraf(tmpDir, cb))
 })
 
+function bufferToArray(buffer) {
+  const view = new DataView(buffer)
+  const res = []
+  for (let i = 0; i < buffer.byteLength; i += 4) {
+    res.push(view.getUint32(i, true))
+  }
+  return res
+}
+
 function createFileAccessor(file) {
   return async (start, end) => {
     if (start < 0 || end < 0) {

@@ -52,7 +61,11 @@ test('VMDK to VHD can convert a random data file with VMDKDirectParser', async () => {
   })
   const result = await readVmdkGrainTable(createFileAccessor(vmdkFileName))
   const pipe = (
-    await vmdkToVhd(createReadStream(vmdkFileName), result.grainLogicalAddressList, result.grainFileOffsetList)
+    await vmdkToVhd(
+      createReadStream(vmdkFileName),
+      bufferToArray(result.grainLogicalAddressList),
+      bufferToArray(result.grainFileOffsetList)
+    )
   ).pipe(createWriteStream(vhdFileName))
   await eventToPromise(pipe, 'finish')
   await execa('vhd-util', ['check', '-p', '-b', '-t', '-n', vhdFileName])
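`bufferToArray` decodes an `ArrayBuffer` of little-endian uint32s into a plain array, which is the shape `vmdkToVhd` now expects for the grain tables. A quick standalone check of the helper; the sample values are invented:

```js
// Standalone check of bufferToArray: three little-endian uint32s packed
// into an ArrayBuffer come back as a plain number array. Values invented.
function bufferToArray(buffer) {
  const view = new DataView(buffer)
  const res = []
  for (let i = 0; i < buffer.byteLength; i += 4) {
    res.push(view.getUint32(i, true)) // `true` = little-endian
  }
  return res
}

const buf = new ArrayBuffer(12)
const view = new DataView(buf)
;[512, 1024, 4096].forEach((v, i) => view.setUint32(i * 4, v, true))

console.log(bufferToArray(buf)) // [ 512, 1024, 4096 ]
```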
@@ -163,7 +163,7 @@ export default class MigrateVmModalBody extends BaseComponent {
         host,
         intraPool,
         mapVifsNetworks: undefined,
-        migrationNetworkId: getDefaultMigrationNetwork(host, pools, pifs),
+        migrationNetworkId: getDefaultMigrationNetwork(intraPool, host, pools, pifs),
         targetSrs: {},
       })
       return

@@ -190,7 +190,7 @@ export default class MigrateVmModalBody extends BaseComponent {
       host,
       intraPool,
       mapVifsNetworks: defaultNetworksForVif,
-      migrationNetworkId: getDefaultMigrationNetwork(host, pools, pifs),
+      migrationNetworkId: getDefaultMigrationNetwork(intraPool, host, pools, pifs),
       targetSrs: { mainSr: pools[host.$pool].default_SR },
     })
   }
@@ -180,11 +180,11 @@ export default class MigrateVmsModalBody extends BaseComponent {
       return
     }
     const { pools, pifs } = this.props
-    const defaultMigrationNetworkId = getDefaultMigrationNetwork(host, pools, pifs)
+    const intraPool = every(this.props.vms, vm => vm.$pool === host.$pool)
+    const defaultMigrationNetworkId = getDefaultMigrationNetwork(intraPool, host, pools, pifs)
     const defaultSrId = pools[host.$pool].default_SR
     const defaultSrConnectedToHost = some(host.$PBDs, pbd => this._getObject(pbd).SR === defaultSrId)
 
-    const intraPool = every(this.props.vms, vm => vm.$pool === host.$pool)
     const doNotMigrateVmVdis = {}
     const doNotMigrateVdi = {}
     let noVdisMigration = false
@@ -24,7 +24,7 @@ export const getDefaultNetworkForVif = (vif, destHost, pifs, networks) => {
   return destNetworkId
 }
 
-export const getDefaultMigrationNetwork = (destHost, pools, pifs) => {
+export const getDefaultMigrationNetwork = (intraPool, destHost, pools, pifs) => {
   const migrationNetwork = pools[destHost.$pool].otherConfig['xo:migrationNetwork']
   let defaultPif
   return defined(

@@ -38,6 +38,6 @@ export const getDefaultMigrationNetwork = (destHost, pools, pifs) => {
       }
     }
   }),
-    defaultPif
+    intraPool ? {} : defaultPif
   ).$network
 }
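With the new `intraPool` flag, the `defaultPif` fallback is replaced by `{}` for intra-pool migrations, so `.$network` resolves to `undefined` and no migration network is auto-selected unless the pool explicitly defines one. A reduced model of that fallback logic; all objects and the simplified `defined` helper here are invented stand-ins:

```js
// Reduced model of the new fallback: for intra-pool migrations there is no
// PIF fallback, so the result is undefined unless the pool sets
// `xo:migrationNetwork`. All objects below are invented stand-ins.
const defined = (...values) => values.find(v => v !== undefined)

function getDefaultMigrationNetworkId(intraPool, pool, defaultPif) {
  const configured = pool.otherConfig['xo:migrationNetwork']
  return defined(
    configured !== undefined ? { $network: configured } : undefined,
    intraPool ? {} : defaultPif // intra-pool: fall through to `{}`
  ).$network
}

const pool = { otherConfig: {} }
const defaultPif = { $network: 'net-mgmt' }

console.log(getDefaultMigrationNetworkId(true, pool, defaultPif)) // undefined
console.log(getDefaultMigrationNetworkId(false, pool, defaultPif)) // 'net-mgmt'
```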