Compare commits

1 commit, lite/xo-un...lite/neste

| Author | SHA1 | Date |
| --- | --- | --- |
|  | 677a9c958c |  |

@@ -68,11 +68,6 @@ module.exports = {

    'no-console': ['error', { allow: ['warn', 'error'] }],

    // this rule can prevent race condition bugs like parallel `a += await foo()`
    //
    // as it has a lots of false positive, it is only enabled as a warning for now
    'require-atomic-updates': 'warn',

    strict: 'error',
  },
}

@@ -1,3 +0,0 @@
@xen-orchestra/web
@xen-orchestra/web-core
@xen-orchestra/web-lite

@@ -1,11 +1,8 @@
'use strict'

module.exports = {
  arrowParens: 'avoid',
  jsxSingleQuote: true,
  semi: false,
  singleQuote: true,
  trailingComma: 'es5',

  // 2020-11-24: Requested by nraynaud and approved by the rest of the team
  //

@@ -33,7 +33,7 @@
    "test": "node--test"
  },
  "devDependencies": {
    "sinon": "^17.0.1",
    "sinon": "^15.0.1",
    "tap": "^16.3.0",
    "test": "^3.2.1"
  }

@@ -13,15 +13,12 @@ describe('decorateWith', () => {
    const expectedFn = Function.prototype
    const newFn = () => {}

    const decorator = decorateWith(
      function wrapper(fn, ...args) {
        assert.deepStrictEqual(fn, expectedFn)
        assert.deepStrictEqual(args, expectedArgs)
    const decorator = decorateWith(function wrapper(fn, ...args) {
      assert.deepStrictEqual(fn, expectedFn)
      assert.deepStrictEqual(args, expectedArgs)

        return newFn
      },
      ...expectedArgs
    )
      return newFn
    }, ...expectedArgs)

    const descriptor = {
      configurable: true,

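For readers unfamiliar with `@vates/decorate-with`, here is a minimal sketch (not part of this compare) of the behaviour the test above exercises; the body of `decorateWith` shown here is an assumption inferred from the test, not the package's actual source:

```js
// decorateWith(wrapper, ...args) builds a method decorator: the decorated
// method is replaced by the value returned by wrapper(originalMethod, ...args).
const decorateWith =
  (wrapper, ...args) =>
  (target, name, descriptor) => ({
    ...descriptor,
    value: wrapper(descriptor.value, ...args),
  })
```
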
@@ -14,7 +14,7 @@
    "url": "https://vates.fr"
  },
  "license": "ISC",
  "version": "0.1.5",
  "version": "0.1.4",
  "engines": {
    "node": ">=8.10"
  },
@@ -23,13 +23,13 @@
    "test": "node--test"
  },
  "dependencies": {
    "@vates/multi-key-map": "^0.2.0",
    "@vates/multi-key-map": "^0.1.0",
    "@xen-orchestra/async-map": "^0.1.2",
    "@xen-orchestra/log": "^0.6.0",
    "ensure-array": "^1.0.0"
  },
  "devDependencies": {
    "sinon": "^17.0.1",
    "sinon": "^15.0.1",
    "test": "^3.2.1"
  }
}

@@ -1,7 +1,9 @@
import LRU from 'lru-cache'
import Fuse from 'fuse-native'
import { VhdSynthetic } from 'vhd-lib'
import { Disposable, fromCallback } from 'promise-toolbox'
'use strict'

const LRU = require('lru-cache')
const Fuse = require('fuse-native')
const { VhdSynthetic } = require('vhd-lib')
const { Disposable, fromCallback } = require('promise-toolbox')

// build a s stat object from https://github.com/fuse-friends/fuse-native/blob/master/test/fixtures/stat.js
const stat = st => ({
@@ -14,7 +16,7 @@ const stat = st => ({
  gid: st.gid !== undefined ? st.gid : process.getgid(),
})

export const mount = Disposable.factory(async function* mount(handler, diskPath, mountDir) {
exports.mount = Disposable.factory(async function* mount(handler, diskPath, mountDir) {
  const vhd = yield VhdSynthetic.fromVhdChain(handler, diskPath)

  const cache = new LRU({

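As a usage note (assumed, based on the `Disposable.factory` wrapper above and the `promise-toolbox` `Disposable.use` API, not taken from this compare): the exported `mount` is meant to be consumed as a disposable, so the FUSE mount is released automatically when the callback settles.

```js
import { Disposable } from 'promise-toolbox'
import { mount } from '@vates/fuse-vhd'

// handler: a remote/fs handler; diskPath: path of the VHD chain tip; mountDir: an empty directory
// all three are placeholders supplied by the caller
async function withMountedVhd(handler, diskPath, mountDir, fn) {
  // the mount is created before fn runs and unmounted when fn resolves or throws
  return Disposable.use(mount(handler, diskPath, mountDir), fn)
}
```
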
@@ -1,6 +1,6 @@
{
  "name": "@vates/fuse-vhd",
  "version": "2.0.0",
  "version": "1.0.0",
  "license": "ISC",
  "private": false,
  "homepage": "https://github.com/vatesfr/xen-orchestra/tree/master/@vates/fuse-vhd",
@@ -15,14 +15,13 @@
    "url": "https://vates.fr"
  },
  "engines": {
    "node": ">=14"
    "node": ">=10.0"
  },
  "main": "./index.mjs",
  "dependencies": {
    "fuse-native": "^2.2.6",
    "lru-cache": "^7.14.0",
    "promise-toolbox": "^0.21.0",
    "vhd-lib": "^4.8.0"
    "vhd-lib": "^4.5.0"
  },
  "scripts": {
    "postversion": "npm publish --access public"

@@ -17,14 +17,4 @@ map.get(['foo', 'bar']) // 2
map.get(['bar', 'foo']) // 3
map.get([OBJ]) // 4
map.get([{}]) // undefined

map.delete([])

for (const [key, value] of map.entries()) {
  console.log(key, value)
}

for (const value of map.values()) {
  console.log(value)
}
```

@@ -35,16 +35,6 @@ map.get(['foo', 'bar']) // 2
map.get(['bar', 'foo']) // 3
map.get([OBJ]) // 4
map.get([{}]) // undefined

map.delete([])

for (const [key, value] of map.entries()) {
  console.log(key, value)
}

for (const value of map.values()) {
  console.log(value)
}
```

## Contributions

@@ -36,31 +36,14 @@ function del(node, i, keys) {
  return node
}

function* entries(node, key) {
  if (node !== undefined) {
    if (node instanceof Node) {
      const { value } = node
      if (value !== undefined) {
        yield [key, node.value]
      }

      for (const [childKey, child] of node.children.entries()) {
        yield* entries(child, key.concat(childKey))
      }
    } else {
      yield [key, node]
    }
  }
}

function get(node, i, keys) {
  return i === keys.length
    ? node instanceof Node
      ? node.value
      : node
    : node instanceof Node
      ? get(node.children.get(keys[i]), i + 1, keys)
      : undefined
    ? get(node.children.get(keys[i]), i + 1, keys)
    : undefined
}

function set(node, i, keys, value) {
@@ -86,22 +69,6 @@ function set(node, i, keys, value) {
  return node
}

function* values(node) {
  if (node !== undefined) {
    if (node instanceof Node) {
      const { value } = node
      if (value !== undefined) {
        yield node.value
      }
      for (const child of node.children.values()) {
        yield* values(child)
      }
    } else {
      yield node
    }
  }
}

exports.MultiKeyMap = class MultiKeyMap {
  constructor() {
    // each node is either a value or a Node if it contains children
@@ -112,10 +79,6 @@ exports.MultiKeyMap = class MultiKeyMap {
    this._root = del(this._root, 0, keys)
  }

  entries() {
    return entries(this._root, [])
  }

  get(keys) {
    return get(this._root, 0, keys)
  }
@@ -123,8 +86,4 @@ exports.MultiKeyMap = class MultiKeyMap {
  set(keys, value) {
    this._root = set(this._root, 0, keys, value)
  }

  values() {
    return values(this._root)
  }
}

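For context, a short usage sketch of the `MultiKeyMap` API kept by this change (`set`/`get`/`delete`; the `entries()`/`values()` iterators shown above are the part being removed), consistent with the package README quoted earlier:

```js
const { MultiKeyMap } = require('@vates/multi-key-map')

const map = new MultiKeyMap()

// keys are arrays; values are stored in a trie of Node children, as implemented above
map.set(['foo', 'bar'], 1)
map.set(['bar', 'foo'], 2) // order matters: this is a different key

map.get(['foo', 'bar']) // → 1
map.get(['foo']) // → undefined, no value stored at this prefix

map.delete(['foo', 'bar'])
```
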
@@ -19,7 +19,7 @@ describe('MultiKeyMap', () => {
      // reverse composite key
      ['bar', 'foo'],
    ]
    const values = keys.map(() => Math.random())
    const values = keys.map(() => ({}))

    // set all values first to make sure they are all stored and not only the
    // last one
@@ -27,12 +27,6 @@ describe('MultiKeyMap', () => {
      map.set(key, values[i])
    })

    assert.deepEqual(
      Array.from(map.entries()),
      keys.map((key, i) => [key, values[i]])
    )
    assert.deepEqual(Array.from(map.values()), values)

    keys.forEach((key, i) => {
      // copy the key to make sure the array itself is not the key
      assert.strictEqual(map.get(key.slice()), values[i])

@@ -18,7 +18,7 @@
    "url": "https://vates.fr"
  },
  "license": "ISC",
  "version": "0.2.0",
  "version": "0.1.0",
  "engines": {
    "node": ">=8.10"
  },

@vates/nbd-client/constants.js (new file, 42 lines)
@@ -0,0 +1,42 @@
'use strict'
exports.INIT_PASSWD = Buffer.from('NBDMAGIC') // "NBDMAGIC" ensure we're connected to a nbd server
exports.OPTS_MAGIC = Buffer.from('IHAVEOPT') // "IHAVEOPT" start an option block
exports.NBD_OPT_REPLY_MAGIC = 1100100111001001n // magic received during negociation
exports.NBD_OPT_EXPORT_NAME = 1
exports.NBD_OPT_ABORT = 2
exports.NBD_OPT_LIST = 3
exports.NBD_OPT_STARTTLS = 5
exports.NBD_OPT_INFO = 6
exports.NBD_OPT_GO = 7

exports.NBD_FLAG_HAS_FLAGS = 1 << 0
exports.NBD_FLAG_READ_ONLY = 1 << 1
exports.NBD_FLAG_SEND_FLUSH = 1 << 2
exports.NBD_FLAG_SEND_FUA = 1 << 3
exports.NBD_FLAG_ROTATIONAL = 1 << 4
exports.NBD_FLAG_SEND_TRIM = 1 << 5

exports.NBD_FLAG_FIXED_NEWSTYLE = 1 << 0

exports.NBD_CMD_FLAG_FUA = 1 << 0
exports.NBD_CMD_FLAG_NO_HOLE = 1 << 1
exports.NBD_CMD_FLAG_DF = 1 << 2
exports.NBD_CMD_FLAG_REQ_ONE = 1 << 3
exports.NBD_CMD_FLAG_FAST_ZERO = 1 << 4

exports.NBD_CMD_READ = 0
exports.NBD_CMD_WRITE = 1
exports.NBD_CMD_DISC = 2
exports.NBD_CMD_FLUSH = 3
exports.NBD_CMD_TRIM = 4
exports.NBD_CMD_CACHE = 5
exports.NBD_CMD_WRITE_ZEROES = 6
exports.NBD_CMD_BLOCK_STATUS = 7
exports.NBD_CMD_RESIZE = 8

exports.NBD_REQUEST_MAGIC = 0x25609513 // magic number to create a new NBD request to send to the server
exports.NBD_REPLY_MAGIC = 0x67446698 // magic number received from the server when reading response to a nbd request
exports.NBD_REPLY_ACK = 1

exports.NBD_DEFAULT_PORT = 10809
exports.NBD_DEFAULT_BLOCK_SIZE = 64 * 1024

@@ -1,41 +0,0 @@
export const INIT_PASSWD = Buffer.from('NBDMAGIC') // "NBDMAGIC" ensure we're connected to a nbd server
export const OPTS_MAGIC = Buffer.from('IHAVEOPT') // "IHAVEOPT" start an option block
export const NBD_OPT_REPLY_MAGIC = 1100100111001001n // magic received during negociation
export const NBD_OPT_EXPORT_NAME = 1
export const NBD_OPT_ABORT = 2
export const NBD_OPT_LIST = 3
export const NBD_OPT_STARTTLS = 5
export const NBD_OPT_INFO = 6
export const NBD_OPT_GO = 7

export const NBD_FLAG_HAS_FLAGS = 1 << 0
export const NBD_FLAG_READ_ONLY = 1 << 1
export const NBD_FLAG_SEND_FLUSH = 1 << 2
export const NBD_FLAG_SEND_FUA = 1 << 3
export const NBD_FLAG_ROTATIONAL = 1 << 4
export const NBD_FLAG_SEND_TRIM = 1 << 5

export const NBD_FLAG_FIXED_NEWSTYLE = 1 << 0

export const NBD_CMD_FLAG_FUA = 1 << 0
export const NBD_CMD_FLAG_NO_HOLE = 1 << 1
export const NBD_CMD_FLAG_DF = 1 << 2
export const NBD_CMD_FLAG_REQ_ONE = 1 << 3
export const NBD_CMD_FLAG_FAST_ZERO = 1 << 4

export const NBD_CMD_READ = 0
export const NBD_CMD_WRITE = 1
export const NBD_CMD_DISC = 2
export const NBD_CMD_FLUSH = 3
export const NBD_CMD_TRIM = 4
export const NBD_CMD_CACHE = 5
export const NBD_CMD_WRITE_ZEROES = 6
export const NBD_CMD_BLOCK_STATUS = 7
export const NBD_CMD_RESIZE = 8

export const NBD_REQUEST_MAGIC = 0x25609513 // magic number to create a new NBD request to send to the server
export const NBD_REPLY_MAGIC = 0x67446698 // magic number received from the server when reading response to a nbd request
export const NBD_REPLY_ACK = 1

export const NBD_DEFAULT_PORT = 10809
export const NBD_DEFAULT_BLOCK_SIZE = 64 * 1024

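For reference, a minimal sketch (assembled only from the request layout used later in this compare, in the NBD client's disconnect and readBlock code) of how these constants are combined into the 28-byte request header sent for each NBD command; `queryId` and `offset` are BigInt placeholders chosen by the caller:

```js
'use strict'
const { NBD_REQUEST_MAGIC, NBD_CMD_READ } = require('./constants.js')

// layout: magic (4B) | flags (2B) | command (2B) | handle/queryId (8B) | offset (8B) | length (4B)
function buildReadRequest(queryId, offset, length) {
  const buffer = Buffer.alloc(28)
  buffer.writeInt32BE(NBD_REQUEST_MAGIC, 0) // it is a nbd request
  buffer.writeInt16BE(0, 4) // no command flags
  buffer.writeInt16BE(NBD_CMD_READ, 6) // we want to read a data block
  buffer.writeBigUInt64BE(queryId, 8) // used to match the server reply to this query
  buffer.writeBigUInt64BE(offset, 16) // byte offset in the exported disk
  buffer.writeInt32BE(length, 24) // number of bytes to read
  return buffer
}
```
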
@@ -1,10 +1,8 @@
|
||||
import assert from 'node:assert'
|
||||
import { Socket } from 'node:net'
|
||||
import { connect } from 'node:tls'
|
||||
import { fromCallback, pRetry, pDelay, pTimeout, pFromCallback } from 'promise-toolbox'
|
||||
import { readChunkStrict } from '@vates/read-chunk'
|
||||
import { createLogger } from '@xen-orchestra/log'
|
||||
import {
|
||||
'use strict'
|
||||
const assert = require('node:assert')
|
||||
const { Socket } = require('node:net')
|
||||
const { connect } = require('node:tls')
|
||||
const {
|
||||
INIT_PASSWD,
|
||||
NBD_CMD_READ,
|
||||
NBD_DEFAULT_BLOCK_SIZE,
|
||||
@@ -19,12 +17,16 @@ import {
|
||||
NBD_REQUEST_MAGIC,
|
||||
OPTS_MAGIC,
|
||||
NBD_CMD_DISC,
|
||||
} from './constants.mjs'
|
||||
} = require('./constants.js')
|
||||
const { fromCallback, pRetry, pDelay, pTimeout } = require('promise-toolbox')
|
||||
const { readChunkStrict } = require('@vates/read-chunk')
|
||||
const { createLogger } = require('@xen-orchestra/log')
|
||||
|
||||
const { warn } = createLogger('vates:nbd-client')
|
||||
|
||||
// documentation is here : https://github.com/NetworkBlockDevice/nbd/blob/master/doc/proto.md
|
||||
|
||||
export default class NbdClient {
|
||||
module.exports = class NbdClient {
|
||||
#serverAddress
|
||||
#serverCert
|
||||
#serverPort
|
||||
@@ -38,7 +40,6 @@ export default class NbdClient {
|
||||
#readBlockRetries
|
||||
#reconnectRetry
|
||||
#connectTimeout
|
||||
#messageTimeout
|
||||
|
||||
// AFAIK, there is no guaranty the server answers in the same order as the queries
|
||||
// so we handle a backlog of command waiting for response and handle concurrency manually
|
||||
@@ -51,14 +52,7 @@ export default class NbdClient {
|
||||
#reconnectingPromise
|
||||
constructor(
|
||||
{ address, port = NBD_DEFAULT_PORT, exportname, cert },
|
||||
{
|
||||
connectTimeout = 6e4,
|
||||
messageTimeout = 6e4,
|
||||
waitBeforeReconnect = 1e3,
|
||||
readAhead = 10,
|
||||
readBlockRetries = 5,
|
||||
reconnectRetry = 5,
|
||||
} = {}
|
||||
{ connectTimeout = 6e4, waitBeforeReconnect = 1e3, readAhead = 10, readBlockRetries = 5, reconnectRetry = 5 } = {}
|
||||
) {
|
||||
this.#serverAddress = address
|
||||
this.#serverPort = port
|
||||
@@ -69,7 +63,6 @@ export default class NbdClient {
|
||||
this.#readBlockRetries = readBlockRetries
|
||||
this.#reconnectRetry = reconnectRetry
|
||||
this.#connectTimeout = connectTimeout
|
||||
this.#messageTimeout = messageTimeout
|
||||
}
|
||||
|
||||
get exportSize() {
|
||||
@@ -122,27 +115,13 @@ export default class NbdClient {
|
||||
if (!this.#connected) {
|
||||
return
|
||||
}
|
||||
this.#connected = false
|
||||
const socket = this.#serverSocket
|
||||
|
||||
const queryId = this.#nextCommandQueryId
|
||||
this.#nextCommandQueryId++
|
||||
|
||||
const buffer = Buffer.alloc(28)
|
||||
buffer.writeInt32BE(NBD_REQUEST_MAGIC, 0) // it is a nbd request
|
||||
buffer.writeInt16BE(0, 4) // no command flags for a disconnect
|
||||
buffer.writeInt16BE(NBD_CMD_DISC, 6) // we want to disconnect from nbd server
|
||||
buffer.writeBigUInt64BE(queryId, 8)
|
||||
buffer.writeBigUInt64BE(0n, 16)
|
||||
buffer.writeInt32BE(0, 24)
|
||||
const promise = pFromCallback(cb => {
|
||||
socket.end(buffer, 'utf8', cb)
|
||||
})
|
||||
try {
|
||||
await pTimeout.call(promise, this.#messageTimeout)
|
||||
} catch (error) {
|
||||
socket.destroy()
|
||||
}
|
||||
await this.#write(buffer)
|
||||
await this.#serverSocket.destroy()
|
||||
this.#serverSocket = undefined
|
||||
this.#connected = false
|
||||
}
|
||||
@@ -216,13 +195,11 @@ export default class NbdClient {
|
||||
}
|
||||
|
||||
#read(length) {
|
||||
const promise = readChunkStrict(this.#serverSocket, length)
|
||||
return pTimeout.call(promise, this.#messageTimeout)
|
||||
return readChunkStrict(this.#serverSocket, length)
|
||||
}
|
||||
|
||||
#write(buffer) {
|
||||
const promise = fromCallback.call(this.#serverSocket, 'write', buffer)
|
||||
return pTimeout.call(promise, this.#messageTimeout)
|
||||
return fromCallback.call(this.#serverSocket, 'write', buffer)
|
||||
}
|
||||
|
||||
async #readInt32() {
|
||||
@@ -255,20 +232,19 @@ export default class NbdClient {
|
||||
}
|
||||
try {
|
||||
this.#waitingForResponse = true
|
||||
const buffer = await this.#read(16)
|
||||
const magic = buffer.readInt32BE(0)
|
||||
const magic = await this.#readInt32()
|
||||
|
||||
if (magic !== NBD_REPLY_MAGIC) {
|
||||
throw new Error(`magic number for block answer is wrong : ${magic} ${NBD_REPLY_MAGIC}`)
|
||||
}
|
||||
|
||||
const error = buffer.readInt32BE(4)
|
||||
const error = await this.#readInt32()
|
||||
if (error !== 0) {
|
||||
// @todo use error code from constants.mjs
|
||||
throw new Error(`GOT ERROR CODE : ${error}`)
|
||||
}
|
||||
|
||||
const blockQueryId = buffer.readBigUInt64BE(8)
|
||||
const blockQueryId = await this.#readInt64()
|
||||
const query = this.#commandQueryBacklog.get(blockQueryId)
|
||||
if (!query) {
|
||||
throw new Error(` no query associated with id ${blockQueryId}`)
|
||||
@@ -289,7 +265,7 @@ export default class NbdClient {
|
||||
}
|
||||
}
|
||||
|
||||
async #readBlock(index, size) {
|
||||
async readBlock(index, size = NBD_DEFAULT_BLOCK_SIZE) {
|
||||
// we don't want to add anything in backlog while reconnecting
|
||||
if (this.#reconnectingPromise) {
|
||||
await this.#reconnectingPromise
|
||||
@@ -305,13 +281,7 @@ export default class NbdClient {
|
||||
buffer.writeInt16BE(NBD_CMD_READ, 6) // we want to read a data block
|
||||
buffer.writeBigUInt64BE(queryId, 8)
|
||||
// byte offset in the raw disk
|
||||
const offset = BigInt(index) * BigInt(size)
|
||||
const remaining = this.#exportSize - offset
|
||||
if (remaining < BigInt(size)) {
|
||||
size = Number(remaining)
|
||||
}
|
||||
|
||||
buffer.writeBigUInt64BE(offset, 16)
|
||||
buffer.writeBigUInt64BE(BigInt(index) * BigInt(size), 16)
|
||||
buffer.writeInt32BE(size, 24)
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
@@ -337,13 +307,45 @@ export default class NbdClient {
|
||||
})
|
||||
}
|
||||
|
||||
async readBlock(index, size = NBD_DEFAULT_BLOCK_SIZE) {
|
||||
return pRetry(() => this.#readBlock(index, size), {
|
||||
tries: this.#readBlockRetries,
|
||||
onRetry: async err => {
|
||||
warn('will retry reading block ', index, err)
|
||||
await this.reconnect()
|
||||
},
|
||||
})
|
||||
async *readBlocks(indexGenerator) {
|
||||
// default : read all blocks
|
||||
if (indexGenerator === undefined) {
|
||||
const exportSize = this.#exportSize
|
||||
const chunkSize = 2 * 1024 * 1024
|
||||
indexGenerator = function* () {
|
||||
const nbBlocks = Math.ceil(Number(exportSize / BigInt(chunkSize)))
|
||||
for (let index = 0; BigInt(index) < nbBlocks; index++) {
|
||||
yield { index, size: chunkSize }
|
||||
}
|
||||
}
|
||||
}
|
||||
const readAhead = []
|
||||
const readAheadMaxLength = this.#readAhead
|
||||
const makeReadBlockPromise = (index, size) => {
|
||||
const promise = pRetry(() => this.readBlock(index, size), {
|
||||
tries: this.#readBlockRetries,
|
||||
onRetry: async err => {
|
||||
warn('will retry reading block ', index, err)
|
||||
await this.reconnect()
|
||||
},
|
||||
})
|
||||
// error is handled during unshift
|
||||
promise.catch(() => {})
|
||||
return promise
|
||||
}
|
||||
|
||||
// read all blocks, but try to keep readAheadMaxLength promise waiting ahead
|
||||
for (const { index, size } of indexGenerator()) {
|
||||
// stack readAheadMaxLength promises before starting to handle the results
|
||||
if (readAhead.length === readAheadMaxLength) {
|
||||
// any error will stop reading blocks
|
||||
yield readAhead.shift()
|
||||
}
|
||||
|
||||
readAhead.push(makeReadBlockPromise(index, size))
|
||||
}
|
||||
while (readAhead.length > 0) {
|
||||
yield readAhead.shift()
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,87 +0,0 @@
import { asyncEach } from '@vates/async-each'
import { NBD_DEFAULT_BLOCK_SIZE } from './constants.mjs'
import NbdClient from './index.mjs'
import { createLogger } from '@xen-orchestra/log'

const { warn } = createLogger('vates:nbd-client:multi')
export default class MultiNbdClient {
  #clients = []
  #readAhead

  get exportSize() {
    return this.#clients[0].exportSize
  }

  constructor(settings, { nbdConcurrency = 8, readAhead = 16, ...options } = {}) {
    this.#readAhead = readAhead
    if (!Array.isArray(settings)) {
      settings = [settings]
    }
    for (let i = 0; i < nbdConcurrency; i++) {
      this.#clients.push(
        new NbdClient(settings[i % settings.length], { ...options, readAhead: Math.ceil(readAhead / nbdConcurrency) })
      )
    }
  }

  async connect() {
    const connectedClients = []
    for (const clientId in this.#clients) {
      const client = this.#clients[clientId]
      try {
        await client.connect()
        connectedClients.push(client)
      } catch (err) {
        client.disconnect().catch(() => {})
        warn(`can't connect to one nbd client`, { err })
      }
    }
    if (connectedClients.length === 0) {
      throw new Error(`Fail to connect to any Nbd client`)
    }
    if (connectedClients.length < this.#clients.length) {
      warn(
        `incomplete connection by multi Nbd, only ${connectedClients.length} over ${
          this.#clients.length
        } expected clients`
      )
      this.#clients = connectedClients
    }
  }

  async disconnect() {
    await asyncEach(this.#clients, client => client.disconnect(), {
      stopOnError: false,
    })
  }

  async readBlock(index, size = NBD_DEFAULT_BLOCK_SIZE) {
    const clientId = index % this.#clients.length
    return this.#clients[clientId].readBlock(index, size)
  }

  async *readBlocks(indexGenerator) {
    // default : read all blocks
    const readAhead = []
    const makeReadBlockPromise = (index, size) => {
      const promise = this.readBlock(index, size)
      // error is handled during unshift
      promise.catch(() => {})
      return promise
    }

    // read all blocks, but try to keep readAheadMaxLength promise waiting ahead
    for (const { index, size } of indexGenerator()) {
      // stack readAheadMaxLength promises before starting to handle the results
      if (readAhead.length === this.#readAhead) {
        // any error will stop reading blocks
        yield readAhead.shift()
      }

      readAhead.push(makeReadBlockPromise(index, size))
    }
    while (readAhead.length > 0) {
      yield readAhead.shift()
    }
  }
}

@@ -13,18 +13,17 @@
    "url": "https://vates.fr"
  },
  "license": "ISC",
  "version": "3.0.0",
  "version": "1.2.1",
  "engines": {
    "node": ">=14.0"
  },
  "main": "./index.mjs",
  "dependencies": {
    "@vates/async-each": "^1.0.0",
    "@vates/read-chunk": "^1.2.0",
    "@vates/read-chunk": "^1.1.1",
    "@xen-orchestra/async-map": "^0.1.2",
    "@xen-orchestra/log": "^0.6.0",
    "promise-toolbox": "^0.21.0",
    "xen-api": "^2.0.0"
    "xen-api": "^1.3.3"
  },
  "devDependencies": {
    "tap": "^16.3.0",
@@ -32,6 +31,6 @@
  },
  "scripts": {
    "postversion": "npm publish --access public",
    "test-integration": "tap --lines 97 --functions 95 --branches 74 --statements 97 tests/*.integ.mjs"
    "test-integration": "tap --lines 97 --functions 95 --branches 74 --statements 97 tests/*.integ.js"
  }
}

@@ -1,15 +1,15 @@
|
||||
import { spawn, exec } from 'node:child_process'
|
||||
import fs from 'node:fs/promises'
|
||||
import { test } from 'tap'
|
||||
import tmp from 'tmp'
|
||||
import { pFromCallback } from 'promise-toolbox'
|
||||
import { Socket } from 'node:net'
|
||||
import { NBD_DEFAULT_PORT } from '../constants.mjs'
|
||||
import assert from 'node:assert'
|
||||
import MultiNbdClient from '../multi.mjs'
|
||||
'use strict'
|
||||
const NbdClient = require('../index.js')
|
||||
const { spawn, exec } = require('node:child_process')
|
||||
const fs = require('node:fs/promises')
|
||||
const { test } = require('tap')
|
||||
const tmp = require('tmp')
|
||||
const { pFromCallback } = require('promise-toolbox')
|
||||
const { Socket } = require('node:net')
|
||||
const { NBD_DEFAULT_PORT } = require('../constants.js')
|
||||
const assert = require('node:assert')
|
||||
|
||||
const CHUNK_SIZE = 1024 * 1024 // non default size
|
||||
const FILE_SIZE = 1024 * 1024 * 9.5 // non aligned file size
|
||||
const FILE_SIZE = 10 * 1024 * 1024
|
||||
|
||||
async function createTempFile(size) {
|
||||
const tmpPath = await pFromCallback(cb => tmp.file(cb))
|
||||
@@ -82,7 +82,7 @@ test('it works with unsecured network', async tap => {
|
||||
const path = await createTempFile(FILE_SIZE)
|
||||
|
||||
let nbdServer = await spawnNbdKit(path)
|
||||
const client = new MultiNbdClient(
|
||||
const client = new NbdClient(
|
||||
{
|
||||
address: '127.0.0.1',
|
||||
exportname: 'MY_SECRET_EXPORT',
|
||||
@@ -110,13 +110,13 @@ CYu1Xn/FVPx1HoRgWc7E8wFhDcA/P3SJtfIQWHB9FzSaBflKGR4t8WCE2eE8+cTB
|
||||
`,
|
||||
},
|
||||
{
|
||||
nbdConcurrency: 1,
|
||||
readAhead: 2,
|
||||
}
|
||||
)
|
||||
|
||||
await client.connect()
|
||||
tap.equal(client.exportSize, BigInt(FILE_SIZE))
|
||||
const CHUNK_SIZE = 1024 * 1024 // non default size
|
||||
const indexes = []
|
||||
for (let i = 0; i < FILE_SIZE / CHUNK_SIZE; i++) {
|
||||
indexes.push(i)
|
||||
@@ -128,9 +128,9 @@ CYu1Xn/FVPx1HoRgWc7E8wFhDcA/P3SJtfIQWHB9FzSaBflKGR4t8WCE2eE8+cTB
|
||||
})
|
||||
let i = 0
|
||||
for await (const block of nbdIterator) {
|
||||
let blockOk = block.length === Math.min(CHUNK_SIZE, FILE_SIZE - CHUNK_SIZE * i)
|
||||
let blockOk = true
|
||||
let firstFail
|
||||
for (let j = 0; j < block.length; j += 4) {
|
||||
for (let j = 0; j < CHUNK_SIZE; j += 4) {
|
||||
const wanted = i * CHUNK_SIZE + j
|
||||
const found = block.readUInt32BE(j)
|
||||
blockOk = blockOk && found === wanted
|
||||
@@ -138,7 +138,7 @@ CYu1Xn/FVPx1HoRgWc7E8wFhDcA/P3SJtfIQWHB9FzSaBflKGR4t8WCE2eE8+cTB
|
||||
firstFail = j
|
||||
}
|
||||
}
|
||||
tap.ok(blockOk, `check block ${i} content ${block.length}`)
|
||||
tap.ok(blockOk, `check block ${i} content`)
|
||||
i++
|
||||
|
||||
// flaky server is flaky
|
||||
@@ -148,6 +148,17 @@ CYu1Xn/FVPx1HoRgWc7E8wFhDcA/P3SJtfIQWHB9FzSaBflKGR4t8WCE2eE8+cTB
|
||||
nbdServer = await spawnNbdKit(path)
|
||||
}
|
||||
}
|
||||
|
||||
// we can reuse the conneciton to read other blocks
|
||||
// default iterator
|
||||
const nbdIteratorWithDefaultBlockIterator = client.readBlocks()
|
||||
let nb = 0
|
||||
for await (const block of nbdIteratorWithDefaultBlockIterator) {
|
||||
nb++
|
||||
tap.equal(block.length, 2 * 1024 * 1024)
|
||||
}
|
||||
|
||||
tap.equal(nb, 5)
|
||||
assert.rejects(() => client.readBlock(100, CHUNK_SIZE))
|
||||
|
||||
await client.disconnect()
|
||||
@@ -1,3 +1,4 @@
'use strict'
/*

  node-vsphere-soap
@@ -11,18 +12,17 @@

*/

import { EventEmitter } from 'events'
import axios from 'axios'
import https from 'node:https'
import util from 'util'
import soap from 'soap'
import Cookie from 'soap-cookie' // required for session persistence

const EventEmitter = require('events').EventEmitter
const axios = require('axios')
const https = require('node:https')
const util = require('util')
const soap = require('soap')
const Cookie = require('soap-cookie') // required for session persistence
// Client class
// inherits from EventEmitter
// possible events: connect, error, ready

export function Client(vCenterHostname, username, password, sslVerify) {
function Client(vCenterHostname, username, password, sslVerify) {
  this.status = 'disconnected'
  this.reconnectCount = 0

@@ -228,3 +228,4 @@ function _soapErrorHandler(self, emitter, command, args, err) {
}

// end
exports.Client = Client

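A hedged usage sketch for the `Client` constructor above (event names are taken from the comments in the diff; the host and credentials are placeholders, and the exact meaning of each event is assumed rather than documented here):

```js
const { Client } = require('@vates/node-vsphere-soap')

// connects to a vCenter/ESXi SOAP endpoint; sslVerify=false skips certificate validation
const client = new Client('vcenter.example.org', 'user', 'password', false)

client.once('ready', () => {
  // assumed: the SOAP session is established and authenticated
})
client.once('error', err => {
  console.error('connection failed', err)
})
```
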
@@ -1,8 +1,8 @@
{
  "name": "@vates/node-vsphere-soap",
  "version": "2.0.0",
  "version": "1.0.0",
  "description": "interface to vSphere SOAP/WSDL from node for interfacing with vCenter or ESXi, forked from node-vsphere-soap",
  "main": "lib/client.mjs",
  "main": "lib/client.js",
  "author": "reedog117",
  "repository": {
    "directory": "@vates/node-vsphere-soap",
@@ -30,7 +30,7 @@
  "private": false,
  "homepage": "https://github.com/vatesfr/xen-orchestra/tree/master/@vates/node-vsphere-soap",
  "engines": {
    "node": ">=14"
    "node": ">=8.10"
  },
  "scripts": {
    "postversion": "npm publish --access public"

@@ -1,11 +1,15 @@
'use strict'

// place your own credentials here for a vCenter or ESXi server
// this information will be used for connecting to a vCenter instance
// for module testing
// name the file config-test.js

export const vCenterTestCreds = {
const vCenterTestCreds = {
  vCenterIP: 'vcsa',
  vCenterUser: 'vcuser',
  vCenterPassword: 'vcpw',
  vCenter: true,
}

exports.vCenterTestCreds = vCenterTestCreds

@@ -1,16 +1,18 @@
'use strict'

/*
  vsphere-soap.test.js

  tests for the vCenterConnectionInstance class
*/

import assert from 'assert'
import { describe, it } from 'test'
const assert = require('assert')
const { describe, it } = require('test')

import * as vc from '../lib/client.mjs'
const vc = require('../lib/client')

// eslint-disable-next-line n/no-missing-import
import { vCenterTestCreds as TestCreds } from '../config-test.mjs'
// eslint-disable-next-line n/no-missing-require
const TestCreds = require('../config-test.js').vCenterTestCreds

const VItest = new vc.Client(TestCreds.vCenterIP, TestCreds.vCenterUser, TestCreds.vCenterPassword, false)

@@ -1,7 +1,6 @@
|
||||
'use strict'
|
||||
|
||||
const assert = require('assert')
|
||||
const isUtf8 = require('isutf8')
|
||||
|
||||
/**
|
||||
* Read a chunk of data from a stream.
|
||||
@@ -22,41 +21,41 @@ const readChunk = (stream, size) =>
|
||||
stream.errored != null
|
||||
? Promise.reject(stream.errored)
|
||||
: stream.closed || stream.readableEnded
|
||||
? Promise.resolve(null)
|
||||
: new Promise((resolve, reject) => {
|
||||
if (size !== undefined) {
|
||||
assert(size > 0)
|
||||
? Promise.resolve(null)
|
||||
: new Promise((resolve, reject) => {
|
||||
if (size !== undefined) {
|
||||
assert(size > 0)
|
||||
|
||||
// per Node documentation:
|
||||
// > The size argument must be less than or equal to 1 GiB.
|
||||
assert(size < 1073741824)
|
||||
}
|
||||
// per Node documentation:
|
||||
// > The size argument must be less than or equal to 1 GiB.
|
||||
assert(size < 1073741824)
|
||||
}
|
||||
|
||||
function onEnd() {
|
||||
resolve(null)
|
||||
function onEnd() {
|
||||
resolve(null)
|
||||
removeListeners()
|
||||
}
|
||||
function onError(error) {
|
||||
reject(error)
|
||||
removeListeners()
|
||||
}
|
||||
function onReadable() {
|
||||
const data = stream.read(size)
|
||||
if (data !== null) {
|
||||
resolve(data)
|
||||
removeListeners()
|
||||
}
|
||||
function onError(error) {
|
||||
reject(error)
|
||||
removeListeners()
|
||||
}
|
||||
function onReadable() {
|
||||
const data = stream.read(size)
|
||||
if (data !== null) {
|
||||
resolve(data)
|
||||
removeListeners()
|
||||
}
|
||||
}
|
||||
function removeListeners() {
|
||||
stream.removeListener('end', onEnd)
|
||||
stream.removeListener('error', onError)
|
||||
stream.removeListener('readable', onReadable)
|
||||
}
|
||||
stream.on('end', onEnd)
|
||||
stream.on('error', onError)
|
||||
stream.on('readable', onReadable)
|
||||
onReadable()
|
||||
})
|
||||
}
|
||||
function removeListeners() {
|
||||
stream.removeListener('end', onEnd)
|
||||
stream.removeListener('error', onError)
|
||||
stream.removeListener('readable', onReadable)
|
||||
}
|
||||
stream.on('end', onEnd)
|
||||
stream.on('error', onError)
|
||||
stream.on('readable', onReadable)
|
||||
onReadable()
|
||||
})
|
||||
exports.readChunk = readChunk
|
||||
|
||||
/**
|
||||
@@ -82,13 +81,6 @@ exports.readChunkStrict = async function readChunkStrict(stream, size) {
|
||||
|
||||
if (size !== undefined && chunk.length !== size) {
|
||||
const error = new Error(`stream has ended with not enough data (actual: ${chunk.length}, expected: ${size})`)
|
||||
|
||||
// Buffer.isUtf8 is too recent for now
|
||||
// @todo : replace external package by Buffer.isUtf8 when the supported version of node reach 18
|
||||
|
||||
if (chunk.length < 1024 && isUtf8(chunk)) {
|
||||
error.text = chunk.toString('utf8')
|
||||
}
|
||||
Object.defineProperties(error, {
|
||||
chunk: {
|
||||
value: chunk,
|
||||
@@ -111,42 +103,42 @@ async function skip(stream, size) {
|
||||
return stream.errored != null
|
||||
? Promise.reject(stream.errored)
|
||||
: size === 0 || stream.closed || stream.readableEnded
|
||||
? Promise.resolve(0)
|
||||
: new Promise((resolve, reject) => {
|
||||
let left = size
|
||||
function onEnd() {
|
||||
resolve(size - left)
|
||||
removeListeners()
|
||||
}
|
||||
function onError(error) {
|
||||
reject(error)
|
||||
removeListeners()
|
||||
}
|
||||
function onReadable() {
|
||||
const data = stream.read()
|
||||
left -= data === null ? 0 : data.length
|
||||
if (left > 0) {
|
||||
// continue to read
|
||||
} else {
|
||||
// if more than wanted has been read, push back the rest
|
||||
if (left < 0) {
|
||||
stream.unshift(data.slice(left))
|
||||
}
|
||||
|
||||
resolve(size)
|
||||
removeListeners()
|
||||
? Promise.resolve(0)
|
||||
: new Promise((resolve, reject) => {
|
||||
let left = size
|
||||
function onEnd() {
|
||||
resolve(size - left)
|
||||
removeListeners()
|
||||
}
|
||||
function onError(error) {
|
||||
reject(error)
|
||||
removeListeners()
|
||||
}
|
||||
function onReadable() {
|
||||
const data = stream.read()
|
||||
left -= data === null ? 0 : data.length
|
||||
if (left > 0) {
|
||||
// continue to read
|
||||
} else {
|
||||
// if more than wanted has been read, push back the rest
|
||||
if (left < 0) {
|
||||
stream.unshift(data.slice(left))
|
||||
}
|
||||
|
||||
resolve(size)
|
||||
removeListeners()
|
||||
}
|
||||
function removeListeners() {
|
||||
stream.removeListener('end', onEnd)
|
||||
stream.removeListener('error', onError)
|
||||
stream.removeListener('readable', onReadable)
|
||||
}
|
||||
stream.on('end', onEnd)
|
||||
stream.on('error', onError)
|
||||
stream.on('readable', onReadable)
|
||||
onReadable()
|
||||
})
|
||||
}
|
||||
function removeListeners() {
|
||||
stream.removeListener('end', onEnd)
|
||||
stream.removeListener('error', onError)
|
||||
stream.removeListener('readable', onReadable)
|
||||
}
|
||||
stream.on('end', onEnd)
|
||||
stream.on('error', onError)
|
||||
stream.on('readable', onReadable)
|
||||
onReadable()
|
||||
})
|
||||
}
|
||||
exports.skip = skip
|
||||
|
||||
|
||||
@@ -102,37 +102,12 @@ describe('readChunkStrict', function () {
|
||||
assert.strictEqual(error.chunk, undefined)
|
||||
})
|
||||
|
||||
it('throws if stream ends with not enough data, utf8', async () => {
|
||||
it('throws if stream ends with not enough data', async () => {
|
||||
const error = await rejectionOf(readChunkStrict(makeStream(['foo', 'bar']), 10))
|
||||
assert(error instanceof Error)
|
||||
assert.strictEqual(error.message, 'stream has ended with not enough data (actual: 6, expected: 10)')
|
||||
assert.strictEqual(error.text, 'foobar')
|
||||
assert.deepEqual(error.chunk, Buffer.from('foobar'))
|
||||
})
|
||||
|
||||
it('throws if stream ends with not enough data, non utf8 ', async () => {
|
||||
const source = [Buffer.alloc(10, 128), Buffer.alloc(10, 128)]
|
||||
const error = await rejectionOf(readChunkStrict(makeStream(source), 30))
|
||||
assert(error instanceof Error)
|
||||
assert.strictEqual(error.message, 'stream has ended with not enough data (actual: 20, expected: 30)')
|
||||
assert.strictEqual(error.text, undefined)
|
||||
assert.deepEqual(error.chunk, Buffer.concat(source))
|
||||
})
|
||||
|
||||
it('throws if stream ends with not enough data, utf8 , long data', async () => {
|
||||
const source = Buffer.from('a'.repeat(1500))
|
||||
const error = await rejectionOf(readChunkStrict(makeStream([source]), 2000))
|
||||
assert(error instanceof Error)
|
||||
assert.strictEqual(error.message, `stream has ended with not enough data (actual: 1500, expected: 2000)`)
|
||||
assert.strictEqual(error.text, undefined)
|
||||
assert.deepEqual(error.chunk, source)
|
||||
})
|
||||
|
||||
it('succeed', async () => {
|
||||
const source = Buffer.from('a'.repeat(20))
|
||||
const chunk = await readChunkStrict(makeStream([source]), 10)
|
||||
assert.deepEqual(source.subarray(10), chunk)
|
||||
})
|
||||
})
|
||||
|
||||
describe('skip', function () {
|
||||
@@ -159,16 +134,6 @@ describe('skip', function () {
|
||||
it('returns less size if stream ends', async () => {
|
||||
assert.deepEqual(await skip(makeStream('foo bar'), 10), 7)
|
||||
})
|
||||
|
||||
it('put back if it read too much', async () => {
|
||||
let source = makeStream(['foo', 'bar'])
|
||||
await skip(source, 1) // read part of data chunk
|
||||
const chunk = (await readChunkStrict(source, 2)).toString('utf-8')
|
||||
assert.strictEqual(chunk, 'oo')
|
||||
|
||||
source = makeStream(['foo', 'bar'])
|
||||
assert.strictEqual(await skip(source, 3), 3) // read aligned with data chunk
|
||||
})
|
||||
})
|
||||
|
||||
describe('skipStrict', function () {
|
||||
@@ -179,9 +144,4 @@ describe('skipStrict', function () {
|
||||
assert.strictEqual(error.message, 'stream has ended with not enough data (actual: 7, expected: 10)')
|
||||
assert.deepEqual(error.bytesSkipped, 7)
|
||||
})
|
||||
it('succeed', async () => {
|
||||
const source = makeStream(['foo', 'bar', 'baz'])
|
||||
const res = await skipStrict(source, 4)
|
||||
assert.strictEqual(res, undefined)
|
||||
})
|
||||
})
|
||||
|
||||
@@ -19,7 +19,7 @@
    "type": "git",
    "url": "https://github.com/vatesfr/xen-orchestra.git"
  },
  "version": "1.2.0",
  "version": "1.1.1",
  "engines": {
    "node": ">=8.10"
  },
@@ -33,8 +33,5 @@
  },
  "devDependencies": {
    "test": "^3.2.1"
  },
  "dependencies": {
    "isutf8": "^4.0.0"
  }
}

@@ -27,7 +27,7 @@
  "license": "ISC",
  "version": "0.1.0",
  "engines": {
    "node": ">=12.3"
    "node": ">=10"
  },
  "scripts": {
    "postversion": "npm publish --access public",

@@ -111,7 +111,7 @@ const onProgress = makeOnProgress({
  // current status of the task as described in the previous section
  taskLog.status

  // undefined or a dictionary of properties attached to the task
  // undefined or a dictionnary of properties attached to the task
  taskLog.properties

  // timestamp at which the abortion was requested, undefined otherwise

@@ -35,7 +35,7 @@
    "test": "node--test"
  },
  "devDependencies": {
    "sinon": "^17.0.1",
    "sinon": "^15.0.1",
    "test": "^3.2.1"
  }
}

@@ -1,5 +1,5 @@
import { asyncMap } from '@xen-orchestra/async-map'
import { RemoteAdapter } from '@xen-orchestra/backups/RemoteAdapter.mjs'
import { RemoteAdapter } from '@xen-orchestra/backups/RemoteAdapter.js'
import { getSyncedHandler } from '@xen-orchestra/fs'
import getopts from 'getopts'
import { basename, dirname } from 'path'

@@ -7,9 +7,9 @@
  "bugs": "https://github.com/vatesfr/xen-orchestra/issues",
  "dependencies": {
    "@xen-orchestra/async-map": "^0.1.2",
    "@xen-orchestra/backups": "^0.44.3",
    "@xen-orchestra/fs": "^4.1.3",
    "filenamify": "^6.0.0",
    "@xen-orchestra/backups": "^0.39.0",
    "@xen-orchestra/fs": "^4.0.1",
    "filenamify": "^4.1.0",
    "getopts": "^2.2.5",
    "lodash": "^4.17.15",
    "promise-toolbox": "^0.21.0"
@@ -27,7 +27,7 @@
  "scripts": {
    "postversion": "npm publish --access public"
  },
  "version": "1.0.14",
  "version": "1.0.9",
  "license": "AGPL-3.0-or-later",
  "author": {
    "name": "Vates SAS",

@@ -1,8 +1,10 @@
import { Metadata } from './_runners/Metadata.mjs'
import { VmsRemote } from './_runners/VmsRemote.mjs'
import { VmsXapi } from './_runners/VmsXapi.mjs'
'use strict'

export function createRunner(opts) {
const { Metadata } = require('./_runners/Metadata.js')
const { VmsRemote } = require('./_runners/VmsRemote.js')
const { VmsXapi } = require('./_runners/VmsXapi.js')

exports.createRunner = function createRunner(opts) {
  const { type } = opts.job
  switch (type) {
    case 'backup':
@@ -1,6 +1,8 @@
import { asyncMap } from '@xen-orchestra/async-map'
'use strict'

export class DurablePartition {
const { asyncMap } = require('@xen-orchestra/async-map')

exports.DurablePartition = class DurablePartition {
  // private resource API is used exceptionally to be able to separate resource creation and release
  #partitionDisposers = {}

@@ -1,6 +1,8 @@
import { Task } from './Task.mjs'
'use strict'

export class HealthCheckVmBackup {
const { Task } = require('./Task')

exports.HealthCheckVmBackup = class HealthCheckVmBackup {
  #restoredVm
  #timeout
  #xapi

73
@xen-orchestra/backups/ImportVmBackup.js
Normal file
73
@xen-orchestra/backups/ImportVmBackup.js
Normal file
@@ -0,0 +1,73 @@
|
||||
'use strict'
|
||||
|
||||
const assert = require('assert')
|
||||
|
||||
const { formatFilenameDate } = require('./_filenameDate.js')
|
||||
const { importIncrementalVm } = require('./_incrementalVm.js')
|
||||
const { Task } = require('./Task.js')
|
||||
const { watchStreamSize } = require('./_watchStreamSize.js')
|
||||
|
||||
exports.ImportVmBackup = class ImportVmBackup {
|
||||
constructor({ adapter, metadata, srUuid, xapi, settings: { newMacAddresses, mapVdisSrs = {} } = {} }) {
|
||||
this._adapter = adapter
|
||||
this._importIncrementalVmSettings = { newMacAddresses, mapVdisSrs }
|
||||
this._metadata = metadata
|
||||
this._srUuid = srUuid
|
||||
this._xapi = xapi
|
||||
}
|
||||
|
||||
async run() {
|
||||
const adapter = this._adapter
|
||||
const metadata = this._metadata
|
||||
const isFull = metadata.mode === 'full'
|
||||
|
||||
const sizeContainer = { size: 0 }
|
||||
|
||||
let backup
|
||||
if (isFull) {
|
||||
backup = await adapter.readFullVmBackup(metadata)
|
||||
watchStreamSize(backup, sizeContainer)
|
||||
} else {
|
||||
assert.strictEqual(metadata.mode, 'delta')
|
||||
|
||||
const ignoredVdis = new Set(
|
||||
Object.entries(this._importIncrementalVmSettings.mapVdisSrs)
|
||||
.filter(([_, srUuid]) => srUuid === null)
|
||||
.map(([vdiUuid]) => vdiUuid)
|
||||
)
|
||||
backup = await adapter.readIncrementalVmBackup(metadata, ignoredVdis)
|
||||
Object.values(backup.streams).forEach(stream => watchStreamSize(stream, sizeContainer))
|
||||
}
|
||||
|
||||
return Task.run(
|
||||
{
|
||||
name: 'transfer',
|
||||
},
|
||||
async () => {
|
||||
const xapi = this._xapi
|
||||
const srRef = await xapi.call('SR.get_by_uuid', this._srUuid)
|
||||
|
||||
const vmRef = isFull
|
||||
? await xapi.VM_import(backup, srRef)
|
||||
: await importIncrementalVm(backup, await xapi.getRecord('SR', srRef), {
|
||||
...this._importIncrementalVmSettings,
|
||||
detectBase: false,
|
||||
})
|
||||
|
||||
await Promise.all([
|
||||
xapi.call('VM.add_tags', vmRef, 'restored from backup'),
|
||||
xapi.call(
|
||||
'VM.set_name_label',
|
||||
vmRef,
|
||||
`${metadata.vm.name_label} (${formatFilenameDate(metadata.timestamp)})`
|
||||
),
|
||||
])
|
||||
|
||||
return {
|
||||
size: sizeContainer.size,
|
||||
id: await xapi.getField('VM', vmRef, 'uuid'),
|
||||
}
|
||||
}
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -1,280 +0,0 @@
|
||||
import assert from 'node:assert'
|
||||
|
||||
import { formatFilenameDate } from './_filenameDate.mjs'
|
||||
import { importIncrementalVm } from './_incrementalVm.mjs'
|
||||
import { Task } from './Task.mjs'
|
||||
import { watchStreamSize } from './_watchStreamSize.mjs'
|
||||
import { VhdNegative, VhdSynthetic } from 'vhd-lib'
|
||||
import { decorateClass } from '@vates/decorate-with'
|
||||
import { createLogger } from '@xen-orchestra/log'
|
||||
import { dirname, join } from 'node:path'
|
||||
import pickBy from 'lodash/pickBy.js'
|
||||
import { defer } from 'golike-defer'
|
||||
|
||||
const { debug, info, warn } = createLogger('xo:backups:importVmBackup')
|
||||
async function resolveUuid(xapi, cache, uuid, type) {
|
||||
if (uuid == null) {
|
||||
return uuid
|
||||
}
|
||||
const ref = cache.get(uuid)
|
||||
if (ref === undefined) {
|
||||
cache.set(uuid, xapi.call(`${type}.get_by_uuid`, uuid))
|
||||
}
|
||||
return cache.get(uuid)
|
||||
}
|
||||
export class ImportVmBackup {
|
||||
constructor({
|
||||
adapter,
|
||||
metadata,
|
||||
srUuid,
|
||||
xapi,
|
||||
settings: { additionnalVmTag, newMacAddresses, mapVdisSrs = {}, useDifferentialRestore = false } = {},
|
||||
}) {
|
||||
this._adapter = adapter
|
||||
this._importIncrementalVmSettings = { additionnalVmTag, newMacAddresses, mapVdisSrs, useDifferentialRestore }
|
||||
this._metadata = metadata
|
||||
this._srUuid = srUuid
|
||||
this._xapi = xapi
|
||||
}
|
||||
|
||||
async #getPathOfVdiSnapshot(snapshotUuid) {
|
||||
const metadata = this._metadata
|
||||
if (this._pathToVdis === undefined) {
|
||||
const backups = await this._adapter.listVmBackups(
|
||||
this._metadata.vm.uuid,
|
||||
({ mode, timestamp }) => mode === 'delta' && timestamp >= metadata.timestamp
|
||||
)
|
||||
const map = new Map()
|
||||
for (const backup of backups) {
|
||||
for (const [vdiRef, vdi] of Object.entries(backup.vdis)) {
|
||||
map.set(vdi.uuid, backup.vhds[vdiRef])
|
||||
}
|
||||
}
|
||||
this._pathToVdis = map
|
||||
}
|
||||
return this._pathToVdis.get(snapshotUuid)
|
||||
}
|
||||
|
||||
async _reuseNearestSnapshot($defer, ignoredVdis) {
|
||||
const metadata = this._metadata
|
||||
const { mapVdisSrs } = this._importIncrementalVmSettings
|
||||
const { vbds, vhds, vifs, vm, vmSnapshot } = metadata
|
||||
const streams = {}
|
||||
const metdataDir = dirname(metadata._filename)
|
||||
const vdis = ignoredVdis === undefined ? metadata.vdis : pickBy(metadata.vdis, vdi => !ignoredVdis.has(vdi.uuid))
|
||||
|
||||
for (const [vdiRef, vdi] of Object.entries(vdis)) {
|
||||
const vhdPath = join(metdataDir, vhds[vdiRef])
|
||||
|
||||
let xapiDisk
|
||||
try {
|
||||
xapiDisk = await this._xapi.getRecordByUuid('VDI', vdi.$snapshot_of$uuid)
|
||||
} catch (err) {
|
||||
// if this disk is not present anymore, fall back to default restore
|
||||
warn(err)
|
||||
}
|
||||
|
||||
let snapshotCandidate, backupCandidate
|
||||
if (xapiDisk !== undefined) {
|
||||
debug('found disks, wlll search its snapshots', { snapshots: xapiDisk.snapshots })
|
||||
for (const snapshotRef of xapiDisk.snapshots) {
|
||||
const snapshot = await this._xapi.getRecord('VDI', snapshotRef)
|
||||
debug('handling snapshot', { snapshot })
|
||||
|
||||
// take only the first snapshot
|
||||
if (snapshotCandidate && snapshotCandidate.snapshot_time < snapshot.snapshot_time) {
|
||||
debug('already got a better candidate')
|
||||
continue
|
||||
}
|
||||
|
||||
// have a corresponding backup more recent than metadata ?
|
||||
const pathToSnapshotData = await this.#getPathOfVdiSnapshot(snapshot.uuid)
|
||||
if (pathToSnapshotData === undefined) {
|
||||
debug('no backup linked to this snaphot')
|
||||
continue
|
||||
}
|
||||
if (snapshot.$SR.uuid !== (mapVdisSrs[vdi.$snapshot_of$uuid] ?? this._srUuid)) {
|
||||
debug('not restored on the same SR', { snapshotSr: snapshot.$SR.uuid, mapVdisSrs, srUuid: this._srUuid })
|
||||
continue
|
||||
}
|
||||
|
||||
debug('got a candidate', pathToSnapshotData)
|
||||
|
||||
snapshotCandidate = snapshot
|
||||
backupCandidate = pathToSnapshotData
|
||||
}
|
||||
}
|
||||
|
||||
let stream
|
||||
const backupWithSnapshotPath = join(metdataDir, backupCandidate ?? '')
|
||||
if (vhdPath === backupWithSnapshotPath) {
|
||||
// all the data are already on the host
|
||||
debug('direct reuse of a snapshot')
|
||||
stream = null
|
||||
vdis[vdiRef].baseVdi = snapshotCandidate
|
||||
// go next disk , we won't use this stream
|
||||
continue
|
||||
}
|
||||
|
||||
let disposableDescendants
|
||||
|
||||
const disposableSynthetic = await VhdSynthetic.fromVhdChain(this._adapter._handler, vhdPath)
|
||||
|
||||
// this will also clean if another disk of this VM backup fails
|
||||
// if user really only need to restore non failing disks he can retry with ignoredVdis
|
||||
let disposed = false
|
||||
const disposeOnce = async () => {
|
||||
if (!disposed) {
|
||||
disposed = true
|
||||
try {
|
||||
await disposableDescendants?.dispose()
|
||||
await disposableSynthetic?.dispose()
|
||||
} catch (error) {
|
||||
warn('openVhd: failed to dispose VHDs', { error })
|
||||
}
|
||||
}
|
||||
}
|
||||
$defer.onFailure(() => disposeOnce())
|
||||
|
||||
const parentVhd = disposableSynthetic.value
|
||||
await parentVhd.readBlockAllocationTable()
|
||||
debug('got vhd synthetic of parents', parentVhd.length)
|
||||
|
||||
if (snapshotCandidate !== undefined) {
|
||||
try {
|
||||
debug('will try to use differential restore', {
|
||||
backupWithSnapshotPath,
|
||||
vhdPath,
|
||||
vdiRef,
|
||||
})
|
||||
|
||||
disposableDescendants = await VhdSynthetic.fromVhdChain(this._adapter._handler, backupWithSnapshotPath, {
|
||||
until: vhdPath,
|
||||
})
|
||||
const descendantsVhd = disposableDescendants.value
|
||||
await descendantsVhd.readBlockAllocationTable()
|
||||
debug('got vhd synthetic of descendants')
|
||||
const negativeVhd = new VhdNegative(parentVhd, descendantsVhd)
|
||||
debug('got vhd negative')
|
||||
|
||||
// update the stream with the negative vhd stream
|
||||
stream = await negativeVhd.stream()
|
||||
vdis[vdiRef].baseVdi = snapshotCandidate
|
||||
} catch (err) {
|
||||
// can be a broken VHD chain, a vhd chain with a key backup, ....
|
||||
// not an irrecuperable error, don't dispose parentVhd, and fallback to full restore
|
||||
warn(`can't use differential restore`, err)
|
||||
disposableDescendants?.dispose()
|
||||
}
|
||||
}
|
||||
// didn't make a negative stream : fallback to classic stream
|
||||
if (stream === undefined) {
|
||||
debug('use legacy restore')
|
||||
stream = await parentVhd.stream()
|
||||
}
|
||||
|
||||
stream.on('end', disposeOnce)
|
||||
stream.on('close', disposeOnce)
|
||||
stream.on('error', disposeOnce)
|
||||
info('everything is ready, will transfer', stream.length)
|
||||
streams[`${vdiRef}.vhd`] = stream
|
||||
}
|
||||
return {
|
||||
streams,
|
||||
vbds,
|
||||
vdis,
|
||||
version: '1.0.0',
|
||||
vifs,
|
||||
vm: { ...vm, suspend_VDI: vmSnapshot.suspend_VDI },
|
||||
}
|
||||
}
|
||||
|
||||
async #decorateIncrementalVmMetadata() {
|
||||
const { additionnalVmTag, mapVdisSrs, useDifferentialRestore } = this._importIncrementalVmSettings
|
||||
|
||||
const ignoredVdis = new Set(
|
||||
Object.entries(mapVdisSrs)
|
||||
.filter(([_, srUuid]) => srUuid === null)
|
||||
.map(([vdiUuid]) => vdiUuid)
|
||||
)
|
||||
let backup
|
||||
if (useDifferentialRestore) {
|
||||
backup = await this._reuseNearestSnapshot(ignoredVdis)
|
||||
} else {
|
||||
backup = await this._adapter.readIncrementalVmBackup(this._metadata, ignoredVdis)
|
||||
}
|
||||
const xapi = this._xapi
|
||||
|
||||
const cache = new Map()
|
||||
const mapVdisSrRefs = {}
|
||||
if (additionnalVmTag !== undefined) {
|
||||
backup.vm.tags.push(additionnalVmTag)
|
||||
}
|
||||
for (const [vdiUuid, srUuid] of Object.entries(mapVdisSrs)) {
|
||||
mapVdisSrRefs[vdiUuid] = await resolveUuid(xapi, cache, srUuid, 'SR')
|
||||
}
|
||||
const srRef = await resolveUuid(xapi, cache, this._srUuid, 'SR')
|
||||
Object.values(backup.vdis).forEach(vdi => {
|
||||
vdi.SR = mapVdisSrRefs[vdi.uuid] ?? srRef
|
||||
})
|
||||
return backup
|
||||
}
|
||||
|
||||
async run() {
|
||||
const adapter = this._adapter
|
||||
const metadata = this._metadata
|
||||
const isFull = metadata.mode === 'full'
|
||||
|
||||
const sizeContainer = { size: 0 }
|
||||
const { newMacAddresses } = this._importIncrementalVmSettings
|
||||
let backup
|
||||
if (isFull) {
|
||||
backup = await adapter.readFullVmBackup(metadata)
|
||||
watchStreamSize(backup, sizeContainer)
|
||||
} else {
|
||||
assert.strictEqual(metadata.mode, 'delta')
|
||||
|
||||
backup = await this.#decorateIncrementalVmMetadata()
|
||||
Object.values(backup.streams).forEach(stream => watchStreamSize(stream, sizeContainer))
|
||||
}
|
||||
|
||||
return Task.run(
|
||||
{
|
||||
name: 'transfer',
|
||||
},
|
||||
async () => {
|
||||
const xapi = this._xapi
|
||||
const srRef = await xapi.call('SR.get_by_uuid', this._srUuid)
|
||||
|
||||
const vmRef = isFull
|
||||
? await xapi.VM_import(backup, srRef)
|
||||
: await importIncrementalVm(backup, await xapi.getRecord('SR', srRef), {
|
||||
newMacAddresses,
|
||||
})
|
||||
|
||||
await Promise.all([
|
||||
xapi.call('VM.add_tags', vmRef, 'restored from backup'),
|
||||
xapi.call(
|
||||
'VM.set_name_label',
|
||||
vmRef,
|
||||
`${metadata.vm.name_label} (${formatFilenameDate(metadata.timestamp)})`
|
||||
),
|
||||
xapi.call(
|
||||
'VM.set_name_description',
|
||||
vmRef,
|
||||
`Restored on ${formatFilenameDate(+new Date())} from ${adapter._handler._remote.name} -
|
||||
${metadata.vm.name_description}
|
||||
`
|
||||
),
|
||||
])
|
||||
|
||||
return {
|
||||
size: sizeContainer.size,
|
||||
id: await xapi.getField('VM', vmRef, 'uuid'),
|
||||
}
|
||||
}
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
decorateClass(ImportVmBackup, { _reuseNearestSnapshot: defer })
|
||||
@@ -1,39 +1,43 @@
|
||||
import { asyncEach } from '@vates/async-each'
|
||||
import { asyncMap, asyncMapSettled } from '@xen-orchestra/async-map'
|
||||
import { compose } from '@vates/compose'
|
||||
import { createLogger } from '@xen-orchestra/log'
|
||||
import { createVhdDirectoryFromStream, openVhd, VhdAbstract, VhdDirectory, VhdSynthetic } from 'vhd-lib'
|
||||
import { decorateMethodsWith } from '@vates/decorate-with'
|
||||
import { deduped } from '@vates/disposable/deduped.js'
|
||||
import { dirname, join, resolve } from 'node:path'
|
||||
import { execFile } from 'child_process'
|
||||
import { mount } from '@vates/fuse-vhd'
|
||||
import { readdir, lstat } from 'node:fs/promises'
|
||||
import { synchronized } from 'decorator-synchronized'
|
||||
import { v4 as uuidv4 } from 'uuid'
|
||||
import { ZipFile } from 'yazl'
|
||||
import Disposable from 'promise-toolbox/Disposable'
|
||||
import fromCallback from 'promise-toolbox/fromCallback'
|
||||
import fromEvent from 'promise-toolbox/fromEvent'
|
||||
import groupBy from 'lodash/groupBy.js'
|
||||
import pDefer from 'promise-toolbox/defer'
|
||||
import pickBy from 'lodash/pickBy.js'
|
||||
import tar from 'tar'
|
||||
import zlib from 'zlib'
|
||||
'use strict'
|
||||
|
||||
import { BACKUP_DIR } from './_getVmBackupDir.mjs'
|
||||
import { cleanVm } from './_cleanVm.mjs'
|
||||
import { formatFilenameDate } from './_filenameDate.mjs'
|
||||
import { getTmpDir } from './_getTmpDir.mjs'
|
||||
import { isMetadataFile } from './_backupType.mjs'
|
||||
import { isValidXva } from './_isValidXva.mjs'
|
||||
import { listPartitions, LVM_PARTITION_TYPE } from './_listPartitions.mjs'
|
||||
import { lvs, pvs } from './_lvm.mjs'
|
||||
import { watchStreamSize } from './_watchStreamSize.mjs'
|
||||
const { asyncMap, asyncMapSettled } = require('@xen-orchestra/async-map')
|
||||
const { synchronized } = require('decorator-synchronized')
|
||||
const Disposable = require('promise-toolbox/Disposable')
|
||||
const fromCallback = require('promise-toolbox/fromCallback')
|
||||
const fromEvent = require('promise-toolbox/fromEvent')
|
||||
const pDefer = require('promise-toolbox/defer')
|
||||
const groupBy = require('lodash/groupBy.js')
|
||||
const pickBy = require('lodash/pickBy.js')
|
||||
const { dirname, join, normalize, resolve } = require('path')
|
||||
const { createLogger } = require('@xen-orchestra/log')
|
||||
const { createVhdDirectoryFromStream, openVhd, VhdAbstract, VhdDirectory, VhdSynthetic } = require('vhd-lib')
|
||||
const { deduped } = require('@vates/disposable/deduped.js')
|
||||
const { decorateMethodsWith } = require('@vates/decorate-with')
|
||||
const { compose } = require('@vates/compose')
|
||||
const { execFile } = require('child_process')
|
||||
const { readdir, lstat } = require('fs-extra')
|
||||
const { v4: uuidv4 } = require('uuid')
|
||||
const { ZipFile } = require('yazl')
|
||||
const zlib = require('zlib')
|
||||
|
||||
export const DIR_XO_CONFIG_BACKUPS = 'xo-config-backups'
|
||||
const { BACKUP_DIR } = require('./_getVmBackupDir.js')
|
||||
const { cleanVm } = require('./_cleanVm.js')
|
||||
const { formatFilenameDate } = require('./_filenameDate.js')
|
||||
const { getTmpDir } = require('./_getTmpDir.js')
|
||||
const { isMetadataFile } = require('./_backupType.js')
|
||||
const { isValidXva } = require('./_isValidXva.js')
|
||||
const { listPartitions, LVM_PARTITION_TYPE } = require('./_listPartitions.js')
|
||||
const { lvs, pvs } = require('./_lvm.js')
|
||||
const { watchStreamSize } = require('./_watchStreamSize')
|
||||
// @todo: this import is marked extraneous, should be fixed when the lib is published
const { mount } = require('@vates/fuse-vhd')
|
||||
const { asyncEach } = require('@vates/async-each')
|
||||
|
||||
export const DIR_XO_POOL_METADATA_BACKUPS = 'xo-pool-metadata-backups'
|
||||
const DIR_XO_CONFIG_BACKUPS = 'xo-config-backups'
|
||||
exports.DIR_XO_CONFIG_BACKUPS = DIR_XO_CONFIG_BACKUPS
|
||||
|
||||
const DIR_XO_POOL_METADATA_BACKUPS = 'xo-pool-metadata-backups'
|
||||
exports.DIR_XO_POOL_METADATA_BACKUPS = DIR_XO_POOL_METADATA_BACKUPS
|
||||
|
||||
const { debug, warn } = createLogger('xo:backups:RemoteAdapter')
|
||||
|
||||
@@ -42,23 +46,20 @@ const compareTimestamp = (a, b) => a.timestamp - b.timestamp
|
||||
const noop = Function.prototype
|
||||
|
||||
const resolveRelativeFromFile = (file, path) => resolve('/', dirname(file), path).slice(1)
|
||||
const makeRelative = path => resolve('/', path).slice(1)
|
||||
const resolveSubpath = (root, path) => resolve(root, makeRelative(path))
|
||||
|
||||
async function addZipEntries(zip, realBasePath, virtualBasePath, relativePaths) {
|
||||
for (const relativePath of relativePaths) {
|
||||
const realPath = join(realBasePath, relativePath)
|
||||
const virtualPath = join(virtualBasePath, relativePath)
|
||||
const resolveSubpath = (root, path) => resolve(root, `.${resolve('/', path)}`)
|
||||
|
||||
const stats = await lstat(realPath)
|
||||
const { mode, mtime } = stats
|
||||
const opts = { mode, mtime }
|
||||
if (stats.isDirectory()) {
|
||||
zip.addEmptyDirectory(virtualPath, opts)
|
||||
await addZipEntries(zip, realPath, virtualPath, await readdir(realPath))
|
||||
} else if (stats.isFile()) {
|
||||
zip.addFile(realPath, virtualPath, opts)
|
||||
}
|
||||
async function addDirectory(files, realPath, metadataPath) {
|
||||
const stats = await lstat(realPath)
|
||||
if (stats.isDirectory()) {
|
||||
await asyncMap(await readdir(realPath), file =>
|
||||
addDirectory(files, realPath + '/' + file, metadataPath + '/' + file)
|
||||
)
|
||||
} else if (stats.isFile()) {
|
||||
files.push({
|
||||
realPath,
|
||||
metadataPath,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
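The `makeRelative` / `resolveSubpath` helpers added at the top of this hunk keep user-supplied paths inside the remote root. A minimal standalone sketch of that behaviour (illustrative values, plain Node.js path semantics), reproducing the helpers as defined above:

const { resolve } = require('node:path')

const makeRelative = path => resolve('/', path).slice(1)
const resolveSubpath = (root, path) => resolve(root, makeRelative(path))

// '..' segments are resolved against '/' first, so they cannot escape the root
makeRelative('../../etc/passwd') // → 'etc/passwd'
resolveSubpath('/mnt/remote', 'foo/../bar') // → '/mnt/remote/bar'
resolveSubpath('/mnt/remote', '/absolute/file') // → '/mnt/remote/absolute/file'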
@@ -75,7 +76,7 @@ const debounceResourceFactory = factory =>
|
||||
return this._debounceResource(factory.apply(this, arguments))
|
||||
}
|
||||
|
||||
export class RemoteAdapter {
|
||||
class RemoteAdapter {
|
||||
constructor(
|
||||
handler,
|
||||
{ debounceResource = res => res, dirMode, vhdDirectoryCompression, useGetDiskLegacy = false } = {}
|
||||
@@ -186,6 +187,17 @@ export class RemoteAdapter {
|
||||
})
|
||||
}
|
||||
|
||||
async *_usePartitionFiles(diskId, partitionId, paths) {
|
||||
const path = yield this.getPartition(diskId, partitionId)
|
||||
|
||||
const files = []
|
||||
await asyncMap(paths, file =>
|
||||
addDirectory(files, resolveSubpath(path, file), normalize('./' + file).replace(/\/+$/, ''))
|
||||
)
|
||||
|
||||
return files
|
||||
}
|
||||
|
||||
// check if we will be allowed to merge a VHD created in this adapter
// with the VHD at path `path`
async isMergeableParent(packedParentUid, path) {
|
||||
@@ -202,24 +214,15 @@ export class RemoteAdapter {
|
||||
})
|
||||
}
|
||||
|
||||
fetchPartitionFiles(diskId, partitionId, paths, format) {
|
||||
fetchPartitionFiles(diskId, partitionId, paths) {
|
||||
const { promise, reject, resolve } = pDefer()
|
||||
Disposable.use(
|
||||
async function* () {
|
||||
const path = yield this.getPartition(diskId, partitionId)
|
||||
let outputStream
|
||||
|
||||
if (format === 'tgz') {
|
||||
outputStream = tar.c({ cwd: path, gzip: true }, paths.map(makeRelative))
|
||||
} else if (format === 'zip') {
|
||||
const zip = new ZipFile()
|
||||
await addZipEntries(zip, path, '', paths.map(makeRelative))
|
||||
zip.end()
|
||||
;({ outputStream } = zip)
|
||||
} else {
|
||||
throw new Error('unsupported format ' + format)
|
||||
}
|
||||
|
||||
const files = yield this._usePartitionFiles(diskId, partitionId, paths)
|
||||
const zip = new ZipFile()
|
||||
files.forEach(({ realPath, metadataPath }) => zip.addFile(realPath, metadataPath))
|
||||
zip.end()
|
||||
const { outputStream } = zip
|
||||
resolve(outputStream)
|
||||
await fromEvent(outputStream, 'end')
|
||||
}.bind(this)
|
||||
@@ -681,13 +684,11 @@ export class RemoteAdapter {
|
||||
}
|
||||
}
|
||||
|
||||
async outputStream(path, input, { checksum = true, maxStreamLength, streamLength, validator = noop } = {}) {
|
||||
async outputStream(path, input, { checksum = true, validator = noop } = {}) {
|
||||
const container = watchStreamSize(input)
|
||||
await this._handler.outputStream(path, input, {
|
||||
checksum,
|
||||
dirMode: this._dirMode,
|
||||
maxStreamLength,
|
||||
streamLength,
|
||||
async validator() {
|
||||
await input.task
|
||||
return validator.apply(this, arguments)
|
||||
@@ -828,7 +829,11 @@ decorateMethodsWith(RemoteAdapter, {
|
||||
debounceResourceFactory,
|
||||
]),
|
||||
|
||||
_usePartitionFiles: Disposable.factory,
|
||||
|
||||
getDisk: compose([Disposable.factory, [deduped, diskId => [diskId]], debounceResourceFactory]),
|
||||
|
||||
getPartition: Disposable.factory,
|
||||
})
|
||||
|
||||
exports.RemoteAdapter = RemoteAdapter
|
||||
29  @xen-orchestra/backups/RestoreMetadataBackup.js  Normal file
@@ -0,0 +1,29 @@
|
||||
'use strict'
|
||||
|
||||
const { join, resolve } = require('node:path/posix')
|
||||
|
||||
const { DIR_XO_POOL_METADATA_BACKUPS } = require('./RemoteAdapter.js')
|
||||
const { PATH_DB_DUMP } = require('./_runners/_PoolMetadataBackup.js')
|
||||
|
||||
exports.RestoreMetadataBackup = class RestoreMetadataBackup {
|
||||
constructor({ backupId, handler, xapi }) {
|
||||
this._backupId = backupId
|
||||
this._handler = handler
|
||||
this._xapi = xapi
|
||||
}
|
||||
|
||||
async run() {
|
||||
const backupId = this._backupId
|
||||
const handler = this._handler
|
||||
const xapi = this._xapi
|
||||
|
||||
if (backupId.split('/')[0] === DIR_XO_POOL_METADATA_BACKUPS) {
|
||||
return xapi.putResource(await handler.createReadStream(`${backupId}/data`), PATH_DB_DUMP, {
|
||||
task: xapi.task_create('Import pool metadata'),
|
||||
})
|
||||
} else {
|
||||
const metadata = JSON.parse(await handler.readFile(join(backupId, 'metadata.json')))
|
||||
return String(await handler.readFile(resolve(backupId, metadata.data ?? 'data.json')))
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,32 +0,0 @@
|
||||
import { join, resolve } from 'node:path/posix'
|
||||
|
||||
import { DIR_XO_POOL_METADATA_BACKUPS } from './RemoteAdapter.mjs'
|
||||
import { PATH_DB_DUMP } from './_runners/_PoolMetadataBackup.mjs'
|
||||
|
||||
export class RestoreMetadataBackup {
|
||||
constructor({ backupId, handler, xapi }) {
|
||||
this._backupId = backupId
|
||||
this._handler = handler
|
||||
this._xapi = xapi
|
||||
}
|
||||
|
||||
async run() {
|
||||
const backupId = this._backupId
|
||||
const handler = this._handler
|
||||
const xapi = this._xapi
|
||||
|
||||
if (backupId.split('/')[0] === DIR_XO_POOL_METADATA_BACKUPS) {
|
||||
return xapi.putResource(await handler.createReadStream(`${backupId}/data`), PATH_DB_DUMP, {
|
||||
task: xapi.task_create('Import pool metadata'),
|
||||
})
|
||||
} else {
|
||||
const metadata = JSON.parse(await handler.readFile(join(backupId, 'metadata.json')))
|
||||
const dataFileName = resolve(backupId, metadata.data ?? 'data.json')
|
||||
const data = await handler.readFile(dataFileName)
|
||||
|
||||
// if the data is JSON, send it as a plain string, otherwise consider the data as binary and encode it
const isJson = dataFileName.endsWith('.json')
|
||||
return isJson ? data.toString() : { encoding: 'base64', data: data.toString('base64') }
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,5 +1,7 @@
|
||||
import CancelToken from 'promise-toolbox/CancelToken'
|
||||
import Zone from 'node-zone'
|
||||
'use strict'
|
||||
|
||||
const CancelToken = require('promise-toolbox/CancelToken')
|
||||
const Zone = require('node-zone')
|
||||
|
||||
const logAfterEnd = log => {
|
||||
const error = new Error('task has already ended')
|
||||
@@ -28,7 +30,7 @@ const serializeError = error =>
|
||||
|
||||
const $$task = Symbol('@xen-orchestra/backups/Task')
|
||||
|
||||
export class Task {
|
||||
class Task {
|
||||
static get cancelToken() {
|
||||
const task = Zone.current.data[$$task]
|
||||
return task !== undefined ? task.#cancelToken : CancelToken.none
|
||||
@@ -149,6 +151,7 @@ export class Task {
|
||||
})
|
||||
}
|
||||
}
|
||||
exports.Task = Task
|
||||
|
||||
for (const method of ['info', 'warning']) {
|
||||
Task[method] = (...args) => Zone.current.data[$$task]?.[method](...args)
|
||||
6  @xen-orchestra/backups/_backupType.js  Normal file
@@ -0,0 +1,6 @@
'use strict'

exports.isMetadataFile = filename => filename.endsWith('.json')
exports.isVhdFile = filename => filename.endsWith('.vhd')
exports.isXvaFile = filename => filename.endsWith('.xva')
exports.isXvaSumFile = filename => filename.endsWith('.xva.checksum')
@@ -1,4 +0,0 @@
|
||||
export const isMetadataFile = filename => filename.endsWith('.json')
|
||||
export const isVhdFile = filename => filename.endsWith('.vhd')
|
||||
export const isXvaFile = filename => filename.endsWith('.xva')
|
||||
export const isXvaSumFile = filename => filename.endsWith('.xva.checksum')
|
||||
@@ -1,25 +1,25 @@
|
||||
import { createLogger } from '@xen-orchestra/log'
|
||||
import { catchGlobalErrors } from '@xen-orchestra/log/configure'
|
||||
'use strict'
|
||||
|
||||
import Disposable from 'promise-toolbox/Disposable'
|
||||
import ignoreErrors from 'promise-toolbox/ignoreErrors'
|
||||
import { compose } from '@vates/compose'
|
||||
import { createCachedLookup } from '@vates/cached-dns.lookup'
|
||||
import { createDebounceResource } from '@vates/disposable/debounceResource.js'
|
||||
import { createRunner } from './Backup.mjs'
|
||||
import { decorateMethodsWith } from '@vates/decorate-with'
|
||||
import { deduped } from '@vates/disposable/deduped.js'
|
||||
import { getHandler } from '@xen-orchestra/fs'
|
||||
import { parseDuration } from '@vates/parse-duration'
|
||||
import { Xapi } from '@xen-orchestra/xapi'
|
||||
const logger = require('@xen-orchestra/log').createLogger('xo:backups:worker')
|
||||
|
||||
import { RemoteAdapter } from './RemoteAdapter.mjs'
|
||||
import { Task } from './Task.mjs'
|
||||
require('@xen-orchestra/log/configure').catchGlobalErrors(logger)
|
||||
|
||||
createCachedLookup().patchGlobal()
|
||||
require('@vates/cached-dns.lookup').createCachedLookup().patchGlobal()
|
||||
|
||||
const Disposable = require('promise-toolbox/Disposable')
|
||||
const ignoreErrors = require('promise-toolbox/ignoreErrors')
|
||||
const { compose } = require('@vates/compose')
|
||||
const { createDebounceResource } = require('@vates/disposable/debounceResource.js')
|
||||
const { decorateMethodsWith } = require('@vates/decorate-with')
|
||||
const { deduped } = require('@vates/disposable/deduped.js')
|
||||
const { getHandler } = require('@xen-orchestra/fs')
|
||||
const { createRunner } = require('./Backup.js')
|
||||
const { parseDuration } = require('@vates/parse-duration')
|
||||
const { Xapi } = require('@xen-orchestra/xapi')
|
||||
|
||||
const { RemoteAdapter } = require('./RemoteAdapter.js')
|
||||
const { Task } = require('./Task.js')
|
||||
|
||||
const logger = createLogger('xo:backups:worker')
|
||||
catchGlobalErrors(logger)
|
||||
const { debug } = logger
|
||||
|
||||
class BackupWorker {
|
||||
@@ -1,11 +1,13 @@
|
||||
import cancelable from 'promise-toolbox/cancelable'
|
||||
import CancelToken from 'promise-toolbox/CancelToken'
|
||||
'use strict'
|
||||
|
||||
const cancelable = require('promise-toolbox/cancelable')
|
||||
const CancelToken = require('promise-toolbox/CancelToken')
|
||||
|
||||
// Similar to `Promise.all` + `map` but pass a cancel token to the callback
|
||||
//
|
||||
// If any of the executions fails, the cancel token will be triggered and the
// returned promise will be rejected with the first failure reason.
export const cancelableMap = cancelable(async function cancelableMap($cancelToken, iterable, callback) {
|
||||
exports.cancelableMap = cancelable(async function cancelableMap($cancelToken, iterable, callback) {
|
||||
const { cancel, token } = CancelToken.source([$cancelToken])
|
||||
try {
|
||||
return await Promise.all(
|
||||
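A minimal usage sketch of `cancelableMap`, assuming the CommonJS export above (the `work` helper is made up for illustration):

const CancelToken = require('promise-toolbox/CancelToken')
const { cancelableMap } = require('./_cancelableMap.js')

// toy async operation; a real callback would watch `token` to abort early
const work = async (token, id) => `done ${id}`

cancelableMap(CancelToken.none, ['a', 'b', 'c'], work)
  .then(results => console.log(results)) // [ 'done a', 'done b', 'done c' ]
  .catch(error => console.warn('an item failed, the shared token was triggered', error))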
@@ -1,19 +1,19 @@
|
||||
import test from 'test'
|
||||
import { strict as assert } from 'node:assert'
|
||||
'use strict'
|
||||
|
||||
import tmp from 'tmp'
|
||||
import fs from 'fs-extra'
|
||||
import * as uuid from 'uuid'
|
||||
import { getHandler } from '@xen-orchestra/fs'
|
||||
import { pFromCallback } from 'promise-toolbox'
|
||||
import { RemoteAdapter } from './RemoteAdapter.mjs'
|
||||
import { VHDFOOTER, VHDHEADER } from './tests.fixtures.mjs'
|
||||
import { VhdFile, Constants, VhdDirectory, VhdAbstract } from 'vhd-lib'
|
||||
import { checkAliases } from './_cleanVm.mjs'
|
||||
import { dirname, basename } from 'node:path'
|
||||
import { rimraf } from 'rimraf'
|
||||
const { beforeEach, afterEach, test, describe } = require('test')
|
||||
const assert = require('assert').strict
|
||||
|
||||
const { beforeEach, afterEach, describe } = test
|
||||
const tmp = require('tmp')
|
||||
const fs = require('fs-extra')
|
||||
const uuid = require('uuid')
|
||||
const { getHandler } = require('@xen-orchestra/fs')
|
||||
const { pFromCallback } = require('promise-toolbox')
|
||||
const { RemoteAdapter } = require('./RemoteAdapter')
|
||||
const { VHDFOOTER, VHDHEADER } = require('./tests.fixtures.js')
|
||||
const { VhdFile, Constants, VhdDirectory, VhdAbstract } = require('vhd-lib')
|
||||
const { checkAliases } = require('./_cleanVm')
|
||||
const { dirname, basename } = require('path')
|
||||
const { rimraf } = require('rimraf')
|
||||
|
||||
let tempDir, adapter, handler, jobId, vdiId, basePath, relativePath
|
||||
const rootPath = 'xo-vm-backups/VMUUID/'
|
||||
@@ -67,11 +67,6 @@ async function generateVhd(path, opts = {}) {
|
||||
await VhdAbstract.createAlias(handler, path + '.alias.vhd', dataPath)
|
||||
}
|
||||
|
||||
if (opts.blocks) {
|
||||
for (const blockId of opts.blocks) {
|
||||
await vhd.writeEntireBlock({ id: blockId, buffer: Buffer.alloc(2 * 1024 * 1024 + 512, blockId) })
|
||||
}
|
||||
}
|
||||
await vhd.writeBlockAllocationTable()
|
||||
await vhd.writeHeader()
|
||||
await vhd.writeFooter()
|
||||
@@ -235,7 +230,7 @@ test('it merges delta of non destroyed chain', async () => {
|
||||
|
||||
const metadata = JSON.parse(await handler.readFile(`${rootPath}/metadata.json`))
|
||||
// size should be the size of children + grandchildren after the merge
assert.equal(metadata.size, 104960)
|
||||
assert.equal(metadata.size, 209920)
|
||||
|
||||
// merging is already tested in vhd-lib, don't retest it here (and these VHDs are as empty as my stomach at 12:12)
// only check deletion
|
||||
@@ -325,7 +320,6 @@ describe('tests multiple combination ', () => {
|
||||
const ancestor = await generateVhd(`${basePath}/ancestor.vhd`, {
|
||||
useAlias,
|
||||
mode: vhdMode,
|
||||
blocks: [1, 3],
|
||||
})
|
||||
const child = await generateVhd(`${basePath}/child.vhd`, {
|
||||
useAlias,
|
||||
@@ -334,7 +328,6 @@ describe('tests multiple combination ', () => {
|
||||
parentUnicodeName: 'ancestor.vhd' + (useAlias ? '.alias.vhd' : ''),
|
||||
parentUuid: ancestor.footer.uuid,
|
||||
},
|
||||
blocks: [1, 2],
|
||||
})
|
||||
// a grandchild VHD in metadata
await generateVhd(`${basePath}/grandchild.vhd`, {
|
||||
@@ -344,7 +337,6 @@ describe('tests multiple combination ', () => {
|
||||
parentUnicodeName: 'child.vhd' + (useAlias ? '.alias.vhd' : ''),
|
||||
parentUuid: child.footer.uuid,
|
||||
},
|
||||
blocks: [2, 3],
|
||||
})
|
||||
|
||||
// an older parent whose merge was in progress when clean ran
@@ -403,7 +395,7 @@ describe('tests multiple combination ', () => {
|
||||
|
||||
const metadata = JSON.parse(await handler.readFile(`${rootPath}/metadata.json`))
|
||||
// size should be the size of children + grandchildren + clean after the merge
assert.deepEqual(metadata.size, vhdMode === 'file' ? 6502400 : 6501888)
|
||||
assert.deepEqual(metadata.size, vhdMode === 'file' ? 314880 : undefined)
|
||||
|
||||
// broken, unreferenced or abandoned VHDs should be deleted (alias and data)
// ancestor and child should be merged
|
||||
@@ -1,18 +1,19 @@
|
||||
import * as UUID from 'uuid'
|
||||
import sum from 'lodash/sum.js'
|
||||
import { asyncMap } from '@xen-orchestra/async-map'
|
||||
import { Constants, openVhd, VhdAbstract, VhdFile } from 'vhd-lib'
|
||||
import { isVhdAlias, resolveVhdAlias } from 'vhd-lib/aliases.js'
|
||||
import { dirname, resolve } from 'node:path'
|
||||
import { isMetadataFile, isVhdFile, isXvaFile, isXvaSumFile } from './_backupType.mjs'
|
||||
import { limitConcurrency } from 'limit-concurrency-decorator'
|
||||
import { mergeVhdChain } from 'vhd-lib/merge.js'
|
||||
|
||||
import { Task } from './Task.mjs'
|
||||
import { Disposable } from 'promise-toolbox'
|
||||
import handlerPath from '@xen-orchestra/fs/path'
|
||||
'use strict'
|
||||
|
||||
const sum = require('lodash/sum')
|
||||
const UUID = require('uuid')
|
||||
const { asyncMap } = require('@xen-orchestra/async-map')
|
||||
const { Constants, openVhd, VhdAbstract, VhdFile } = require('vhd-lib')
|
||||
const { isVhdAlias, resolveVhdAlias } = require('vhd-lib/aliases')
|
||||
const { dirname, resolve } = require('path')
|
||||
const { DISK_TYPES } = Constants
|
||||
const { isMetadataFile, isVhdFile, isXvaFile, isXvaSumFile } = require('./_backupType.js')
|
||||
const { limitConcurrency } = require('limit-concurrency-decorator')
|
||||
const { mergeVhdChain } = require('vhd-lib/merge')
|
||||
|
||||
const { Task } = require('./Task.js')
|
||||
const { Disposable } = require('promise-toolbox')
|
||||
const handlerPath = require('@xen-orchestra/fs/path')
|
||||
|
||||
// checking the size of a VHD directory is costly:
// 1 HTTP query per 1000 blocks
@@ -36,32 +37,34 @@ const computeVhdsSize = (handler, vhdPaths) =>
|
||||
)
|
||||
|
||||
// chain is [ ancestor, child_1, ..., child_n ]
|
||||
async function _mergeVhdChain(handler, chain, { logInfo, remove, mergeBlockConcurrency }) {
|
||||
logInfo(`merging VHD chain`, { chain })
|
||||
async function _mergeVhdChain(handler, chain, { logInfo, remove, merge, mergeBlockConcurrency }) {
|
||||
if (merge) {
|
||||
logInfo(`merging VHD chain`, { chain })
|
||||
|
||||
let done, total
|
||||
const handle = setInterval(() => {
|
||||
if (done !== undefined) {
|
||||
logInfo('merge in progress', {
|
||||
done,
|
||||
parent: chain[0],
|
||||
progress: Math.round((100 * done) / total),
|
||||
total,
|
||||
let done, total
|
||||
const handle = setInterval(() => {
|
||||
if (done !== undefined) {
|
||||
logInfo('merge in progress', {
|
||||
done,
|
||||
parent: chain[0],
|
||||
progress: Math.round((100 * done) / total),
|
||||
total,
|
||||
})
|
||||
}
|
||||
}, 10e3)
|
||||
try {
|
||||
return await mergeVhdChain(handler, chain, {
|
||||
logInfo,
|
||||
mergeBlockConcurrency,
|
||||
onProgress({ done: d, total: t }) {
|
||||
done = d
|
||||
total = t
|
||||
},
|
||||
removeUnused: remove,
|
||||
})
|
||||
} finally {
|
||||
clearInterval(handle)
|
||||
}
|
||||
}, 10e3)
|
||||
try {
|
||||
return await mergeVhdChain(handler, chain, {
|
||||
logInfo,
|
||||
mergeBlockConcurrency,
|
||||
onProgress({ done: d, total: t }) {
|
||||
done = d
|
||||
total = t
|
||||
},
|
||||
removeUnused: remove,
|
||||
})
|
||||
} finally {
|
||||
clearInterval(handle)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -114,7 +117,7 @@ const listVhds = async (handler, vmDir, logWarn) => {
|
||||
return { vhds, interruptedVhds, aliases }
|
||||
}
|
||||
|
||||
export async function checkAliases(
|
||||
async function checkAliases(
|
||||
aliasPaths,
|
||||
targetDataRepository,
|
||||
{ handler, logInfo = noop, logWarn = console.warn, remove = false }
|
||||
@@ -173,9 +176,11 @@ export async function checkAliases(
|
||||
})
|
||||
}
|
||||
|
||||
exports.checkAliases = checkAliases
|
||||
|
||||
const defaultMergeLimiter = limitConcurrency(1)
|
||||
|
||||
export async function cleanVm(
|
||||
exports.cleanVm = async function cleanVm(
|
||||
vmDir,
|
||||
{
|
||||
fixMetadata,
|
||||
@@ -469,20 +474,23 @@ export async function cleanVm(
|
||||
const metadataWithMergedVhd = {}
|
||||
const doMerge = async () => {
|
||||
await asyncMap(toMerge, async chain => {
|
||||
const { finalVhdSize } = await limitedMergeVhdChain(handler, chain, {
|
||||
const merged = await limitedMergeVhdChain(handler, chain, {
|
||||
logInfo,
|
||||
logWarn,
|
||||
remove,
|
||||
merge,
|
||||
mergeBlockConcurrency,
|
||||
})
|
||||
const metadataPath = vhdsToJSons[chain[chain.length - 1]] // the whole chain should share the same metadata file
metadataWithMergedVhd[metadataPath] = (metadataWithMergedVhd[metadataPath] ?? 0) + finalVhdSize
|
||||
if (merged !== undefined) {
|
||||
const metadataPath = vhdsToJSons[chain[chain.length - 1]] // the whole chain should share the same metadata file
metadataWithMergedVhd[metadataPath] = true
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
await Promise.all([
|
||||
...unusedVhdsDeletion,
|
||||
toMerge.length !== 0 && (merge ? Task.run({ name: 'merge' }, doMerge) : () => Promise.resolve()),
|
||||
toMerge.length !== 0 && (merge ? Task.run({ name: 'merge' }, doMerge) : doMerge()),
|
||||
asyncMap(unusedXvas, path => {
|
||||
logWarn('unused XVA', { path })
|
||||
if (remove) {
|
||||
@@ -504,11 +512,12 @@ export async function cleanVm(
|
||||
|
||||
// update size for delta metadata with merged VHD
|
||||
// check for the other that the size is the same as the real file size
|
||||
|
||||
await asyncMap(jsons, async metadataPath => {
|
||||
const metadata = backups.get(metadataPath)
|
||||
|
||||
let fileSystemSize
|
||||
const mergedSize = metadataWithMergedVhd[metadataPath]
|
||||
const merged = metadataWithMergedVhd[metadataPath] !== undefined
|
||||
|
||||
const { mode, size, vhds, xva } = metadata
|
||||
|
||||
@@ -518,29 +527,26 @@ export async function cleanVm(
|
||||
const linkedXva = resolve('/', vmDir, xva)
|
||||
try {
|
||||
fileSystemSize = await handler.getSize(linkedXva)
|
||||
if (fileSystemSize !== size && fileSystemSize !== undefined) {
|
||||
logWarn('cleanVm: incorrect backup size in metadata', {
|
||||
path: metadataPath,
|
||||
actual: size ?? 'none',
|
||||
expected: fileSystemSize,
|
||||
})
|
||||
}
|
||||
} catch (error) {
|
||||
// can fail with encrypted remote
|
||||
}
|
||||
} else if (mode === 'delta') {
|
||||
const linkedVhds = Object.keys(vhds).map(key => resolve('/', vmDir, vhds[key]))
|
||||
fileSystemSize = await computeVhdsSize(handler, linkedVhds)
|
||||
|
||||
// the size is not computed in some cases (e.g. VhdDirectory)
|
||||
if (fileSystemSize === undefined) {
|
||||
return
|
||||
}
|
||||
|
||||
// don't warn if the size has changed after a merge
|
||||
if (mergedSize === undefined) {
|
||||
const linkedVhds = Object.keys(vhds).map(key => resolve('/', vmDir, vhds[key]))
|
||||
fileSystemSize = await computeVhdsSize(handler, linkedVhds)
|
||||
// the size is not computed in some cases (e.g. VhdDirectory)
|
||||
if (fileSystemSize !== undefined && fileSystemSize !== size) {
|
||||
logWarn('cleanVm: incorrect backup size in metadata', {
|
||||
path: metadataPath,
|
||||
actual: size ?? 'none',
|
||||
expected: fileSystemSize,
|
||||
})
|
||||
}
|
||||
if (!merged && fileSystemSize !== size) {
|
||||
// FIXME: figure out why it occurs so often and, once fixed, log the real problems with `logWarn`
|
||||
console.warn('cleanVm: incorrect backup size in metadata', {
|
||||
path: metadataPath,
|
||||
actual: size ?? 'none',
|
||||
expected: fileSystemSize,
|
||||
})
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
@@ -548,19 +554,9 @@ export async function cleanVm(
|
||||
return
|
||||
}
|
||||
|
||||
// systematically update size and differentials after a merge
|
||||
|
||||
// @todo: after 2024-04-01, remove the fixMetadata option since the size computation is fixed
if (mergedSize || (fixMetadata && fileSystemSize !== size)) {
|
||||
metadata.size = mergedSize ?? fileSystemSize ?? size
|
||||
|
||||
if (mergedSize) {
|
||||
// all disks are now key disks
metadata.isVhdDifferencing = {}
|
||||
for (const id of Object.values(metadata.vdis ?? {})) {
|
||||
metadata.isVhdDifferencing[`${id}.vhd`] = false
|
||||
}
|
||||
}
|
||||
// systematically update size after a merge
|
||||
if ((merged || fixMetadata) && size !== fileSystemSize) {
|
||||
metadata.size = fileSystemSize
|
||||
mustRegenerateCache = true
|
||||
try {
|
||||
await handler.writeFile(metadataPath, JSON.stringify(metadata), { flags: 'w' })
|
||||
8  @xen-orchestra/backups/_filenameDate.js  Normal file
@@ -0,0 +1,8 @@
'use strict'

const { utcFormat, utcParse } = require('d3-time-format')

// Format a date in ISO 8601 in a safe way to be used in filenames
// (even on Windows).
exports.formatFilenameDate = utcFormat('%Y%m%dT%H%M%SZ')
exports.parseFilenameDate = utcParse('%Y%m%dT%H%M%SZ')
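A quick illustration of the round-trip these helpers provide, assuming the exports above:

const { formatFilenameDate, parseFilenameDate } = require('./_filenameDate.js')

const name = formatFilenameDate(new Date(Date.UTC(2024, 0, 31, 12, 0, 0)))
// → '20240131T120000Z' — no ':' characters, so it is safe in Windows filenames
parseFilenameDate(name) // → Date back at 2024-01-31T12:00:00 UTC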
@@ -1,6 +0,0 @@
|
||||
import { utcFormat, utcParse } from 'd3-time-format'
|
||||
|
||||
// Format a date in ISO 8601 in a safe way to be used in filenames
|
||||
// (even on Windows).
|
||||
export const formatFilenameDate = utcFormat('%Y%m%dT%H%M%SZ')
|
||||
export const parseFilenameDate = utcParse('%Y%m%dT%H%M%SZ')
|
||||
@@ -1,4 +1,6 @@
|
||||
'use strict'
|
||||
|
||||
// returns all entries but the last retention-th
|
||||
export function getOldEntries(retention, entries) {
|
||||
exports.getOldEntries = function getOldEntries(retention, entries) {
|
||||
return entries === undefined ? [] : retention > 0 ? entries.slice(0, -retention) : entries
|
||||
}
|
||||
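A minimal usage sketch of `getOldEntries`, assuming the CommonJS export above (the relative path is illustrative):

const { getOldEntries } = require('./_getOldEntries.js')

// keep the 2 most recent entries, return the older ones (candidates for deletion)
getOldEntries(2, ['jan', 'feb', 'mar', 'apr']) // → ['jan', 'feb']
getOldEntries(0, ['jan', 'feb']) // → ['jan', 'feb'] (retention 0 keeps nothing)
getOldEntries(2, undefined) // → []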
@@ -1,11 +1,13 @@
|
||||
import Disposable from 'promise-toolbox/Disposable'
|
||||
import { join } from 'node:path'
|
||||
import { mkdir, rmdir } from 'node:fs/promises'
|
||||
import { tmpdir } from 'os'
|
||||
'use strict'
|
||||
|
||||
const Disposable = require('promise-toolbox/Disposable')
|
||||
const { join } = require('path')
|
||||
const { mkdir, rmdir } = require('fs-extra')
|
||||
const { tmpdir } = require('os')
|
||||
|
||||
const MAX_ATTEMPTS = 3
|
||||
|
||||
export async function getTmpDir() {
|
||||
exports.getTmpDir = async function getTmpDir() {
|
||||
for (let i = 0; true; ++i) {
|
||||
const path = join(tmpdir(), Math.random().toString(36).slice(2))
|
||||
try {
|
||||
8  @xen-orchestra/backups/_getVmBackupDir.js  Normal file
@@ -0,0 +1,8 @@
'use strict'

const BACKUP_DIR = 'xo-vm-backups'
exports.BACKUP_DIR = BACKUP_DIR

exports.getVmBackupDir = function getVmBackupDir(uuid) {
  return `${BACKUP_DIR}/${uuid}`
}
@@ -1,5 +0,0 @@
|
||||
export const BACKUP_DIR = 'xo-vm-backups'
|
||||
|
||||
export function getVmBackupDir(uuid) {
|
||||
return `${BACKUP_DIR}/${uuid}`
|
||||
}
|
||||
@@ -1,30 +1,39 @@
|
||||
import groupBy from 'lodash/groupBy.js'
|
||||
import ignoreErrors from 'promise-toolbox/ignoreErrors'
|
||||
import omit from 'lodash/omit.js'
|
||||
import { asyncMap } from '@xen-orchestra/async-map'
|
||||
import { CancelToken } from 'promise-toolbox'
|
||||
import { compareVersions } from 'compare-versions'
|
||||
import { createVhdStreamWithLength } from 'vhd-lib'
|
||||
import { defer } from 'golike-defer'
|
||||
'use strict'
|
||||
|
||||
import { cancelableMap } from './_cancelableMap.mjs'
|
||||
import { Task } from './Task.mjs'
|
||||
import pick from 'lodash/pick.js'
|
||||
const find = require('lodash/find.js')
|
||||
const groupBy = require('lodash/groupBy.js')
|
||||
const ignoreErrors = require('promise-toolbox/ignoreErrors')
|
||||
const omit = require('lodash/omit.js')
|
||||
const { asyncMap } = require('@xen-orchestra/async-map')
|
||||
const { CancelToken } = require('promise-toolbox')
|
||||
const { compareVersions } = require('compare-versions')
|
||||
const { createVhdStreamWithLength } = require('vhd-lib')
|
||||
const { defer } = require('golike-defer')
|
||||
|
||||
// in `other_config` of an incrementally replicated VM, contains the UUID of the source VM
|
||||
export const TAG_BASE_DELTA = 'xo:base_delta'
|
||||
const { cancelableMap } = require('./_cancelableMap.js')
|
||||
const { Task } = require('./Task.js')
|
||||
const pick = require('lodash/pick.js')
|
||||
|
||||
// in `other_config` of an incrementally replicated VM, contains the UUID of the target SR used for replication
|
||||
//
|
||||
// added after the complete replication
|
||||
export const TAG_BACKUP_SR = 'xo:backup:sr'
|
||||
const TAG_BASE_DELTA = 'xo:base_delta'
|
||||
exports.TAG_BASE_DELTA = TAG_BASE_DELTA
|
||||
|
||||
// in other_config of VDIs of an incrementally replicated VM, contains the UUID of the source VDI
|
||||
export const TAG_COPY_SRC = 'xo:copy_of'
|
||||
const TAG_COPY_SRC = 'xo:copy_of'
|
||||
exports.TAG_COPY_SRC = TAG_COPY_SRC
|
||||
|
||||
const ensureArray = value => (value === undefined ? [] : Array.isArray(value) ? value : [value])
|
||||
const resolveUuid = async (xapi, cache, uuid, type) => {
|
||||
if (uuid == null) {
|
||||
return uuid
|
||||
}
|
||||
let ref = cache.get(uuid)
|
||||
if (ref === undefined) {
|
||||
ref = await xapi.call(`${type}.get_by_uuid`, uuid)
|
||||
cache.set(uuid, ref)
|
||||
}
|
||||
return ref
|
||||
}
|
||||
|
||||
export async function exportIncrementalVm(
|
||||
exports.exportIncrementalVm = async function exportIncrementalVm(
|
||||
vm,
|
||||
baseVm,
|
||||
{
|
||||
@@ -34,8 +43,6 @@ export async function exportIncrementalVm(
|
||||
fullVdisRequired = new Set(),
|
||||
|
||||
disableBaseTags = false,
|
||||
nbdConcurrency = 1,
|
||||
preferNbd,
|
||||
} = {}
|
||||
) {
|
||||
// refs of VM's VDIs → base's VDIs.
|
||||
@@ -83,8 +90,6 @@ export async function exportIncrementalVm(
|
||||
baseRef: baseVdi?.$ref,
|
||||
cancelToken,
|
||||
format: 'vhd',
|
||||
nbdConcurrency,
|
||||
preferNbd,
|
||||
})
|
||||
})
|
||||
|
||||
@@ -138,11 +143,11 @@ export async function exportIncrementalVm(
|
||||
)
|
||||
}
|
||||
|
||||
export const importIncrementalVm = defer(async function importIncrementalVm(
|
||||
exports.importIncrementalVm = defer(async function importIncrementalVm(
|
||||
$defer,
|
||||
incrementalVm,
|
||||
sr,
|
||||
{ cancelToken = CancelToken.none, newMacAddresses = false } = {}
|
||||
{ cancelToken = CancelToken.none, detectBase = true, mapVdisSrs = {}, newMacAddresses = false } = {}
|
||||
) {
|
||||
const { version } = incrementalVm
|
||||
if (compareVersions(version, '1.0.0') < 0) {
|
||||
@@ -152,6 +157,32 @@ export const importIncrementalVm = defer(async function importIncrementalVm(
|
||||
const vmRecord = incrementalVm.vm
|
||||
const xapi = sr.$xapi
|
||||
|
||||
let baseVm
|
||||
if (detectBase) {
|
||||
const remoteBaseVmUuid = vmRecord.other_config[TAG_BASE_DELTA]
|
||||
if (remoteBaseVmUuid) {
|
||||
baseVm = find(xapi.objects.all, obj => (obj = obj.other_config) && obj[TAG_COPY_SRC] === remoteBaseVmUuid)
|
||||
|
||||
if (!baseVm) {
|
||||
throw new Error(`could not find the base VM (copy of ${remoteBaseVmUuid})`)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const cache = new Map()
|
||||
const mapVdisSrRefs = {}
|
||||
for (const [vdiUuid, srUuid] of Object.entries(mapVdisSrs)) {
|
||||
mapVdisSrRefs[vdiUuid] = await resolveUuid(xapi, cache, srUuid, 'SR')
|
||||
}
|
||||
|
||||
const baseVdis = {}
|
||||
baseVm &&
|
||||
baseVm.$VBDs.forEach(vbd => {
|
||||
const vdi = vbd.$VDI
|
||||
if (vdi !== undefined) {
|
||||
baseVdis[vbd.VDI] = vbd.$VDI
|
||||
}
|
||||
})
|
||||
const vdiRecords = incrementalVm.vdis
|
||||
|
||||
// 0. Create suspend_VDI
|
||||
@@ -163,7 +194,18 @@ export const importIncrementalVm = defer(async function importIncrementalVm(
|
||||
vm: pick(vmRecord, 'uuid', 'name_label', 'suspend_VDI'),
|
||||
})
|
||||
} else {
|
||||
suspendVdi = await xapi.getRecord('VDI', await xapi.VDI_create(vdi))
|
||||
suspendVdi = await xapi.getRecord(
|
||||
'VDI',
|
||||
await xapi.VDI_create({
|
||||
...vdi,
|
||||
other_config: {
|
||||
...vdi.other_config,
|
||||
[TAG_BASE_DELTA]: undefined,
|
||||
[TAG_COPY_SRC]: vdi.uuid,
|
||||
},
|
||||
sr: mapVdisSrRefs[vdi.uuid] ?? sr.$ref,
|
||||
})
|
||||
)
|
||||
$defer.onFailure(() => suspendVdi.$destroy())
|
||||
}
|
||||
}
|
||||
@@ -181,6 +223,10 @@ export const importIncrementalVm = defer(async function importIncrementalVm(
|
||||
ha_always_run: false,
|
||||
is_a_template: false,
|
||||
name_label: '[Importing…] ' + vmRecord.name_label,
|
||||
other_config: {
|
||||
...vmRecord.other_config,
|
||||
[TAG_COPY_SRC]: vmRecord.uuid,
|
||||
},
|
||||
},
|
||||
{
|
||||
bios_strings: vmRecord.bios_strings,
|
||||
@@ -201,8 +247,14 @@ export const importIncrementalVm = defer(async function importIncrementalVm(
|
||||
const vdi = vdiRecords[vdiRef]
|
||||
let newVdi
|
||||
|
||||
if (vdi.baseVdi !== undefined) {
|
||||
newVdi = await xapi.getRecord('VDI', await vdi.baseVdi.$clone())
|
||||
const remoteBaseVdiUuid = detectBase && vdi.other_config[TAG_BASE_DELTA]
|
||||
if (remoteBaseVdiUuid) {
|
||||
const baseVdi = find(baseVdis, vdi => vdi.other_config[TAG_COPY_SRC] === remoteBaseVdiUuid)
|
||||
if (!baseVdi) {
|
||||
throw new Error(`missing base VDI (copy of ${remoteBaseVdiUuid})`)
|
||||
}
|
||||
|
||||
newVdi = await xapi.getRecord('VDI', await baseVdi.$clone())
|
||||
$defer.onFailure(() => newVdi.$destroy())
|
||||
|
||||
await newVdi.update_other_config(TAG_COPY_SRC, vdi.uuid)
|
||||
@@ -213,7 +265,18 @@ export const importIncrementalVm = defer(async function importIncrementalVm(
|
||||
// the suspend VDI has already been created
newVdi = suspendVdi
|
||||
} else {
|
||||
newVdi = await xapi.getRecord('VDI', await xapi.VDI_create(vdi))
|
||||
newVdi = await xapi.getRecord(
|
||||
'VDI',
|
||||
await xapi.VDI_create({
|
||||
...vdi,
|
||||
other_config: {
|
||||
...vdi.other_config,
|
||||
[TAG_BASE_DELTA]: undefined,
|
||||
[TAG_COPY_SRC]: vdi.uuid,
|
||||
},
|
||||
SR: mapVdisSrRefs[vdi.uuid] ?? sr.$ref,
|
||||
})
|
||||
)
|
||||
$defer.onFailure(() => newVdi.$destroy())
|
||||
}
|
||||
|
||||
@@ -252,19 +315,13 @@ export const importIncrementalVm = defer(async function importIncrementalVm(
|
||||
// Import VDI contents.
|
||||
cancelableMap(cancelToken, Object.entries(newVdis), async (cancelToken, [id, vdi]) => {
|
||||
for (let stream of ensureArray(streams[`${id}.vhd`])) {
|
||||
if (stream === null) {
|
||||
// we are restoring a backup and completely reusing a local snapshot
continue
|
||||
}
|
||||
if (typeof stream === 'function') {
|
||||
stream = await stream()
|
||||
}
|
||||
if (stream.length === undefined) {
|
||||
stream = await createVhdStreamWithLength(stream)
|
||||
}
|
||||
await xapi.setField('VDI', vdi.$ref, 'name_label', `[Importing] ${vdiRecords[id].name_label}`)
|
||||
await vdi.$importContent(stream, { cancelToken, format: 'vhd' })
|
||||
await xapi.setField('VDI', vdi.$ref, 'name_label', vdiRecords[id].name_label)
|
||||
}
|
||||
}),
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
import assert from 'node:assert'
|
||||
'use strict'
|
||||
|
||||
const assert = require('assert')
|
||||
|
||||
const COMPRESSED_MAGIC_NUMBERS = [
|
||||
// https://tools.ietf.org/html/rfc1952.html#page-5
|
||||
@@ -45,7 +47,7 @@ const isValidTar = async (handler, size, fd) => {
|
||||
}
|
||||
|
||||
// TODO: find a heuristic for compressed files
export async function isValidXva(path) {
|
||||
async function isValidXva(path) {
|
||||
const handler = this._handler
|
||||
|
||||
// size is longer when encrypted + reading part of an encrypted file is not implemented
|
||||
@@ -72,5 +74,6 @@ export async function isValidXva(path) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
exports.isValidXva = isValidXva
|
||||
|
||||
const noop = Function.prototype
|
||||
@@ -1,7 +1,9 @@
|
||||
import fromCallback from 'promise-toolbox/fromCallback'
|
||||
import { createLogger } from '@xen-orchestra/log'
|
||||
import { createParser } from 'parse-pairs'
|
||||
import { execFile } from 'child_process'
|
||||
'use strict'
|
||||
|
||||
const fromCallback = require('promise-toolbox/fromCallback')
|
||||
const { createLogger } = require('@xen-orchestra/log')
|
||||
const { createParser } = require('parse-pairs')
|
||||
const { execFile } = require('child_process')
|
||||
|
||||
const { debug } = createLogger('xo:backups:listPartitions')
|
||||
|
||||
@@ -22,7 +24,8 @@ const IGNORED_PARTITION_TYPES = {
|
||||
0x82: true, // swap
|
||||
}
|
||||
|
||||
export const LVM_PARTITION_TYPE = 0x8e
|
||||
const LVM_PARTITION_TYPE = 0x8e
|
||||
exports.LVM_PARTITION_TYPE = LVM_PARTITION_TYPE
|
||||
|
||||
const parsePartxLine = createParser({
|
||||
keyTransform: key => (key === 'UUID' ? 'id' : key.toLowerCase()),
|
||||
@@ -30,7 +33,7 @@ const parsePartxLine = createParser({
|
||||
})
|
||||
|
||||
// returns an empty array in case of a non-partitioned disk
|
||||
export async function listPartitions(devicePath) {
|
||||
exports.listPartitions = async function listPartitions(devicePath) {
|
||||
const parts = await fromCallback(execFile, 'partx', [
|
||||
'--bytes',
|
||||
'--output=NR,START,SIZE,NAME,UUID,TYPE',
|
||||
@@ -1,6 +1,8 @@
|
||||
import fromCallback from 'promise-toolbox/fromCallback'
|
||||
import { createParser } from 'parse-pairs'
|
||||
import { execFile } from 'child_process'
|
||||
'use strict'
|
||||
|
||||
const fromCallback = require('promise-toolbox/fromCallback')
|
||||
const { createParser } = require('parse-pairs')
|
||||
const { execFile } = require('child_process')
|
||||
|
||||
// ===================================================================
|
||||
|
||||
@@ -27,5 +29,5 @@ const makeFunction =
|
||||
.map(Array.isArray(fields) ? parse : line => parse(line)[fields])
|
||||
}
|
||||
|
||||
export const lvs = makeFunction('lvs')
|
||||
export const pvs = makeFunction('pvs')
|
||||
exports.lvs = makeFunction('lvs')
|
||||
exports.pvs = makeFunction('pvs')
|
||||
@@ -1,20 +1,22 @@
|
||||
import { asyncMap } from '@xen-orchestra/async-map'
|
||||
import Disposable from 'promise-toolbox/Disposable'
|
||||
import ignoreErrors from 'promise-toolbox/ignoreErrors'
|
||||
'use strict'
|
||||
|
||||
import { extractIdsFromSimplePattern } from '../extractIdsFromSimplePattern.mjs'
|
||||
import { PoolMetadataBackup } from './_PoolMetadataBackup.mjs'
|
||||
import { XoMetadataBackup } from './_XoMetadataBackup.mjs'
|
||||
import { DEFAULT_SETTINGS, Abstract } from './_Abstract.mjs'
|
||||
import { runTask } from './_runTask.mjs'
|
||||
import { getAdaptersByRemote } from './_getAdaptersByRemote.mjs'
|
||||
const { asyncMap } = require('@xen-orchestra/async-map')
|
||||
const Disposable = require('promise-toolbox/Disposable')
|
||||
const ignoreErrors = require('promise-toolbox/ignoreErrors')
|
||||
|
||||
const { extractIdsFromSimplePattern } = require('../extractIdsFromSimplePattern.js')
|
||||
const { PoolMetadataBackup } = require('./_PoolMetadataBackup.js')
|
||||
const { XoMetadataBackup } = require('./_XoMetadataBackup.js')
|
||||
const { DEFAULT_SETTINGS, Abstract } = require('./_Abstract.js')
|
||||
const { runTask } = require('./_runTask.js')
|
||||
const { getAdaptersByRemote } = require('./_getAdaptersByRemote.js')
|
||||
|
||||
const DEFAULT_METADATA_SETTINGS = {
|
||||
retentionPoolMetadata: 0,
|
||||
retentionXoMetadata: 0,
|
||||
}
|
||||
|
||||
export const Metadata = class MetadataBackupRunner extends Abstract {
|
||||
exports.Metadata = class MetadataBackupRunner extends Abstract {
|
||||
_computeBaseSettings(config, job) {
|
||||
const baseSettings = { ...DEFAULT_SETTINGS }
|
||||
Object.assign(baseSettings, DEFAULT_METADATA_SETTINGS, config.defaultSettings, config.metadata?.defaultSettings)
|
||||
@@ -1,15 +1,17 @@
|
||||
import { asyncMapSettled } from '@xen-orchestra/async-map'
|
||||
import Disposable from 'promise-toolbox/Disposable'
|
||||
import { limitConcurrency } from 'limit-concurrency-decorator'
|
||||
'use strict'
|
||||
|
||||
import { extractIdsFromSimplePattern } from '../extractIdsFromSimplePattern.mjs'
|
||||
import { Task } from '../Task.mjs'
|
||||
import createStreamThrottle from './_createStreamThrottle.mjs'
|
||||
import { DEFAULT_SETTINGS, Abstract } from './_Abstract.mjs'
|
||||
import { runTask } from './_runTask.mjs'
|
||||
import { getAdaptersByRemote } from './_getAdaptersByRemote.mjs'
|
||||
import { FullRemote } from './_vmRunners/FullRemote.mjs'
|
||||
import { IncrementalRemote } from './_vmRunners/IncrementalRemote.mjs'
|
||||
const { asyncMapSettled } = require('@xen-orchestra/async-map')
|
||||
const Disposable = require('promise-toolbox/Disposable')
|
||||
const { limitConcurrency } = require('limit-concurrency-decorator')
|
||||
|
||||
const { extractIdsFromSimplePattern } = require('../extractIdsFromSimplePattern.js')
|
||||
const { Task } = require('../Task.js')
|
||||
const createStreamThrottle = require('./_createStreamThrottle.js')
|
||||
const { DEFAULT_SETTINGS, Abstract } = require('./_Abstract.js')
|
||||
const { runTask } = require('./_runTask.js')
|
||||
const { getAdaptersByRemote } = require('./_getAdaptersByRemote.js')
|
||||
const { FullRemote } = require('./_vmRunners/FullRemote.js')
|
||||
const { IncrementalRemote } = require('./_vmRunners/IncrementalRemote.js')
|
||||
|
||||
const DEFAULT_REMOTE_VM_SETTINGS = {
|
||||
concurrency: 2,
|
||||
@@ -25,7 +27,7 @@ const DEFAULT_REMOTE_VM_SETTINGS = {
|
||||
vmTimeout: 0,
|
||||
}
|
||||
|
||||
export const VmsRemote = class RemoteVmsBackupRunner extends Abstract {
|
||||
exports.VmsRemote = class RemoteVmsBackupRunner extends Abstract {
|
||||
_computeBaseSettings(config, job) {
|
||||
const baseSettings = { ...DEFAULT_SETTINGS }
|
||||
Object.assign(baseSettings, DEFAULT_REMOTE_VM_SETTINGS, config.defaultSettings, config.vm?.defaultSettings)
|
||||
@@ -1,15 +1,17 @@
|
||||
import { asyncMapSettled } from '@xen-orchestra/async-map'
|
||||
import Disposable from 'promise-toolbox/Disposable'
|
||||
import { limitConcurrency } from 'limit-concurrency-decorator'
|
||||
'use strict'
|
||||
|
||||
import { extractIdsFromSimplePattern } from '../extractIdsFromSimplePattern.mjs'
|
||||
import { Task } from '../Task.mjs'
|
||||
import createStreamThrottle from './_createStreamThrottle.mjs'
|
||||
import { DEFAULT_SETTINGS, Abstract } from './_Abstract.mjs'
|
||||
import { runTask } from './_runTask.mjs'
|
||||
import { getAdaptersByRemote } from './_getAdaptersByRemote.mjs'
|
||||
import { IncrementalXapi } from './_vmRunners/IncrementalXapi.mjs'
|
||||
import { FullXapi } from './_vmRunners/FullXapi.mjs'
|
||||
const { asyncMapSettled } = require('@xen-orchestra/async-map')
|
||||
const Disposable = require('promise-toolbox/Disposable')
|
||||
const { limitConcurrency } = require('limit-concurrency-decorator')
|
||||
|
||||
const { extractIdsFromSimplePattern } = require('../extractIdsFromSimplePattern.js')
|
||||
const { Task } = require('../Task.js')
|
||||
const createStreamThrottle = require('./_createStreamThrottle.js')
|
||||
const { DEFAULT_SETTINGS, Abstract } = require('./_Abstract.js')
|
||||
const { runTask } = require('./_runTask.js')
|
||||
const { getAdaptersByRemote } = require('./_getAdaptersByRemote.js')
|
||||
const { IncrementalXapi } = require('./_vmRunners/IncrementalXapi.js')
|
||||
const { FullXapi } = require('./_vmRunners/FullXapi.js')
|
||||
|
||||
const DEFAULT_XAPI_VM_SETTINGS = {
|
||||
bypassVdiChainsCheck: false,
|
||||
@@ -34,7 +36,7 @@ const DEFAULT_XAPI_VM_SETTINGS = {
|
||||
vmTimeout: 0,
|
||||
}
|
||||
|
||||
export const VmsXapi = class VmsXapiBackupRunner extends Abstract {
|
||||
exports.VmsXapi = class VmsXapiBackupRunner extends Abstract {
|
||||
_computeBaseSettings(config, job) {
|
||||
const baseSettings = { ...DEFAULT_SETTINGS }
|
||||
Object.assign(baseSettings, DEFAULT_XAPI_VM_SETTINGS, config.defaultSettings, config.vm?.defaultSettings)
|
||||
@@ -1,15 +1,17 @@
|
||||
import Disposable from 'promise-toolbox/Disposable'
|
||||
import pTimeout from 'promise-toolbox/timeout'
|
||||
import { compileTemplate } from '@xen-orchestra/template'
|
||||
import { runTask } from './_runTask.mjs'
|
||||
import { RemoteTimeoutError } from './_RemoteTimeoutError.mjs'
|
||||
'use strict'
|
||||
|
||||
export const DEFAULT_SETTINGS = {
|
||||
const Disposable = require('promise-toolbox/Disposable')
|
||||
const pTimeout = require('promise-toolbox/timeout')
|
||||
const { compileTemplate } = require('@xen-orchestra/template')
|
||||
const { runTask } = require('./_runTask.js')
|
||||
const { RemoteTimeoutError } = require('./_RemoteTimeoutError.js')
|
||||
|
||||
exports.DEFAULT_SETTINGS = {
|
||||
getRemoteTimeout: 300e3,
|
||||
reportWhen: 'failure',
|
||||
}
|
||||
|
||||
export const Abstract = class AbstractRunner {
|
||||
exports.Abstract = class AbstractRunner {
|
||||
constructor({ config, getAdapter, getConnectedRecord, job, schedule }) {
|
||||
this._config = config
|
||||
this._getRecord = getConnectedRecord
|
||||
@@ -1,13 +1,16 @@
|
||||
import { asyncMap } from '@xen-orchestra/async-map'
|
||||
'use strict'
|
||||
|
||||
import { DIR_XO_POOL_METADATA_BACKUPS } from '../RemoteAdapter.mjs'
|
||||
import { forkStreamUnpipe } from './_forkStreamUnpipe.mjs'
|
||||
import { formatFilenameDate } from '../_filenameDate.mjs'
|
||||
import { Task } from '../Task.mjs'
|
||||
const { asyncMap } = require('@xen-orchestra/async-map')
|
||||
|
||||
export const PATH_DB_DUMP = '/pool/xmldbdump'
|
||||
const { DIR_XO_POOL_METADATA_BACKUPS } = require('../RemoteAdapter.js')
|
||||
const { forkStreamUnpipe } = require('./_forkStreamUnpipe.js')
|
||||
const { formatFilenameDate } = require('../_filenameDate.js')
|
||||
const { Task } = require('../Task.js')
|
||||
|
||||
export class PoolMetadataBackup {
|
||||
const PATH_DB_DUMP = '/pool/xmldbdump'
|
||||
exports.PATH_DB_DUMP = PATH_DB_DUMP
|
||||
|
||||
exports.PoolMetadataBackup = class PoolMetadataBackup {
|
||||
constructor({ config, job, pool, remoteAdapters, schedule, settings }) {
|
||||
this._config = config
|
||||
this._job = job
|
||||
@@ -1,6 +1,8 @@
|
||||
export class RemoteTimeoutError extends Error {
|
||||
'use strict'
|
||||
class RemoteTimeoutError extends Error {
|
||||
constructor(remoteId) {
|
||||
super('timeout while getting the remote ' + remoteId)
|
||||
this.remoteId = remoteId
|
||||
}
|
||||
}
|
||||
exports.RemoteTimeoutError = RemoteTimeoutError
|
||||
@@ -1,11 +1,13 @@
|
||||
import { asyncMap } from '@xen-orchestra/async-map'
|
||||
import { join } from '@xen-orchestra/fs/path'
|
||||
'use strict'
|
||||
|
||||
import { DIR_XO_CONFIG_BACKUPS } from '../RemoteAdapter.mjs'
|
||||
import { formatFilenameDate } from '../_filenameDate.mjs'
|
||||
import { Task } from '../Task.mjs'
|
||||
const { asyncMap } = require('@xen-orchestra/async-map')
|
||||
const { join } = require('@xen-orchestra/fs/path')
|
||||
|
||||
export class XoMetadataBackup {
|
||||
const { DIR_XO_CONFIG_BACKUPS } = require('../RemoteAdapter.js')
|
||||
const { formatFilenameDate } = require('../_filenameDate.js')
|
||||
const { Task } = require('../Task.js')
|
||||
|
||||
exports.XoMetadataBackup = class XoMetadataBackup {
|
||||
constructor({ config, job, remoteAdapters, schedule, settings }) {
|
||||
this._config = config
|
||||
this._job = job
|
||||
@@ -22,13 +24,7 @@ export class XoMetadataBackup {
|
||||
const dir = `${scheduleDir}/${formatFilenameDate(timestamp)}`
|
||||
|
||||
const data = job.xoMetadata
|
||||
let dataBaseName = './data'
|
||||
|
||||
// JSON data is sent as a plain string, binary data is sent as an object with `data` and `encoding` properties
const isJson = typeof data === 'string'
|
||||
if (isJson) {
|
||||
dataBaseName += '.json'
|
||||
}
|
||||
const dataBaseName = './data.json'
|
||||
|
||||
const metadata = JSON.stringify(
|
||||
{
|
||||
@@ -60,7 +56,7 @@ export class XoMetadataBackup {
|
||||
async () => {
|
||||
const handler = adapter.handler
|
||||
const dirMode = this._config.dirMode
|
||||
await handler.outputFile(dataFileName, isJson ? data : Buffer.from(data.data, data.encoding), { dirMode })
|
||||
await handler.outputFile(dataFileName, data, { dirMode })
|
||||
await handler.outputFile(metaDataFileName, metadata, {
|
||||
dirMode,
|
||||
})
|
||||
@@ -1,10 +1,12 @@
|
||||
import { pipeline } from 'node:stream'
|
||||
import { ThrottleGroup } from '@kldzj/stream-throttle'
|
||||
import identity from 'lodash/identity.js'
|
||||
'use strict'
|
||||
|
||||
const { pipeline } = require('node:stream')
|
||||
const { ThrottleGroup } = require('@kldzj/stream-throttle')
|
||||
const identity = require('lodash/identity.js')
|
||||
|
||||
const noop = Function.prototype
|
||||
|
||||
export default function createStreamThrottle(rate) {
|
||||
module.exports = function createStreamThrottle(rate) {
|
||||
if (rate === 0) {
|
||||
return identity
|
||||
}
|
||||
@@ -1,13 +1,14 @@
|
||||
import { createLogger } from '@xen-orchestra/log'
|
||||
import { finished, PassThrough } from 'node:stream'
|
||||
'use strict'
|
||||
|
||||
const { debug } = createLogger('xo:backups:forkStreamUnpipe')
|
||||
const { finished, PassThrough } = require('node:stream')
|
||||
|
||||
const { debug } = require('@xen-orchestra/log').createLogger('xo:backups:forkStreamUnpipe')
|
||||
|
||||
// create a new readable stream from an existing one which may be piped later
|
||||
//
|
||||
// in case of error in the new readable stream, it will simply be unpiped
|
||||
// from the original one
|
||||
export function forkStreamUnpipe(source) {
|
||||
exports.forkStreamUnpipe = function forkStreamUnpipe(source) {
|
||||
const { forks = 0 } = source
|
||||
source.forks = forks + 1
|
||||
|
||||
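A hedged usage sketch of `forkStreamUnpipe`: forking one source stream to two destinations, assuming the CommonJS export above (file names are made up):

const { createReadStream, createWriteStream } = require('node:fs')
const { forkStreamUnpipe } = require('./_forkStreamUnpipe.js')

const source = createReadStream('backup.xva')

// each consumer gets its own fork; an error on one fork only unpipes that fork
// from the source, the other consumer keeps receiving data
forkStreamUnpipe(source).pipe(createWriteStream('/mnt/remote-a/backup.xva'))
forkStreamUnpipe(source).pipe(createWriteStream('/mnt/remote-b/backup.xva'))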
@@ -1,7 +1,9 @@
|
||||
export function getAdaptersByRemote(adapters) {
|
||||
'use strict'
|
||||
const getAdaptersByRemote = adapters => {
|
||||
const adaptersByRemote = {}
|
||||
adapters.forEach(({ adapter, remoteId }) => {
|
||||
adaptersByRemote[remoteId] = adapter
|
||||
})
|
||||
return adaptersByRemote
|
||||
}
|
||||
exports.getAdaptersByRemote = getAdaptersByRemote
|
||||
6  @xen-orchestra/backups/_runners/_runTask.js  Normal file
@@ -0,0 +1,6 @@
'use strict'
const { Task } = require('../Task.js')
const noop = Function.prototype
const runTask = (...args) => Task.run(...args).catch(noop) // errors are handled by logs

exports.runTask = runTask
@@ -1,5 +0,0 @@
|
||||
import { Task } from '../Task.mjs'
|
||||
|
||||
const noop = Function.prototype
|
||||
|
||||
export const runTask = (...args) => Task.run(...args).catch(noop) // errors are handled by logs
|
||||
@@ -1,12 +1,14 @@
|
||||
import { decorateMethodsWith } from '@vates/decorate-with'
|
||||
import { defer } from 'golike-defer'
|
||||
import { AbstractRemote } from './_AbstractRemote.mjs'
|
||||
import { FullRemoteWriter } from '../_writers/FullRemoteWriter.mjs'
|
||||
import { forkStreamUnpipe } from '../_forkStreamUnpipe.mjs'
|
||||
import { watchStreamSize } from '../../_watchStreamSize.mjs'
|
||||
import { Task } from '../../Task.mjs'
|
||||
'use strict'
|
||||
|
||||
export const FullRemote = class FullRemoteVmBackupRunner extends AbstractRemote {
|
||||
const { decorateMethodsWith } = require('@vates/decorate-with')
|
||||
const { defer } = require('golike-defer')
|
||||
const { AbstractRemote } = require('./_AbstractRemote')
|
||||
const { FullRemoteWriter } = require('../_writers/FullRemoteWriter')
|
||||
const { forkStreamUnpipe } = require('../_forkStreamUnpipe')
|
||||
const { watchStreamSize } = require('../../_watchStreamSize')
|
||||
const { Task } = require('../../Task')
|
||||
|
||||
class FullRemoteVmBackupRunner extends AbstractRemote {
|
||||
_getRemoteWriter() {
|
||||
return FullRemoteWriter
|
||||
}
|
||||
@@ -29,8 +31,6 @@ export const FullRemote = class FullRemoteVmBackupRunner extends AbstractRemote
|
||||
writer =>
|
||||
writer.run({
|
||||
stream: forkStreamUnpipe(stream),
|
||||
// the stream will be forked and transformed, it's not safe to attach additional properties to it
streamLength: stream.length,
|
||||
timestamp: metadata.timestamp,
|
||||
vm: metadata.vm,
|
||||
vmSnapshot: metadata.vmSnapshot,
|
||||
@@ -47,6 +47,7 @@ export const FullRemote = class FullRemoteVmBackupRunner extends AbstractRemote
|
||||
}
|
||||
}
|
||||
|
||||
decorateMethodsWith(FullRemote, {
|
||||
exports.FullRemote = FullRemoteVmBackupRunner
|
||||
decorateMethodsWith(FullRemoteVmBackupRunner, {
|
||||
_run: defer,
|
||||
})
|
||||
@@ -1,14 +1,16 @@
|
||||
import { createLogger } from '@xen-orchestra/log'
|
||||
'use strict'
|
||||
|
||||
import { forkStreamUnpipe } from '../_forkStreamUnpipe.mjs'
|
||||
import { FullRemoteWriter } from '../_writers/FullRemoteWriter.mjs'
|
||||
import { FullXapiWriter } from '../_writers/FullXapiWriter.mjs'
|
||||
import { watchStreamSize } from '../../_watchStreamSize.mjs'
|
||||
import { AbstractXapi } from './_AbstractXapi.mjs'
|
||||
const { createLogger } = require('@xen-orchestra/log')
|
||||
|
||||
const { forkStreamUnpipe } = require('../_forkStreamUnpipe.js')
|
||||
const { FullRemoteWriter } = require('../_writers/FullRemoteWriter.js')
|
||||
const { FullXapiWriter } = require('../_writers/FullXapiWriter.js')
|
||||
const { watchStreamSize } = require('../../_watchStreamSize.js')
|
||||
const { AbstractXapi } = require('./_AbstractXapi.js')
|
||||
|
||||
const { debug } = createLogger('xo:backups:FullXapiVmBackup')
|
||||
|
||||
export const FullXapi = class FullXapiVmBackupRunner extends AbstractXapi {
|
||||
exports.FullXapi = class FullXapiVmBackupRunner extends AbstractXapi {
|
||||
_getWriters() {
|
||||
return [FullRemoteWriter, FullXapiWriter]
|
||||
}
|
||||
@@ -35,25 +37,13 @@ export const FullXapi = class FullXapiVmBackupRunner extends AbstractXapi {
|
||||
useSnapshot: false,
|
||||
})
|
||||
)
|
||||
|
||||
const vdis = await exportedVm.$getDisks()
|
||||
let maxStreamLength = 1024 * 1024 // OVF file and tar headers are a few KB, let's stay safe
for (const vdiRef of vdis) {
|
||||
const vdi = await this._xapi.getRecord('VDI', vdiRef)
|
||||
|
||||
// the size of a fully allocated VDI will be exactly virtual_size, which is a gross overestimation
// of the real stream size in general, since a disk is never completely full
// vdi.physical_size seems to greatly underestimate the real disk usage of a VDI, as of 2023-10-30
maxStreamLength += vdi.virtual_size
|
||||
}
|
||||
|
||||
const sizeContainer = watchStreamSize(stream)
|
||||
|
||||
const timestamp = Date.now()
|
||||
|
||||
await this._callWriters(
|
||||
writer =>
|
||||
writer.run({
|
||||
maxStreamLength,
|
||||
sizeContainer,
|
||||
stream: forkStreamUnpipe(stream),
|
||||
timestamp,
|
||||
@@ -1,14 +1,15 @@
import { asyncEach } from '@vates/async-each'
import { decorateMethodsWith } from '@vates/decorate-with'
import { defer } from 'golike-defer'
import assert from 'node:assert'
import isVhdDifferencingDisk from 'vhd-lib/isVhdDifferencingDisk.js'
import mapValues from 'lodash/mapValues.js'
'use strict'
const assert = require('node:assert')

import { AbstractRemote } from './_AbstractRemote.mjs'
import { forkDeltaExport } from './_forkDeltaExport.mjs'
import { IncrementalRemoteWriter } from '../_writers/IncrementalRemoteWriter.mjs'
import { Task } from '../../Task.mjs'
const { decorateMethodsWith } = require('@vates/decorate-with')
const { defer } = require('golike-defer')
const { mapValues } = require('lodash')
const { Task } = require('../../Task')
const { AbstractRemote } = require('./_AbstractRemote')
const { IncrementalRemoteWriter } = require('../_writers/IncrementalRemoteWriter')
const { forkDeltaExport } = require('./_forkDeltaExport')
const isVhdDifferencingDisk = require('vhd-lib/isVhdDifferencingDisk')
const { asyncEach } = require('@vates/async-each')

class IncrementalRemoteVmBackupRunner extends AbstractRemote {
_getRemoteWriter() {
@@ -32,10 +33,10 @@ class IncrementalRemoteVmBackupRunner extends AbstractRemote {
useChain: false,
})

const isVhdDifferencing = {}
const differentialVhds = {}

await asyncEach(Object.entries(incrementalExport.streams), async ([key, stream]) => {
isVhdDifferencing[key] = await isVhdDifferencingDisk(stream)
differentialVhds[key] = await isVhdDifferencingDisk(stream)
})

incrementalExport.streams = mapValues(incrementalExport.streams, this._throttleStream)
@@ -43,7 +44,7 @@ class IncrementalRemoteVmBackupRunner extends AbstractRemote {
writer =>
writer.transfer({
deltaExport: forkDeltaExport(incrementalExport),
isVhdDifferencing,
differentialVhds,
timestamp: metadata.timestamp,
vm: metadata.vm,
vmSnapshot: metadata.vmSnapshot,
@@ -60,7 +61,7 @@ class IncrementalRemoteVmBackupRunner extends AbstractRemote {
}
}

export const IncrementalRemote = IncrementalRemoteVmBackupRunner
exports.IncrementalRemote = IncrementalRemoteVmBackupRunner
decorateMethodsWith(IncrementalRemoteVmBackupRunner, {
_run: defer,
})
@@ -1,26 +1,28 @@
import { asyncEach } from '@vates/async-each'
import { asyncMap } from '@xen-orchestra/async-map'
import { createLogger } from '@xen-orchestra/log'
import { pipeline } from 'node:stream'
import findLast from 'lodash/findLast.js'
import isVhdDifferencingDisk from 'vhd-lib/isVhdDifferencingDisk.js'
import keyBy from 'lodash/keyBy.js'
import mapValues from 'lodash/mapValues.js'
import vhdStreamValidator from 'vhd-lib/vhdStreamValidator.js'
'use strict'

import { AbstractXapi } from './_AbstractXapi.mjs'
import { exportIncrementalVm } from '../../_incrementalVm.mjs'
import { forkDeltaExport } from './_forkDeltaExport.mjs'
import { IncrementalRemoteWriter } from '../_writers/IncrementalRemoteWriter.mjs'
import { IncrementalXapiWriter } from '../_writers/IncrementalXapiWriter.mjs'
import { Task } from '../../Task.mjs'
import { watchStreamSize } from '../../_watchStreamSize.mjs'
const findLast = require('lodash/findLast.js')
const keyBy = require('lodash/keyBy.js')
const mapValues = require('lodash/mapValues.js')
const vhdStreamValidator = require('vhd-lib/vhdStreamValidator.js')
const { asyncMap } = require('@xen-orchestra/async-map')
const { createLogger } = require('@xen-orchestra/log')
const { pipeline } = require('node:stream')

const { IncrementalRemoteWriter } = require('../_writers/IncrementalRemoteWriter.js')
const { IncrementalXapiWriter } = require('../_writers/IncrementalXapiWriter.js')
const { exportIncrementalVm } = require('../../_incrementalVm.js')
const { Task } = require('../../Task.js')
const { watchStreamSize } = require('../../_watchStreamSize.js')
const { AbstractXapi } = require('./_AbstractXapi.js')
const { forkDeltaExport } = require('./_forkDeltaExport.js')
const isVhdDifferencingDisk = require('vhd-lib/isVhdDifferencingDisk')
const { asyncEach } = require('@vates/async-each')

const { debug } = createLogger('xo:backups:IncrementalXapiVmBackup')

const noop = Function.prototype

export const IncrementalXapi = class IncrementalXapiVmBackupRunner extends AbstractXapi {
exports.IncrementalXapi = class IncrementalXapiVmBackupRunner extends AbstractXapi {
_getWriters() {
return [IncrementalRemoteWriter, IncrementalXapiWriter]
}
@@ -41,8 +43,6 @@ export const IncrementalXapi = class IncrementalXapiVmBackupRunner extends Abstr

const deltaExport = await exportIncrementalVm(exportedVm, baseVm, {
fullVdisRequired,
nbdConcurrency: this._settings.nbdConcurrency,
preferNbd: this._settings.preferNbd,
})
// since NBD is network based, if one disk uses NBD, all the disks use it
// except the suspended VDI
@@ -50,11 +50,11 @@ export const IncrementalXapi = class IncrementalXapiVmBackupRunner extends Abstr
Task.info('Transfer data using NBD')
}

const isVhdDifferencing = {}
const differentialVhds = {}
// since isVhdDifferencingDisk is reading and unshifting data in the stream
// it should be done BEFORE any other stream transform
await asyncEach(Object.entries(deltaExport.streams), async ([key, stream]) => {
isVhdDifferencing[key] = await isVhdDifferencingDisk(stream)
differentialVhds[key] = await isVhdDifferencingDisk(stream)
})
const sizeContainers = mapValues(deltaExport.streams, stream => watchStreamSize(stream))

@@ -69,7 +69,7 @@ export const IncrementalXapi = class IncrementalXapiVmBackupRunner extends Abstr
writer =>
writer.transfer({
deltaExport: forkDeltaExport(deltaExport),
isVhdDifferencing,
differentialVhds,
sizeContainers,
timestamp,
vm,
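The comment kept in both versions above explains why the differencing check has to run before any other stream transform: isVhdDifferencingDisk consumes the first bytes of each VHD stream and unshifts them back, which only works while nothing else has started reading from that stream. A minimal sketch of the peek-and-unshift pattern on a Node.js Readable (illustrative helper, not the actual vhd-lib code):

```js
const { once } = require('node:events')

// read `length` bytes from the front of `stream`, then push them back so the
// next consumer (fork, throttle, upload…) still sees the complete stream
async function peekBytes(stream, length) {
  let head
  while ((head = stream.read(length)) === null && !stream.readableEnded) {
    await once(stream, 'readable')
  }
  if (head !== null) {
    stream.unshift(head) // put the bytes back for the next consumer
  }
  return head
}
```

Only once these checks have completed are the streams throttled, size-watched and forked toward the writers, as the runner code above shows.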
@@ -1,6 +1,8 @@
import { asyncMap } from '@xen-orchestra/async-map'
import { createLogger } from '@xen-orchestra/log'
import { Task } from '../../Task.mjs'
'use strict'

const { asyncMap } = require('@xen-orchestra/async-map')
const { createLogger } = require('@xen-orchestra/log')
const { Task } = require('../../Task.js')

const { debug, warn } = createLogger('xo:backups:AbstractVmRunner')

@@ -17,7 +19,7 @@ const asyncEach = async (iterable, fn, thisArg = iterable) => {
}
}

export const Abstract = class AbstractVmBackupRunner {
exports.Abstract = class AbstractVmBackupRunner {
// calls fn for each writer, warns of any errors, and throws only if there are no writers left
async _callWriters(fn, step, parallel = true) {
const writers = this._writers
@@ -1,12 +1,11 @@
import { asyncEach } from '@vates/async-each'
import { Disposable } from 'promise-toolbox'
'use strict'
const { Abstract } = require('./_Abstract')

import { getVmBackupDir } from '../../_getVmBackupDir.mjs'
const { getVmBackupDir } = require('../../_getVmBackupDir')
const { asyncEach } = require('@vates/async-each')
const { Disposable } = require('promise-toolbox')

import { Abstract } from './_Abstract.mjs'
import { extractIdsFromSimplePattern } from '../../extractIdsFromSimplePattern.mjs'

export const AbstractRemote = class AbstractRemoteVmBackupRunner extends Abstract {
exports.AbstractRemote = class AbstractRemoteVmBackupRunner extends Abstract {
constructor({
config,
job,
@@ -35,8 +34,7 @@ export const AbstractRemote = class AbstractRemoteVmBackupRunner extends Abstrac
this._writers = writers

const RemoteWriter = this._getRemoteWriter()
extractIdsFromSimplePattern(job.remotes).forEach(remoteId => {
const adapter = remoteAdapters[remoteId]
Object.entries(remoteAdapters).forEach(([remoteId, adapter]) => {
const targetSettings = {
...settings,
...allSettings[remoteId],
@@ -1,16 +1,18 @@
import assert from 'node:assert'
import groupBy from 'lodash/groupBy.js'
import ignoreErrors from 'promise-toolbox/ignoreErrors'
import { asyncMap } from '@xen-orchestra/async-map'
import { decorateMethodsWith } from '@vates/decorate-with'
import { defer } from 'golike-defer'
import { formatDateTime } from '@xen-orchestra/xapi'
'use strict'

import { getOldEntries } from '../../_getOldEntries.mjs'
import { Task } from '../../Task.mjs'
import { Abstract } from './_Abstract.mjs'
const assert = require('assert')
const groupBy = require('lodash/groupBy.js')
const ignoreErrors = require('promise-toolbox/ignoreErrors')
const { asyncMap } = require('@xen-orchestra/async-map')
const { decorateMethodsWith } = require('@vates/decorate-with')
const { defer } = require('golike-defer')
const { formatDateTime } = require('@xen-orchestra/xapi')

export const AbstractXapi = class AbstractXapiVmBackupRunner extends Abstract {
const { getOldEntries } = require('../../_getOldEntries.js')
const { Task } = require('../../Task.js')
const { Abstract } = require('./_Abstract.js')

class AbstractXapiVmBackupRunner extends Abstract {
constructor({
config,
getSnapshotNameLabel,
@@ -31,11 +33,6 @@ export const AbstractXapi = class AbstractXapiVmBackupRunner extends Abstract {
throw new Error('cannot backup a VM created by this very job')
}

const currentOperations = Object.values(vm.current_operations)
if (currentOperations.some(_ => _ === 'migrate_send' || _ === 'pool_migrate')) {
throw new Error('cannot backup a VM currently being migrated')
}

this.config = config
this.job = job
this.remoteAdapters = remoteAdapters
@@ -261,15 +258,7 @@ export const AbstractXapi = class AbstractXapiVmBackupRunner extends Abstract {
}

if (this._writers.size !== 0) {
const { pool_migrate = null, migrate_send = null } = this._exportedVm.blocked_operations

const reason = 'VM migration is blocked during backup'
await this._exportedVm.update_blocked_operations({ pool_migrate: reason, migrate_send: reason })
try {
await this._copy()
} finally {
await this._exportedVm.update_blocked_operations({ pool_migrate, migrate_send })
}
await this._copy()
}
} finally {
if (startAfter) {
@@ -282,7 +271,8 @@ export const AbstractXapi = class AbstractXapiVmBackupRunner extends Abstract {
await this._healthCheck()
}
}
exports.AbstractXapi = AbstractXapiVmBackupRunner

decorateMethodsWith(AbstractXapi, {
decorateMethodsWith(AbstractXapiVmBackupRunner, {
run: defer,
})
@@ -0,0 +1,12 @@
'use strict'

const { mapValues } = require('lodash')
const { forkStreamUnpipe } = require('../_forkStreamUnpipe')

exports.forkDeltaExport = function forkDeltaExport(deltaExport) {
return Object.create(deltaExport, {
streams: {
value: mapValues(deltaExport.streams, forkStreamUnpipe),
},
})
}
@@ -1,11 +0,0 @@
import cloneDeep from 'lodash/cloneDeep.js'
import mapValues from 'lodash/mapValues.js'

import { forkStreamUnpipe } from '../_forkStreamUnpipe.mjs'

export function forkDeltaExport(deltaExport) {
const { streams, ...rest } = deltaExport
const newMetadata = cloneDeep(rest)
newMetadata.streams = mapValues(streams, forkStreamUnpipe)
return newMetadata
}
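The two versions of forkDeltaExport above solve the same problem in different ways: produce a copy of the delta export whose streams are forked so several writers can consume them independently. One clones the metadata with cloneDeep and swaps in forked streams; the other creates an object that delegates to the original through its prototype and only shadows the streams property. A minimal sketch of that Object.create pattern (illustrative names, not the project's code):

```js
// the returned object reads every untouched property from `original` through the
// prototype chain, while `streams` becomes an own, shadowing property
function withForkedStreams(original, fork) {
  return Object.create(original, {
    streams: {
      value: Object.fromEntries(Object.entries(original.streams).map(([key, stream]) => [key, fork(stream)])),
    },
  })
}
```

The prototype-based variant avoids deep-copying potentially large metadata, at the cost of the result not being a plain own-property object.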
@@ -1,11 +1,13 @@
import { formatFilenameDate } from '../../_filenameDate.mjs'
import { getOldEntries } from '../../_getOldEntries.mjs'
import { Task } from '../../Task.mjs'
'use strict'

import { MixinRemoteWriter } from './_MixinRemoteWriter.mjs'
import { AbstractFullWriter } from './_AbstractFullWriter.mjs'
const { formatFilenameDate } = require('../../_filenameDate.js')
const { getOldEntries } = require('../../_getOldEntries.js')
const { Task } = require('../../Task.js')

export class FullRemoteWriter extends MixinRemoteWriter(AbstractFullWriter) {
const { MixinRemoteWriter } = require('./_MixinRemoteWriter.js')
const { AbstractFullWriter } = require('./_AbstractFullWriter.js')

exports.FullRemoteWriter = class FullRemoteWriter extends MixinRemoteWriter(AbstractFullWriter) {
constructor(props) {
super(props)

@@ -24,7 +26,7 @@ export class FullRemoteWriter extends MixinRemoteWriter(AbstractFullWriter) {
)
}

async _run({ maxStreamLength, timestamp, sizeContainer, stream, streamLength, vm, vmSnapshot }) {
async _run({ timestamp, sizeContainer, stream, vm, vmSnapshot }) {
const settings = this._settings
const job = this._job
const scheduleId = this._scheduleId
@@ -65,8 +67,6 @@ export class FullRemoteWriter extends MixinRemoteWriter(AbstractFullWriter) {

await Task.run({ name: 'transfer' }, async () => {
await adapter.outputStream(dataFilename, stream, {
maxStreamLength,
streamLength,
validator: tmpPath => adapter.isValidXva(tmpPath),
})
return { size: sizeContainer.size }
@@ -1,16 +1,18 @@
import ignoreErrors from 'promise-toolbox/ignoreErrors'
import { asyncMap, asyncMapSettled } from '@xen-orchestra/async-map'
import { formatDateTime } from '@xen-orchestra/xapi'
'use strict'

import { formatFilenameDate } from '../../_filenameDate.mjs'
import { getOldEntries } from '../../_getOldEntries.mjs'
import { Task } from '../../Task.mjs'
const ignoreErrors = require('promise-toolbox/ignoreErrors')
const { asyncMap, asyncMapSettled } = require('@xen-orchestra/async-map')
const { formatDateTime } = require('@xen-orchestra/xapi')

import { AbstractFullWriter } from './_AbstractFullWriter.mjs'
import { MixinXapiWriter } from './_MixinXapiWriter.mjs'
import { listReplicatedVms } from './_listReplicatedVms.mjs'
const { formatFilenameDate } = require('../../_filenameDate.js')
const { getOldEntries } = require('../../_getOldEntries.js')
const { Task } = require('../../Task.js')

export class FullXapiWriter extends MixinXapiWriter(AbstractFullWriter) {
const { AbstractFullWriter } = require('./_AbstractFullWriter.js')
const { MixinXapiWriter } = require('./_MixinXapiWriter.js')
const { listReplicatedVms } = require('./_listReplicatedVms.js')

exports.FullXapiWriter = class FullXapiWriter extends MixinXapiWriter(AbstractFullWriter) {
constructor(props) {
super(props)

@@ -1,28 +1,29 @@
import assert from 'node:assert'
import mapValues from 'lodash/mapValues.js'
import ignoreErrors from 'promise-toolbox/ignoreErrors'
import { asyncEach } from '@vates/async-each'
import { asyncMap } from '@xen-orchestra/async-map'
import { chainVhd, checkVhdChain, openVhd, VhdAbstract } from 'vhd-lib'
import { createLogger } from '@xen-orchestra/log'
import { decorateClass } from '@vates/decorate-with'
import { defer } from 'golike-defer'
import { dirname } from 'node:path'
'use strict'

import { formatFilenameDate } from '../../_filenameDate.mjs'
import { getOldEntries } from '../../_getOldEntries.mjs'
import { TAG_BASE_DELTA } from '../../_incrementalVm.mjs'
import { Task } from '../../Task.mjs'
const assert = require('assert')
const mapValues = require('lodash/mapValues.js')
const ignoreErrors = require('promise-toolbox/ignoreErrors')
const { asyncEach } = require('@vates/async-each')
const { asyncMap } = require('@xen-orchestra/async-map')
const { chainVhd, checkVhdChain, openVhd, VhdAbstract } = require('vhd-lib')
const { createLogger } = require('@xen-orchestra/log')
const { decorateClass } = require('@vates/decorate-with')
const { defer } = require('golike-defer')
const { dirname } = require('path')

import { MixinRemoteWriter } from './_MixinRemoteWriter.mjs'
import { AbstractIncrementalWriter } from './_AbstractIncrementalWriter.mjs'
import { checkVhd } from './_checkVhd.mjs'
import { packUuid } from './_packUuid.mjs'
import { Disposable } from 'promise-toolbox'
const { formatFilenameDate } = require('../../_filenameDate.js')
const { getOldEntries } = require('../../_getOldEntries.js')
const { Task } = require('../../Task.js')

const { MixinRemoteWriter } = require('./_MixinRemoteWriter.js')
const { AbstractIncrementalWriter } = require('./_AbstractIncrementalWriter.js')
const { checkVhd } = require('./_checkVhd.js')
const { packUuid } = require('./_packUuid.js')
const { Disposable } = require('promise-toolbox')

const { warn } = createLogger('xo:backups:DeltaBackupWriter')

export class IncrementalRemoteWriter extends MixinRemoteWriter(AbstractIncrementalWriter) {
class IncrementalRemoteWriter extends MixinRemoteWriter(AbstractIncrementalWriter) {
async checkBaseVdis(baseUuidToSrcVdi) {
const { handler } = this._adapter
const adapter = this._adapter
@@ -133,7 +134,7 @@ export class IncrementalRemoteWriter extends MixinRemoteWriter(AbstractIncrement
}
}

async _transfer($defer, { isVhdDifferencing, timestamp, deltaExport, vm, vmSnapshot }) {
async _transfer($defer, { differentialVhds, timestamp, deltaExport, vm, vmSnapshot }) {
const adapter = this._adapter
const job = this._job
const scheduleId = this._scheduleId
@@ -161,7 +162,6 @@ export class IncrementalRemoteWriter extends MixinRemoteWriter(AbstractIncrement
)

metadataContent = {
isVhdDifferencing,
jobId,
mode: job.mode,
scheduleId,
@@ -181,9 +181,9 @@ export class IncrementalRemoteWriter extends MixinRemoteWriter(AbstractIncrement
async ([id, vdi]) => {
const path = `${this._vmBackupDir}/${vhds[id]}`

const isDifferencing = isVhdDifferencing[`${id}.vhd`]
const isDelta = differentialVhds[`${id}.vhd`]
let parentPath
if (isDifferencing) {
if (isDelta) {
const vdiDir = dirname(path)
parentPath = (
await handler.list(vdiDir, {
@@ -197,7 +197,7 @@ export class IncrementalRemoteWriter extends MixinRemoteWriter(AbstractIncrement
assert.notStrictEqual(
parentPath,
undefined,
`missing parent of ${id} in ${dirname(path)}, looking for ${vdi.other_config[TAG_BASE_DELTA]}`
`missing parent of ${id} in ${dirname(path)}, looking for ${vdi.other_config['xo:base_delta']}`
)

parentPath = parentPath.slice(1) // remove leading slash
@@ -205,20 +205,16 @@ export class IncrementalRemoteWriter extends MixinRemoteWriter(AbstractIncrement
// TODO remove when this has been done before the export
await checkVhd(handler, parentPath)
}

// don't write it as transferSize += await asyncFn()
// since i += await asyncFn() leads to race conditions
// as explained: https://eslint.org/docs/latest/rules/require-atomic-updates
const transferSizeOneDisk = await adapter.writeVhd(path, deltaExport.streams[`${id}.vhd`], {

transferSize += await adapter.writeVhd(path, deltaExport.streams[`${id}.vhd`], {
// no checksum for VHDs, because they will be invalidated by
// merges and chainings
checksum: false,
validator: tmpPath => checkVhd(handler, tmpPath),
writeBlockConcurrency: this._config.writeBlockConcurrency,
})
transferSize += transferSizeOneDisk

if (isDifferencing) {
if (isDelta) {
await chainVhd(handler, parentPath, handler, path)
}

@@ -242,6 +238,6 @@ export class IncrementalRemoteWriter extends MixinRemoteWriter(AbstractIncrement
// TODO: run cleanup?
}
}
decorateClass(IncrementalRemoteWriter, {
exports.IncrementalRemoteWriter = decorateClass(IncrementalRemoteWriter, {
_transfer: defer,
})
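The comment kept in the newer version of this writer explains why the transfer size is accumulated through an intermediate variable: with `total += await fn()`, the old value of `total` is read before the `await`, so two disks transferred in parallel can overwrite each other's update (the ESLint rule `require-atomic-updates` flags exactly this). A minimal standalone illustration of the two patterns (hypothetical names, not the project's code):

```js
// racy: `total` is read before the await, then a stale value is written back
async function racySum(tasks) {
  let total = 0
  await Promise.all(tasks.map(async task => {
    total += await task() // may lose updates when tasks overlap
  }))
  return total
}

// safe: await first, then do the read-modify-write in one synchronous step
async function safeSum(tasks) {
  let total = 0
  await Promise.all(tasks.map(async task => {
    const size = await task()
    total += size
  }))
  return total
}
```

This mirrors the change above where the result of adapter.writeVhd is first stored in transferSizeOneDisk and only then added to transferSize.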
@@ -1,18 +1,19 @@
import { asyncMap, asyncMapSettled } from '@xen-orchestra/async-map'
import ignoreErrors from 'promise-toolbox/ignoreErrors'
import { formatDateTime } from '@xen-orchestra/xapi'
'use strict'

import { formatFilenameDate } from '../../_filenameDate.mjs'
import { getOldEntries } from '../../_getOldEntries.mjs'
import { importIncrementalVm, TAG_BACKUP_SR, TAG_BASE_DELTA, TAG_COPY_SRC } from '../../_incrementalVm.mjs'
import { Task } from '../../Task.mjs'
const { asyncMap, asyncMapSettled } = require('@xen-orchestra/async-map')
const ignoreErrors = require('promise-toolbox/ignoreErrors')
const { formatDateTime } = require('@xen-orchestra/xapi')

import { AbstractIncrementalWriter } from './_AbstractIncrementalWriter.mjs'
import { MixinXapiWriter } from './_MixinXapiWriter.mjs'
import { listReplicatedVms } from './_listReplicatedVms.mjs'
import find from 'lodash/find.js'
const { formatFilenameDate } = require('../../_filenameDate.js')
const { getOldEntries } = require('../../_getOldEntries.js')
const { importIncrementalVm, TAG_COPY_SRC } = require('../../_incrementalVm.js')
const { Task } = require('../../Task.js')

export class IncrementalXapiWriter extends MixinXapiWriter(AbstractIncrementalWriter) {
const { AbstractIncrementalWriter } = require('./_AbstractIncrementalWriter.js')
const { MixinXapiWriter } = require('./_MixinXapiWriter.js')
const { listReplicatedVms } = require('./_listReplicatedVms.js')

exports.IncrementalXapiWriter = class IncrementalXapiWriter extends MixinXapiWriter(AbstractIncrementalWriter) {
async checkBaseVdis(baseUuidToSrcVdi, baseVm) {
const sr = this._sr
const replicatedVm = listReplicatedVms(sr.$xapi, this._job.id, sr.uuid, this._vmUuid).find(
@@ -82,54 +83,6 @@ export class IncrementalXapiWriter extends MixinXapiWriter(AbstractIncrementalWr
return asyncMapSettled(this._oldEntries, vm => vm.$destroy())
}

#decorateVmMetadata(backup) {
const { _warmMigration } = this._settings
const sr = this._sr
const xapi = sr.$xapi
const vm = backup.vm
vm.other_config[TAG_COPY_SRC] = vm.uuid
const remoteBaseVmUuid = vm.other_config[TAG_BASE_DELTA]
let baseVm
if (remoteBaseVmUuid) {
baseVm = find(
xapi.objects.all,
obj => (obj = obj.other_config) && obj[TAG_COPY_SRC] === remoteBaseVmUuid && obj[TAG_BACKUP_SR] === sr.$id
)

if (!baseVm) {
throw new Error(`could not find the base VM (copy of ${remoteBaseVmUuid})`)
}
}
const baseVdis = {}
baseVm?.$VBDs.forEach(vbd => {
const vdi = vbd.$VDI
if (vdi !== undefined) {
baseVdis[vbd.VDI] = vbd.$VDI
}
})

vm.other_config[TAG_COPY_SRC] = vm.uuid
if (!_warmMigration) {
vm.tags.push('Continuous Replication')
}

Object.values(backup.vdis).forEach(vdi => {
vdi.other_config[TAG_COPY_SRC] = vdi.uuid
vdi.SR = sr.$ref
// vdi.other_config[TAG_BASE_DELTA] is never defined on a suspended VDI
if (vdi.other_config[TAG_BASE_DELTA]) {
const remoteBaseVdiUuid = vdi.other_config[TAG_BASE_DELTA]
const baseVdi = find(baseVdis, vdi => vdi.other_config[TAG_COPY_SRC] === remoteBaseVdiUuid)
if (!baseVdi) {
throw new Error(`missing base VDI (copy of ${remoteBaseVdiUuid})`)
}
vdi.baseVdi = baseVdi
}
})

return backup
}

async _transfer({ timestamp, deltaExport, sizeContainers, vm }) {
const { _warmMigration } = this._settings
const sr = this._sr
@@ -140,7 +93,16 @@ export class IncrementalXapiWriter extends MixinXapiWriter(AbstractIncrementalWr

let targetVmRef
await Task.run({ name: 'transfer' }, async () => {
targetVmRef = await importIncrementalVm(this.#decorateVmMetadata(deltaExport), sr)
targetVmRef = await importIncrementalVm(
{
__proto__: deltaExport,
vm: {
...deltaExport.vm,
tags: _warmMigration ? deltaExport.vm.tags : [...deltaExport.vm.tags, 'Continuous Replication'],
},
},
sr
)
return {
size: Object.values(sizeContainers).reduce((sum, { size }) => sum + size, 0),
}
@@ -161,13 +123,13 @@ export class IncrementalXapiWriter extends MixinXapiWriter(AbstractIncrementalWr
)
),
targetVm.update_other_config({
[TAG_BACKUP_SR]: srUuid,
'xo:backup:sr': srUuid,

// these entries need to be added in case of offline backup
'xo:backup:datetime': formatDateTime(timestamp),
'xo:backup:job': job.id,
'xo:backup:schedule': scheduleId,
[TAG_BASE_DELTA]: vm.uuid,
'xo:backup:vm': vm.uuid,
}),
])
}
@@ -0,0 +1,14 @@
'use strict'

const { AbstractWriter } = require('./_AbstractWriter.js')

exports.AbstractFullWriter = class AbstractFullWriter extends AbstractWriter {
async run({ timestamp, sizeContainer, stream, vm, vmSnapshot }) {
try {
return await this._run({ timestamp, sizeContainer, stream, vm, vmSnapshot })
} finally {
// ensure stream is properly closed
stream.destroy()
}
}
}
@@ -1,12 +0,0 @@
import { AbstractWriter } from './_AbstractWriter.mjs'

export class AbstractFullWriter extends AbstractWriter {
async run({ maxStreamLength, timestamp, sizeContainer, stream, streamLength, vm, vmSnapshot }) {
try {
return await this._run({ maxStreamLength, timestamp, sizeContainer, stream, streamLength, vm, vmSnapshot })
} finally {
// ensure stream is properly closed
stream.destroy()
}
}
}
@@ -1,6 +1,8 @@
import { AbstractWriter } from './_AbstractWriter.mjs'
'use strict'

export class AbstractIncrementalWriter extends AbstractWriter {
const { AbstractWriter } = require('./_AbstractWriter.js')

exports.AbstractIncrementalWriter = class AbstractIncrementalWriter extends AbstractWriter {
checkBaseVdis(baseUuidToSrcVdi, baseVm) {
throw new Error('Not implemented')
}
@@ -1,7 +1,9 @@
import { formatFilenameDate } from '../../_filenameDate.mjs'
import { getVmBackupDir } from '../../_getVmBackupDir.mjs'
'use strict'

export class AbstractWriter {
const { formatFilenameDate } = require('../../_filenameDate')
const { getVmBackupDir } = require('../../_getVmBackupDir')

exports.AbstractWriter = class AbstractWriter {
constructor({ config, healthCheckSr, job, vmUuid, scheduleId, settings }) {
this._config = config
this._healthCheckSr = healthCheckSr
@@ -1,17 +1,19 @@
import { createLogger } from '@xen-orchestra/log'
import { join } from 'node:path'
import assert from 'node:assert'
'use strict'

import { formatFilenameDate } from '../../_filenameDate.mjs'
import { getVmBackupDir } from '../../_getVmBackupDir.mjs'
import { HealthCheckVmBackup } from '../../HealthCheckVmBackup.mjs'
import { ImportVmBackup } from '../../ImportVmBackup.mjs'
import { Task } from '../../Task.mjs'
import * as MergeWorker from '../../merge-worker/index.mjs'
const { createLogger } = require('@xen-orchestra/log')
const { join } = require('path')

const assert = require('assert')
const { formatFilenameDate } = require('../../_filenameDate.js')
const { getVmBackupDir } = require('../../_getVmBackupDir.js')
const { HealthCheckVmBackup } = require('../../HealthCheckVmBackup.js')
const { ImportVmBackup } = require('../../ImportVmBackup.js')
const { Task } = require('../../Task.js')
const MergeWorker = require('../../merge-worker/index.js')

const { info, warn } = createLogger('xo:backups:MixinBackupWriter')

export const MixinRemoteWriter = (BaseClass = Object) =>
exports.MixinRemoteWriter = (BaseClass = Object) =>
class MixinRemoteWriter extends BaseClass {
#lock

@@ -96,9 +98,6 @@ export const MixinRemoteWriter = (BaseClass = Object) =>
metadata,
srUuid,
xapi,
settings: {
additionnalVmTag: 'xo:no-bak=Health Check',
},
}).run()
const restoredVm = xapi.getObject(restoredId)
try {
@@ -1,10 +1,12 @@
import { extractOpaqueRef } from '@xen-orchestra/xapi'
import assert from 'node:assert/strict'
'use strict'

import { HealthCheckVmBackup } from '../../HealthCheckVmBackup.mjs'
import { Task } from '../../Task.mjs'
const { extractOpaqueRef } = require('@xen-orchestra/xapi')

export const MixinXapiWriter = (BaseClass = Object) =>
const { Task } = require('../../Task')
const assert = require('node:assert/strict')
const { HealthCheckVmBackup } = require('../../HealthCheckVmBackup')

exports.MixinXapiWriter = (BaseClass = Object) =>
class MixinXapiWriter extends BaseClass {
constructor({ sr, ...rest }) {
super(rest)
@@ -18,7 +20,7 @@ export const MixinXapiWriter = (BaseClass = Object) =>
const vdiRefs = await xapi.VM_getDisks(baseVm.$ref)
for (const vdiRef of vdiRefs) {
const vdi = xapi.getObject(vdiRef)
if (vdi.$SR.uuid !== this._healthCheckSr.uuid) {
if (vdi.$SR.uuid !== this._heathCheckSr.uuid) {
return false
}
}
@@ -58,7 +60,7 @@ export const MixinXapiWriter = (BaseClass = Object) =>
)
}
const healthCheckVm = xapi.getObject(healthCheckVmRef) ?? (await xapi.waitObject(healthCheckVmRef))
await healthCheckVm.add_tag('xo:no-bak=Health Check')

await new HealthCheckVmBackup({
restoredVm: healthCheckVm,
xapi,
@xen-orchestra/backups/_runners/_writers/_checkVhd.js (new file, 8 lines)
@@ -0,0 +1,8 @@
'use strict'

const openVhd = require('vhd-lib').openVhd
const Disposable = require('promise-toolbox/Disposable')

exports.checkVhd = async function checkVhd(handler, path) {
await Disposable.use(openVhd(handler, path), () => {})
}
@@ -1,6 +0,0 @@
import { openVhd } from 'vhd-lib'
import Disposable from 'promise-toolbox/Disposable'

export async function checkVhd(handler, path) {
await Disposable.use(openVhd(handler, path), () => {})
}
@@ -1,3 +1,5 @@
'use strict'

const getReplicatedVmDatetime = vm => {
const { 'xo:backup:datetime': datetime = vm.name_label.slice(-17, -1) } = vm.other_config
return datetime
@@ -5,7 +7,7 @@ const getReplicatedVmDatetime = vm => {

const compareReplicatedVmDatetime = (a, b) => (getReplicatedVmDatetime(a) < getReplicatedVmDatetime(b) ? -1 : 1)

export function listReplicatedVms(xapi, scheduleOrJobId, srUuid, vmUuid) {
exports.listReplicatedVms = function listReplicatedVms(xapi, scheduleOrJobId, srUuid, vmUuid) {
const { all } = xapi.objects
const vms = {}
for (const key in all) {
@@ -1,5 +1,7 @@
'use strict'

const PARSE_UUID_RE = /-/g

export function packUuid(uuid) {
exports.packUuid = function packUuid(uuid) {
return Buffer.from(uuid.replace(PARSE_UUID_RE, ''), 'hex')
}
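Both forms of packUuid strip the dashes from a canonical UUID string and decode the remaining 32 hex characters into a 16-byte Buffer, the binary form used when comparing or embedding UUIDs in VHD metadata. A quick usage illustration (example value only):

```js
const packed = packUuid('aabbccdd-0011-2233-4455-66778899aabb')
console.log(packed.length)           // 16
console.log(packed.toString('hex'))  // 'aabbccdd00112233445566778899aabb'
```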
@@ -1,4 +1,6 @@
export function watchStreamSize(stream, container = { size: 0 }) {
'use strict'

exports.watchStreamSize = function watchStreamSize(stream, container = { size: 0 }) {
stream.on('data', data => {
container.size += data.length
})
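Both variants of watchStreamSize attach a passive 'data' listener and accumulate the number of bytes seen into a mutable container, which the runners read after the transfer to report its size. A minimal usage sketch (hypothetical file path, standalone example):

```js
const { createReadStream } = require('node:fs')

const stream = createReadStream('/tmp/export.xva') // example path
const container = watchStreamSize(stream) // { size: 0 }, updated as data flows

// note: in this standalone example the 'data' listener itself puts the stream
// into flowing mode; in the backup code the stream is piped to a writer anyway
stream.on('end', () => {
  console.log(`transferred ${container.size} bytes`)
})
```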
@@ -221,7 +221,7 @@ For multiple objects:

### Settings

Settings are described in [`@xen-orchestra/backups/_runners/VmsXapi.mjs`](https://github.com/vatesfr/xen-orchestra/blob/master/%40xen-orchestra/backups/_runners/VmsXapi.mjs).
Settings are described in [`@xen-orchestra/backups/Backup.js`](https://github.com/vatesfr/xen-orchestra/blob/master/%40xen-orchestra/backups/Backup.js).

## Writer API

@@ -1,4 +1,6 @@
export function extractIdsFromSimplePattern(pattern) {
'use strict'

exports.extractIdsFromSimplePattern = function extractIdsFromSimplePattern(pattern) {
if (pattern === undefined) {
return []
}
@@ -1,9 +1,9 @@
import mapValues from 'lodash/mapValues.js'
import { dirname } from 'node:path'
'use strict'

const mapValues = require('lodash/mapValues.js')
const { dirname } = require('path')

function formatVmBackup(backup) {
const { isVhdDifferencing } = backup

return {
disks:
backup.vhds === undefined
@@ -27,14 +27,10 @@ function formatVmBackup(backup) {
name_description: backup.vm.name_description,
name_label: backup.vm.name_label,
},

// isVhdDifferencing is either undefined or an object
differencingVhds: isVhdDifferencing && Object.values(isVhdDifferencing).filter(t => t).length,
dynamicVhds: isVhdDifferencing && Object.values(isVhdDifferencing).filter(t => !t).length,
}
}

// format all backups as returned by RemoteAdapter#listAllVmBackups()
export function formatVmBackups(backupsByVM) {
exports.formatVmBackups = function formatVmBackups(backupsByVM) {
return mapValues(backupsByVM, backups => backups.map(formatVmBackup))
}
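The two counters derived from isVhdDifferencing simply split the per-disk boolean map into the number of differencing (delta) VHDs and the number of dynamic (full) VHDs; when the map is undefined, both fields stay undefined. For example (illustrative values):

```js
const isVhdDifferencing = { 'xvda.vhd': true, 'xvdb.vhd': true, 'xvdd.vhd': false }

const differencingVhds = Object.values(isVhdDifferencing).filter(t => t).length  // 2
const dynamicVhds = Object.values(isVhdDifferencing).filter(t => !t).length      // 1
```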
@xen-orchestra/backups/merge-worker/cli.js (executable file, 92 lines)
@@ -0,0 +1,92 @@
#!/usr/bin/env node
// eslint-disable-next-line eslint-comments/disable-enable-pair
/* eslint-disable n/shebang */

'use strict'

const { catchGlobalErrors } = require('@xen-orchestra/log/configure')
const { createLogger } = require('@xen-orchestra/log')
const { getSyncedHandler } = require('@xen-orchestra/fs')
const { join } = require('path')
const Disposable = require('promise-toolbox/Disposable')
const min = require('lodash/min')

const { getVmBackupDir } = require('../_getVmBackupDir.js')
const { RemoteAdapter } = require('../RemoteAdapter.js')

const { CLEAN_VM_QUEUE } = require('./index.js')

// -------------------------------------------------------------------

catchGlobalErrors(createLogger('xo:backups:mergeWorker'))

const { fatal, info, warn } = createLogger('xo:backups:mergeWorker')

// -------------------------------------------------------------------

const main = Disposable.wrap(async function* main(args) {
const handler = yield getSyncedHandler({ url: 'file://' + process.cwd() })

yield handler.lock(CLEAN_VM_QUEUE)

const adapter = new RemoteAdapter(handler)

const listRetry = async () => {
const timeoutResolver = resolve => setTimeout(resolve, 10e3)
for (let i = 0; i < 10; ++i) {
const entries = await handler.list(CLEAN_VM_QUEUE)
if (entries.length !== 0) {
return entries
}
await new Promise(timeoutResolver)
}
}

let taskFiles
while ((taskFiles = await listRetry()) !== undefined) {
const taskFileBasename = min(taskFiles)
const previousTaskFile = join(CLEAN_VM_QUEUE, taskFileBasename)
const taskFile = join(CLEAN_VM_QUEUE, '_' + taskFileBasename)

// move this task to the end
try {
await handler.rename(previousTaskFile, taskFile)
} catch (error) {
// this error occurs if the task failed too many times (i.e. too many `_` prefixes)
// there is nothing more that can be done
if (error.code === 'ENAMETOOLONG') {
await handler.unlink(previousTaskFile)
}

throw error
}

try {
const vmDir = getVmBackupDir(String(await handler.readFile(taskFile)))
try {
await adapter.cleanVm(vmDir, { merge: true, logInfo: info, logWarn: warn, remove: true })
} catch (error) {
// consider the clean successful if the VM dir is missing
if (error.code !== 'ENOENT') {
throw error
}
}

handler.unlink(taskFile).catch(error => warn('deleting task failure', { error }))
} catch (error) {
warn('failure handling task', { error })
}
}
})

info('starting')
main(process.argv.slice(2)).then(
() => {
info('bye :-)')
},
error => {
fatal(error)

process.exit(1)
}
)
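The comments in the worker above describe a simple retry convention for the clean-VM queue: before processing, a task file is renamed with a leading `_` so a crashed attempt sorts after fresh, digit-named tasks on the next pass, and once the accumulated prefixes push the name past the filesystem limit (ENAMETOOLONG) the task is treated as permanently failed and deleted. A small illustration of how the prefixes accumulate (illustrative file name, not the worker's code):

```js
// each failed attempt re-queues the task under a name that sorts after fresh tasks
let name = '1700000000000.json' // example task file
for (let attempt = 0; attempt < 3; attempt++) {
  name = '_' + name
}
console.log(name) // '___1700000000000.json'
// after enough failed attempts the rename throws ENAMETOOLONG and the task is unlinked
```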
@@ -1,103 +0,0 @@
#!/usr/bin/env node
// eslint-disable-next-line eslint-comments/disable-enable-pair
/* eslint-disable n/shebang */

import { asyncEach } from '@vates/async-each'
import { catchGlobalErrors } from '@xen-orchestra/log/configure'
import { createLogger } from '@xen-orchestra/log'
import { getSyncedHandler } from '@xen-orchestra/fs'
import { join } from 'node:path'
import { load as loadConfig } from 'app-conf'
import Disposable from 'promise-toolbox/Disposable'

import { getVmBackupDir } from '../_getVmBackupDir.mjs'
import { RemoteAdapter } from '../RemoteAdapter.mjs'

import { CLEAN_VM_QUEUE } from './index.mjs'

const APP_NAME = 'xo-merge-worker'
const APP_DIR = new URL('.', import.meta.url).pathname
// -------------------------------------------------------------------

catchGlobalErrors(createLogger('xo:backups:mergeWorker'))

const { fatal, info, warn } = createLogger('xo:backups:mergeWorker')

// -------------------------------------------------------------------

const main = Disposable.wrap(async function* main(args) {
const handler = yield getSyncedHandler({ url: 'file://' + process.cwd() })

yield handler.lock(CLEAN_VM_QUEUE)

const adapter = new RemoteAdapter(handler)

const listRetry = async () => {
const timeoutResolver = resolve => setTimeout(resolve, 10e3)
for (let i = 0; i < 10; ++i) {
const entries = await handler.list(CLEAN_VM_QUEUE)
if (entries.length !== 0) {
entries.sort()
return entries
}
await new Promise(timeoutResolver)
}
}

let taskFiles
while ((taskFiles = await listRetry()) !== undefined) {
const { concurrency } = await loadConfig(APP_NAME, {
appDir: APP_DIR,
ignoreUnknownFormats: true,
})
await asyncEach(
taskFiles,
async taskFileBasename => {
const previousTaskFile = join(CLEAN_VM_QUEUE, taskFileBasename)
const taskFile = join(CLEAN_VM_QUEUE, '_' + taskFileBasename)

// move this task to the end
try {
await handler.rename(previousTaskFile, taskFile)
} catch (error) {
// this error occurs if the task failed too many times (i.e. too many `_` prefixes)
// there is nothing more that can be done
if (error.code === 'ENAMETOOLONG') {
await handler.unlink(previousTaskFile)
}

throw error
}

try {
const vmDir = getVmBackupDir(String(await handler.readFile(taskFile)))
try {
await adapter.cleanVm(vmDir, { merge: true, logInfo: info, logWarn: warn, remove: true })
} catch (error) {
// consider the clean successful if the VM dir is missing
if (error.code !== 'ENOENT') {
throw error
}
}

handler.unlink(taskFile).catch(error => warn('deleting task failure', { error }))
} catch (error) {
warn('failure handling task', { error })
}
},
{ concurrency }
)
}
})

info('starting')
main(process.argv.slice(2)).then(
() => {
info('bye :-)')
},
error => {
fatal(error)

process.exit(1)
}
)
@@ -1 +0,0 @@
concurrency = 1
@@ -1,12 +1,13 @@
import { join } from 'node:path'
import { spawn } from 'child_process'
import { check } from 'proper-lockfile'
'use strict'

export const CLEAN_VM_QUEUE = '/xo-vm-backups/.queue/clean-vm/'
const { join, resolve } = require('path')
const { spawn } = require('child_process')
const { check } = require('proper-lockfile')

const CLI_PATH = new URL('cli.mjs', import.meta.url).pathname
const CLEAN_VM_QUEUE = (exports.CLEAN_VM_QUEUE = '/xo-vm-backups/.queue/clean-vm/')

export const run = async function runMergeWorker(remotePath) {
const CLI_PATH = resolve(__dirname, 'cli.js')
exports.run = async function runMergeWorker(remotePath) {
try {
// TODO: find a way to acquire the lock and then pass it down to the worker
if (await check(join(remotePath, CLEAN_VM_QUEUE))) {
@@ -8,33 +8,32 @@
"type": "git",
"url": "https://github.com/vatesfr/xen-orchestra.git"
},
"version": "0.44.3",
"version": "0.39.0",
"engines": {
"node": ">=14.18"
},
"scripts": {
"postversion": "npm publish --access public",
"test-integration": "node--test *.integ.mjs"
"test-integration": "node--test *.integ.js"
},
"dependencies": {
"@iarna/toml": "^2.2.5",
"@kldzj/stream-throttle": "^1.1.1",
"@vates/async-each": "^1.0.0",
"@vates/cached-dns.lookup": "^1.0.0",
"@vates/compose": "^2.1.0",
"@vates/decorate-with": "^2.0.0",
"@vates/disposable": "^0.1.5",
"@vates/fuse-vhd": "^2.0.0",
"@vates/nbd-client": "^3.0.0",
"@vates/disposable": "^0.1.4",
"@vates/fuse-vhd": "^1.0.0",
"@vates/nbd-client": "^1.2.1",
"@vates/parse-duration": "^0.1.1",
"@xen-orchestra/async-map": "^0.1.2",
"@xen-orchestra/fs": "^4.1.3",
"@xen-orchestra/fs": "^4.0.1",
"@xen-orchestra/log": "^0.6.0",
"@xen-orchestra/template": "^0.1.0",
"app-conf": "^2.3.0",
"compare-versions": "^6.0.0",
"d3-time-format": "^4.1.0",
"compare-versions": "^5.0.1",
"d3-time-format": "^3.0.0",
"decorator-synchronized": "^0.6.0",
"fs-extra": "^11.1.0",
"golike-defer": "^0.5.1",
"limit-concurrency-decorator": "^0.5.0",
"lodash": "^4.17.20",
@@ -42,21 +41,19 @@
"parse-pairs": "^2.0.0",
"promise-toolbox": "^0.21.0",
"proper-lockfile": "^4.1.2",
"tar": "^6.1.15",
"uuid": "^9.0.0",
"vhd-lib": "^4.8.0",
"xen-api": "^2.0.0",
"vhd-lib": "^4.5.0",
"xen-api": "^1.3.3",
"yazl": "^2.5.1"
},
"devDependencies": {
"fs-extra": "^11.1.0",
"rimraf": "^5.0.1",
"sinon": "^17.0.1",
"sinon": "^15.0.1",
"test": "^3.2.1",
"tmp": "^0.2.1"
},
"peerDependencies": {
"@xen-orchestra/xapi": "^4.1.0"
"@xen-orchestra/xapi": "^2.2.1"
},
"license": "AGPL-3.0-or-later",
"author": {
Some files were not shown because too many files have changed in this diff.