Compare commits

..

1 Commits

Author SHA1 Message Date
Thierry
677a9c958c feat(lite/component): add support for nested modals 2023-07-04 08:59:16 +02:00
728 changed files with 11719 additions and 29672 deletions

View File

@@ -68,11 +68,6 @@ module.exports = {
'no-console': ['error', { allow: ['warn', 'error'] }],
// this rule can prevent race condition bugs like parallel `a += await foo()`
//
// as it has a lots of false positive, it is only enabled as a warning for now
'require-atomic-updates': 'warn',
strict: 'error',
},
}

View File

@@ -1,3 +0,0 @@
@xen-orchestra/web
@xen-orchestra/web-core
@xen-orchestra/web-lite

View File

@@ -1,11 +1,8 @@
'use strict'
module.exports = {
arrowParens: 'avoid',
jsxSingleQuote: true,
semi: false,
singleQuote: true,
trailingComma: 'es5',
// 2020-11-24: Requested by nraynaud and approved by the rest of the team
//

View File

@@ -33,7 +33,7 @@
"test": "node--test"
},
"devDependencies": {
"sinon": "^17.0.1",
"sinon": "^15.0.1",
"tap": "^16.3.0",
"test": "^3.2.1"
}

View File

@@ -13,15 +13,12 @@ describe('decorateWith', () => {
const expectedFn = Function.prototype
const newFn = () => {}
const decorator = decorateWith(
function wrapper(fn, ...args) {
assert.deepStrictEqual(fn, expectedFn)
assert.deepStrictEqual(args, expectedArgs)
const decorator = decorateWith(function wrapper(fn, ...args) {
assert.deepStrictEqual(fn, expectedFn)
assert.deepStrictEqual(args, expectedArgs)
return newFn
},
...expectedArgs
)
return newFn
}, ...expectedArgs)
const descriptor = {
configurable: true,

View File

@@ -14,7 +14,7 @@
"url": "https://vates.fr"
},
"license": "ISC",
"version": "0.1.5",
"version": "0.1.4",
"engines": {
"node": ">=8.10"
},
@@ -23,13 +23,13 @@
"test": "node--test"
},
"dependencies": {
"@vates/multi-key-map": "^0.2.0",
"@vates/multi-key-map": "^0.1.0",
"@xen-orchestra/async-map": "^0.1.2",
"@xen-orchestra/log": "^0.6.0",
"ensure-array": "^1.0.0"
},
"devDependencies": {
"sinon": "^17.0.1",
"sinon": "^15.0.1",
"test": "^3.2.1"
}
}

View File

@@ -1,7 +1,9 @@
import LRU from 'lru-cache'
import Fuse from 'fuse-native'
import { VhdSynthetic } from 'vhd-lib'
import { Disposable, fromCallback } from 'promise-toolbox'
'use strict'
const LRU = require('lru-cache')
const Fuse = require('fuse-native')
const { VhdSynthetic } = require('vhd-lib')
const { Disposable, fromCallback } = require('promise-toolbox')
// build a s stat object from https://github.com/fuse-friends/fuse-native/blob/master/test/fixtures/stat.js
const stat = st => ({
@@ -14,7 +16,7 @@ const stat = st => ({
gid: st.gid !== undefined ? st.gid : process.getgid(),
})
export const mount = Disposable.factory(async function* mount(handler, diskPath, mountDir) {
exports.mount = Disposable.factory(async function* mount(handler, diskPath, mountDir) {
const vhd = yield VhdSynthetic.fromVhdChain(handler, diskPath)
const cache = new LRU({

View File

@@ -1,6 +1,6 @@
{
"name": "@vates/fuse-vhd",
"version": "2.0.0",
"version": "1.0.0",
"license": "ISC",
"private": false,
"homepage": "https://github.com/vatesfr/xen-orchestra/tree/master/@vates/fuse-vhd",
@@ -15,14 +15,13 @@
"url": "https://vates.fr"
},
"engines": {
"node": ">=14"
"node": ">=10.0"
},
"main": "./index.mjs",
"dependencies": {
"fuse-native": "^2.2.6",
"lru-cache": "^7.14.0",
"promise-toolbox": "^0.21.0",
"vhd-lib": "^4.8.0"
"vhd-lib": "^4.5.0"
},
"scripts": {
"postversion": "npm publish --access public"

View File

@@ -17,14 +17,4 @@ map.get(['foo', 'bar']) // 2
map.get(['bar', 'foo']) // 3
map.get([OBJ]) // 4
map.get([{}]) // undefined
map.delete([])
for (const [key, value] of map.entries() {
console.log(key, value)
}
for (const value of map.values()) {
console.log(value)
}
```

View File

@@ -35,16 +35,6 @@ map.get(['foo', 'bar']) // 2
map.get(['bar', 'foo']) // 3
map.get([OBJ]) // 4
map.get([{}]) // undefined
map.delete([])
for (const [key, value] of map.entries() {
console.log(key, value)
}
for (const value of map.values()) {
console.log(value)
}
```
## Contributions

View File

@@ -36,31 +36,14 @@ function del(node, i, keys) {
return node
}
function* entries(node, key) {
if (node !== undefined) {
if (node instanceof Node) {
const { value } = node
if (value !== undefined) {
yield [key, node.value]
}
for (const [childKey, child] of node.children.entries()) {
yield* entries(child, key.concat(childKey))
}
} else {
yield [key, node]
}
}
}
function get(node, i, keys) {
return i === keys.length
? node instanceof Node
? node.value
: node
: node instanceof Node
? get(node.children.get(keys[i]), i + 1, keys)
: undefined
? get(node.children.get(keys[i]), i + 1, keys)
: undefined
}
function set(node, i, keys, value) {
@@ -86,22 +69,6 @@ function set(node, i, keys, value) {
return node
}
function* values(node) {
if (node !== undefined) {
if (node instanceof Node) {
const { value } = node
if (value !== undefined) {
yield node.value
}
for (const child of node.children.values()) {
yield* values(child)
}
} else {
yield node
}
}
}
exports.MultiKeyMap = class MultiKeyMap {
constructor() {
// each node is either a value or a Node if it contains children
@@ -112,10 +79,6 @@ exports.MultiKeyMap = class MultiKeyMap {
this._root = del(this._root, 0, keys)
}
entries() {
return entries(this._root, [])
}
get(keys) {
return get(this._root, 0, keys)
}
@@ -123,8 +86,4 @@ exports.MultiKeyMap = class MultiKeyMap {
set(keys, value) {
this._root = set(this._root, 0, keys, value)
}
values() {
return values(this._root)
}
}

View File

@@ -19,7 +19,7 @@ describe('MultiKeyMap', () => {
// reverse composite key
['bar', 'foo'],
]
const values = keys.map(() => Math.random())
const values = keys.map(() => ({}))
// set all values first to make sure they are all stored and not only the
// last one
@@ -27,12 +27,6 @@ describe('MultiKeyMap', () => {
map.set(key, values[i])
})
assert.deepEqual(
Array.from(map.entries()),
keys.map((key, i) => [key, values[i]])
)
assert.deepEqual(Array.from(map.values()), values)
keys.forEach((key, i) => {
// copy the key to make sure the array itself is not the key
assert.strictEqual(map.get(key.slice()), values[i])

View File

@@ -18,7 +18,7 @@
"url": "https://vates.fr"
},
"license": "ISC",
"version": "0.2.0",
"version": "0.1.0",
"engines": {
"node": ">=8.10"
},

View File

@@ -0,0 +1,42 @@
'use strict'
exports.INIT_PASSWD = Buffer.from('NBDMAGIC') // "NBDMAGIC" ensure we're connected to a nbd server
exports.OPTS_MAGIC = Buffer.from('IHAVEOPT') // "IHAVEOPT" start an option block
exports.NBD_OPT_REPLY_MAGIC = 1100100111001001n // magic received during negociation
exports.NBD_OPT_EXPORT_NAME = 1
exports.NBD_OPT_ABORT = 2
exports.NBD_OPT_LIST = 3
exports.NBD_OPT_STARTTLS = 5
exports.NBD_OPT_INFO = 6
exports.NBD_OPT_GO = 7
exports.NBD_FLAG_HAS_FLAGS = 1 << 0
exports.NBD_FLAG_READ_ONLY = 1 << 1
exports.NBD_FLAG_SEND_FLUSH = 1 << 2
exports.NBD_FLAG_SEND_FUA = 1 << 3
exports.NBD_FLAG_ROTATIONAL = 1 << 4
exports.NBD_FLAG_SEND_TRIM = 1 << 5
exports.NBD_FLAG_FIXED_NEWSTYLE = 1 << 0
exports.NBD_CMD_FLAG_FUA = 1 << 0
exports.NBD_CMD_FLAG_NO_HOLE = 1 << 1
exports.NBD_CMD_FLAG_DF = 1 << 2
exports.NBD_CMD_FLAG_REQ_ONE = 1 << 3
exports.NBD_CMD_FLAG_FAST_ZERO = 1 << 4
exports.NBD_CMD_READ = 0
exports.NBD_CMD_WRITE = 1
exports.NBD_CMD_DISC = 2
exports.NBD_CMD_FLUSH = 3
exports.NBD_CMD_TRIM = 4
exports.NBD_CMD_CACHE = 5
exports.NBD_CMD_WRITE_ZEROES = 6
exports.NBD_CMD_BLOCK_STATUS = 7
exports.NBD_CMD_RESIZE = 8
exports.NBD_REQUEST_MAGIC = 0x25609513 // magic number to create a new NBD request to send to the server
exports.NBD_REPLY_MAGIC = 0x67446698 // magic number received from the server when reading response to a nbd request
exports.NBD_REPLY_ACK = 1
exports.NBD_DEFAULT_PORT = 10809
exports.NBD_DEFAULT_BLOCK_SIZE = 64 * 1024

View File

@@ -1,41 +0,0 @@
export const INIT_PASSWD = Buffer.from('NBDMAGIC') // "NBDMAGIC" ensure we're connected to a nbd server
export const OPTS_MAGIC = Buffer.from('IHAVEOPT') // "IHAVEOPT" start an option block
export const NBD_OPT_REPLY_MAGIC = 1100100111001001n // magic received during negociation
export const NBD_OPT_EXPORT_NAME = 1
export const NBD_OPT_ABORT = 2
export const NBD_OPT_LIST = 3
export const NBD_OPT_STARTTLS = 5
export const NBD_OPT_INFO = 6
export const NBD_OPT_GO = 7
export const NBD_FLAG_HAS_FLAGS = 1 << 0
export const NBD_FLAG_READ_ONLY = 1 << 1
export const NBD_FLAG_SEND_FLUSH = 1 << 2
export const NBD_FLAG_SEND_FUA = 1 << 3
export const NBD_FLAG_ROTATIONAL = 1 << 4
export const NBD_FLAG_SEND_TRIM = 1 << 5
export const NBD_FLAG_FIXED_NEWSTYLE = 1 << 0
export const NBD_CMD_FLAG_FUA = 1 << 0
export const NBD_CMD_FLAG_NO_HOLE = 1 << 1
export const NBD_CMD_FLAG_DF = 1 << 2
export const NBD_CMD_FLAG_REQ_ONE = 1 << 3
export const NBD_CMD_FLAG_FAST_ZERO = 1 << 4
export const NBD_CMD_READ = 0
export const NBD_CMD_WRITE = 1
export const NBD_CMD_DISC = 2
export const NBD_CMD_FLUSH = 3
export const NBD_CMD_TRIM = 4
export const NBD_CMD_CACHE = 5
export const NBD_CMD_WRITE_ZEROES = 6
export const NBD_CMD_BLOCK_STATUS = 7
export const NBD_CMD_RESIZE = 8
export const NBD_REQUEST_MAGIC = 0x25609513 // magic number to create a new NBD request to send to the server
export const NBD_REPLY_MAGIC = 0x67446698 // magic number received from the server when reading response to a nbd request
export const NBD_REPLY_ACK = 1
export const NBD_DEFAULT_PORT = 10809
export const NBD_DEFAULT_BLOCK_SIZE = 64 * 1024

View File

@@ -1,10 +1,8 @@
import assert from 'node:assert'
import { Socket } from 'node:net'
import { connect } from 'node:tls'
import { fromCallback, pRetry, pDelay, pTimeout, pFromCallback } from 'promise-toolbox'
import { readChunkStrict } from '@vates/read-chunk'
import { createLogger } from '@xen-orchestra/log'
import {
'use strict'
const assert = require('node:assert')
const { Socket } = require('node:net')
const { connect } = require('node:tls')
const {
INIT_PASSWD,
NBD_CMD_READ,
NBD_DEFAULT_BLOCK_SIZE,
@@ -19,12 +17,16 @@ import {
NBD_REQUEST_MAGIC,
OPTS_MAGIC,
NBD_CMD_DISC,
} from './constants.mjs'
} = require('./constants.js')
const { fromCallback, pRetry, pDelay, pTimeout } = require('promise-toolbox')
const { readChunkStrict } = require('@vates/read-chunk')
const { createLogger } = require('@xen-orchestra/log')
const { warn } = createLogger('vates:nbd-client')
// documentation is here : https://github.com/NetworkBlockDevice/nbd/blob/master/doc/proto.md
export default class NbdClient {
module.exports = class NbdClient {
#serverAddress
#serverCert
#serverPort
@@ -38,7 +40,6 @@ export default class NbdClient {
#readBlockRetries
#reconnectRetry
#connectTimeout
#messageTimeout
// AFAIK, there is no guaranty the server answers in the same order as the queries
// so we handle a backlog of command waiting for response and handle concurrency manually
@@ -51,14 +52,7 @@ export default class NbdClient {
#reconnectingPromise
constructor(
{ address, port = NBD_DEFAULT_PORT, exportname, cert },
{
connectTimeout = 6e4,
messageTimeout = 6e4,
waitBeforeReconnect = 1e3,
readAhead = 10,
readBlockRetries = 5,
reconnectRetry = 5,
} = {}
{ connectTimeout = 6e4, waitBeforeReconnect = 1e3, readAhead = 10, readBlockRetries = 5, reconnectRetry = 5 } = {}
) {
this.#serverAddress = address
this.#serverPort = port
@@ -69,7 +63,6 @@ export default class NbdClient {
this.#readBlockRetries = readBlockRetries
this.#reconnectRetry = reconnectRetry
this.#connectTimeout = connectTimeout
this.#messageTimeout = messageTimeout
}
get exportSize() {
@@ -122,27 +115,13 @@ export default class NbdClient {
if (!this.#connected) {
return
}
this.#connected = false
const socket = this.#serverSocket
const queryId = this.#nextCommandQueryId
this.#nextCommandQueryId++
const buffer = Buffer.alloc(28)
buffer.writeInt32BE(NBD_REQUEST_MAGIC, 0) // it is a nbd request
buffer.writeInt16BE(0, 4) // no command flags for a disconnect
buffer.writeInt16BE(NBD_CMD_DISC, 6) // we want to disconnect from nbd server
buffer.writeBigUInt64BE(queryId, 8)
buffer.writeBigUInt64BE(0n, 16)
buffer.writeInt32BE(0, 24)
const promise = pFromCallback(cb => {
socket.end(buffer, 'utf8', cb)
})
try {
await pTimeout.call(promise, this.#messageTimeout)
} catch (error) {
socket.destroy()
}
await this.#write(buffer)
await this.#serverSocket.destroy()
this.#serverSocket = undefined
this.#connected = false
}
@@ -216,13 +195,11 @@ export default class NbdClient {
}
#read(length) {
const promise = readChunkStrict(this.#serverSocket, length)
return pTimeout.call(promise, this.#messageTimeout)
return readChunkStrict(this.#serverSocket, length)
}
#write(buffer) {
const promise = fromCallback.call(this.#serverSocket, 'write', buffer)
return pTimeout.call(promise, this.#messageTimeout)
return fromCallback.call(this.#serverSocket, 'write', buffer)
}
async #readInt32() {
@@ -255,20 +232,19 @@ export default class NbdClient {
}
try {
this.#waitingForResponse = true
const buffer = await this.#read(16)
const magic = buffer.readInt32BE(0)
const magic = await this.#readInt32()
if (magic !== NBD_REPLY_MAGIC) {
throw new Error(`magic number for block answer is wrong : ${magic} ${NBD_REPLY_MAGIC}`)
}
const error = buffer.readInt32BE(4)
const error = await this.#readInt32()
if (error !== 0) {
// @todo use error code from constants.mjs
throw new Error(`GOT ERROR CODE : ${error}`)
}
const blockQueryId = buffer.readBigUInt64BE(8)
const blockQueryId = await this.#readInt64()
const query = this.#commandQueryBacklog.get(blockQueryId)
if (!query) {
throw new Error(` no query associated with id ${blockQueryId}`)
@@ -289,7 +265,7 @@ export default class NbdClient {
}
}
async #readBlock(index, size) {
async readBlock(index, size = NBD_DEFAULT_BLOCK_SIZE) {
// we don't want to add anything in backlog while reconnecting
if (this.#reconnectingPromise) {
await this.#reconnectingPromise
@@ -305,13 +281,7 @@ export default class NbdClient {
buffer.writeInt16BE(NBD_CMD_READ, 6) // we want to read a data block
buffer.writeBigUInt64BE(queryId, 8)
// byte offset in the raw disk
const offset = BigInt(index) * BigInt(size)
const remaining = this.#exportSize - offset
if (remaining < BigInt(size)) {
size = Number(remaining)
}
buffer.writeBigUInt64BE(offset, 16)
buffer.writeBigUInt64BE(BigInt(index) * BigInt(size), 16)
buffer.writeInt32BE(size, 24)
return new Promise((resolve, reject) => {
@@ -337,13 +307,45 @@ export default class NbdClient {
})
}
async readBlock(index, size = NBD_DEFAULT_BLOCK_SIZE) {
return pRetry(() => this.#readBlock(index, size), {
tries: this.#readBlockRetries,
onRetry: async err => {
warn('will retry reading block ', index, err)
await this.reconnect()
},
})
async *readBlocks(indexGenerator) {
// default : read all blocks
if (indexGenerator === undefined) {
const exportSize = this.#exportSize
const chunkSize = 2 * 1024 * 1024
indexGenerator = function* () {
const nbBlocks = Math.ceil(Number(exportSize / BigInt(chunkSize)))
for (let index = 0; BigInt(index) < nbBlocks; index++) {
yield { index, size: chunkSize }
}
}
}
const readAhead = []
const readAheadMaxLength = this.#readAhead
const makeReadBlockPromise = (index, size) => {
const promise = pRetry(() => this.readBlock(index, size), {
tries: this.#readBlockRetries,
onRetry: async err => {
warn('will retry reading block ', index, err)
await this.reconnect()
},
})
// error is handled during unshift
promise.catch(() => {})
return promise
}
// read all blocks, but try to keep readAheadMaxLength promise waiting ahead
for (const { index, size } of indexGenerator()) {
// stack readAheadMaxLength promises before starting to handle the results
if (readAhead.length === readAheadMaxLength) {
// any error will stop reading blocks
yield readAhead.shift()
}
readAhead.push(makeReadBlockPromise(index, size))
}
while (readAhead.length > 0) {
yield readAhead.shift()
}
}
}

View File

@@ -1,87 +0,0 @@
import { asyncEach } from '@vates/async-each'
import { NBD_DEFAULT_BLOCK_SIZE } from './constants.mjs'
import NbdClient from './index.mjs'
import { createLogger } from '@xen-orchestra/log'
const { warn } = createLogger('vates:nbd-client:multi')
export default class MultiNbdClient {
#clients = []
#readAhead
get exportSize() {
return this.#clients[0].exportSize
}
constructor(settings, { nbdConcurrency = 8, readAhead = 16, ...options } = {}) {
this.#readAhead = readAhead
if (!Array.isArray(settings)) {
settings = [settings]
}
for (let i = 0; i < nbdConcurrency; i++) {
this.#clients.push(
new NbdClient(settings[i % settings.length], { ...options, readAhead: Math.ceil(readAhead / nbdConcurrency) })
)
}
}
async connect() {
const connectedClients = []
for (const clientId in this.#clients) {
const client = this.#clients[clientId]
try {
await client.connect()
connectedClients.push(client)
} catch (err) {
client.disconnect().catch(() => {})
warn(`can't connect to one nbd client`, { err })
}
}
if (connectedClients.length === 0) {
throw new Error(`Fail to connect to any Nbd client`)
}
if (connectedClients.length < this.#clients.length) {
warn(
`incomplete connection by multi Nbd, only ${connectedClients.length} over ${
this.#clients.length
} expected clients`
)
this.#clients = connectedClients
}
}
async disconnect() {
await asyncEach(this.#clients, client => client.disconnect(), {
stopOnError: false,
})
}
async readBlock(index, size = NBD_DEFAULT_BLOCK_SIZE) {
const clientId = index % this.#clients.length
return this.#clients[clientId].readBlock(index, size)
}
async *readBlocks(indexGenerator) {
// default : read all blocks
const readAhead = []
const makeReadBlockPromise = (index, size) => {
const promise = this.readBlock(index, size)
// error is handled during unshift
promise.catch(() => {})
return promise
}
// read all blocks, but try to keep readAheadMaxLength promise waiting ahead
for (const { index, size } of indexGenerator()) {
// stack readAheadMaxLength promises before starting to handle the results
if (readAhead.length === this.#readAhead) {
// any error will stop reading blocks
yield readAhead.shift()
}
readAhead.push(makeReadBlockPromise(index, size))
}
while (readAhead.length > 0) {
yield readAhead.shift()
}
}
}

View File

@@ -13,18 +13,17 @@
"url": "https://vates.fr"
},
"license": "ISC",
"version": "3.0.0",
"version": "1.2.1",
"engines": {
"node": ">=14.0"
},
"main": "./index.mjs",
"dependencies": {
"@vates/async-each": "^1.0.0",
"@vates/read-chunk": "^1.2.0",
"@vates/read-chunk": "^1.1.1",
"@xen-orchestra/async-map": "^0.1.2",
"@xen-orchestra/log": "^0.6.0",
"promise-toolbox": "^0.21.0",
"xen-api": "^2.0.0"
"xen-api": "^1.3.3"
},
"devDependencies": {
"tap": "^16.3.0",
@@ -32,6 +31,6 @@
},
"scripts": {
"postversion": "npm publish --access public",
"test-integration": "tap --lines 97 --functions 95 --branches 74 --statements 97 tests/*.integ.mjs"
"test-integration": "tap --lines 97 --functions 95 --branches 74 --statements 97 tests/*.integ.js"
}
}

View File

@@ -1,15 +1,15 @@
import { spawn, exec } from 'node:child_process'
import fs from 'node:fs/promises'
import { test } from 'tap'
import tmp from 'tmp'
import { pFromCallback } from 'promise-toolbox'
import { Socket } from 'node:net'
import { NBD_DEFAULT_PORT } from '../constants.mjs'
import assert from 'node:assert'
import MultiNbdClient from '../multi.mjs'
'use strict'
const NbdClient = require('../index.js')
const { spawn, exec } = require('node:child_process')
const fs = require('node:fs/promises')
const { test } = require('tap')
const tmp = require('tmp')
const { pFromCallback } = require('promise-toolbox')
const { Socket } = require('node:net')
const { NBD_DEFAULT_PORT } = require('../constants.js')
const assert = require('node:assert')
const CHUNK_SIZE = 1024 * 1024 // non default size
const FILE_SIZE = 1024 * 1024 * 9.5 // non aligned file size
const FILE_SIZE = 10 * 1024 * 1024
async function createTempFile(size) {
const tmpPath = await pFromCallback(cb => tmp.file(cb))
@@ -82,7 +82,7 @@ test('it works with unsecured network', async tap => {
const path = await createTempFile(FILE_SIZE)
let nbdServer = await spawnNbdKit(path)
const client = new MultiNbdClient(
const client = new NbdClient(
{
address: '127.0.0.1',
exportname: 'MY_SECRET_EXPORT',
@@ -110,13 +110,13 @@ CYu1Xn/FVPx1HoRgWc7E8wFhDcA/P3SJtfIQWHB9FzSaBflKGR4t8WCE2eE8+cTB
`,
},
{
nbdConcurrency: 1,
readAhead: 2,
}
)
await client.connect()
tap.equal(client.exportSize, BigInt(FILE_SIZE))
const CHUNK_SIZE = 1024 * 1024 // non default size
const indexes = []
for (let i = 0; i < FILE_SIZE / CHUNK_SIZE; i++) {
indexes.push(i)
@@ -128,9 +128,9 @@ CYu1Xn/FVPx1HoRgWc7E8wFhDcA/P3SJtfIQWHB9FzSaBflKGR4t8WCE2eE8+cTB
})
let i = 0
for await (const block of nbdIterator) {
let blockOk = block.length === Math.min(CHUNK_SIZE, FILE_SIZE - CHUNK_SIZE * i)
let blockOk = true
let firstFail
for (let j = 0; j < block.length; j += 4) {
for (let j = 0; j < CHUNK_SIZE; j += 4) {
const wanted = i * CHUNK_SIZE + j
const found = block.readUInt32BE(j)
blockOk = blockOk && found === wanted
@@ -138,7 +138,7 @@ CYu1Xn/FVPx1HoRgWc7E8wFhDcA/P3SJtfIQWHB9FzSaBflKGR4t8WCE2eE8+cTB
firstFail = j
}
}
tap.ok(blockOk, `check block ${i} content ${block.length}`)
tap.ok(blockOk, `check block ${i} content`)
i++
// flaky server is flaky
@@ -148,6 +148,17 @@ CYu1Xn/FVPx1HoRgWc7E8wFhDcA/P3SJtfIQWHB9FzSaBflKGR4t8WCE2eE8+cTB
nbdServer = await spawnNbdKit(path)
}
}
// we can reuse the conneciton to read other blocks
// default iterator
const nbdIteratorWithDefaultBlockIterator = client.readBlocks()
let nb = 0
for await (const block of nbdIteratorWithDefaultBlockIterator) {
nb++
tap.equal(block.length, 2 * 1024 * 1024)
}
tap.equal(nb, 5)
assert.rejects(() => client.readBlock(100, CHUNK_SIZE))
await client.disconnect()

View File

@@ -1,3 +1,4 @@
'use strict'
/*
node-vsphere-soap
@@ -11,18 +12,17 @@
*/
import { EventEmitter } from 'events'
import axios from 'axios'
import https from 'node:https'
import util from 'util'
import soap from 'soap'
import Cookie from 'soap-cookie' // required for session persistence
const EventEmitter = require('events').EventEmitter
const axios = require('axios')
const https = require('node:https')
const util = require('util')
const soap = require('soap')
const Cookie = require('soap-cookie') // required for session persistence
// Client class
// inherits from EventEmitter
// possible events: connect, error, ready
export function Client(vCenterHostname, username, password, sslVerify) {
function Client(vCenterHostname, username, password, sslVerify) {
this.status = 'disconnected'
this.reconnectCount = 0
@@ -228,3 +228,4 @@ function _soapErrorHandler(self, emitter, command, args, err) {
}
// end
exports.Client = Client

View File

@@ -1,8 +1,8 @@
{
"name": "@vates/node-vsphere-soap",
"version": "2.0.0",
"version": "1.0.0",
"description": "interface to vSphere SOAP/WSDL from node for interfacing with vCenter or ESXi, forked from node-vsphere-soap",
"main": "lib/client.mjs",
"main": "lib/client.js",
"author": "reedog117",
"repository": {
"directory": "@vates/node-vsphere-soap",
@@ -30,7 +30,7 @@
"private": false,
"homepage": "https://github.com/vatesfr/xen-orchestra/tree/master/@vates/node-vsphere-soap",
"engines": {
"node": ">=14"
"node": ">=8.10"
},
"scripts": {
"postversion": "npm publish --access public"

View File

@@ -1,11 +1,15 @@
'use strict'
// place your own credentials here for a vCenter or ESXi server
// this information will be used for connecting to a vCenter instance
// for module testing
// name the file config-test.js
export const vCenterTestCreds = {
const vCenterTestCreds = {
vCenterIP: 'vcsa',
vCenterUser: 'vcuser',
vCenterPassword: 'vcpw',
vCenter: true,
}
exports.vCenterTestCreds = vCenterTestCreds

View File

@@ -1,16 +1,18 @@
'use strict'
/*
vsphere-soap.test.js
tests for the vCenterConnectionInstance class
*/
import assert from 'assert'
import { describe, it } from 'test'
const assert = require('assert')
const { describe, it } = require('test')
import * as vc from '../lib/client.mjs'
const vc = require('../lib/client')
// eslint-disable-next-line n/no-missing-import
import { vCenterTestCreds as TestCreds } from '../config-test.mjs'
// eslint-disable-next-line n/no-missing-require
const TestCreds = require('../config-test.js').vCenterTestCreds
const VItest = new vc.Client(TestCreds.vCenterIP, TestCreds.vCenterUser, TestCreds.vCenterPassword, false)

View File

@@ -1,7 +1,6 @@
'use strict'
const assert = require('assert')
const isUtf8 = require('isutf8')
/**
* Read a chunk of data from a stream.
@@ -22,41 +21,41 @@ const readChunk = (stream, size) =>
stream.errored != null
? Promise.reject(stream.errored)
: stream.closed || stream.readableEnded
? Promise.resolve(null)
: new Promise((resolve, reject) => {
if (size !== undefined) {
assert(size > 0)
? Promise.resolve(null)
: new Promise((resolve, reject) => {
if (size !== undefined) {
assert(size > 0)
// per Node documentation:
// > The size argument must be less than or equal to 1 GiB.
assert(size < 1073741824)
}
// per Node documentation:
// > The size argument must be less than or equal to 1 GiB.
assert(size < 1073741824)
}
function onEnd() {
resolve(null)
function onEnd() {
resolve(null)
removeListeners()
}
function onError(error) {
reject(error)
removeListeners()
}
function onReadable() {
const data = stream.read(size)
if (data !== null) {
resolve(data)
removeListeners()
}
function onError(error) {
reject(error)
removeListeners()
}
function onReadable() {
const data = stream.read(size)
if (data !== null) {
resolve(data)
removeListeners()
}
}
function removeListeners() {
stream.removeListener('end', onEnd)
stream.removeListener('error', onError)
stream.removeListener('readable', onReadable)
}
stream.on('end', onEnd)
stream.on('error', onError)
stream.on('readable', onReadable)
onReadable()
})
}
function removeListeners() {
stream.removeListener('end', onEnd)
stream.removeListener('error', onError)
stream.removeListener('readable', onReadable)
}
stream.on('end', onEnd)
stream.on('error', onError)
stream.on('readable', onReadable)
onReadable()
})
exports.readChunk = readChunk
/**
@@ -82,13 +81,6 @@ exports.readChunkStrict = async function readChunkStrict(stream, size) {
if (size !== undefined && chunk.length !== size) {
const error = new Error(`stream has ended with not enough data (actual: ${chunk.length}, expected: ${size})`)
// Buffer.isUtf8 is too recent for now
// @todo : replace external package by Buffer.isUtf8 when the supported version of node reach 18
if (chunk.length < 1024 && isUtf8(chunk)) {
error.text = chunk.toString('utf8')
}
Object.defineProperties(error, {
chunk: {
value: chunk,
@@ -111,42 +103,42 @@ async function skip(stream, size) {
return stream.errored != null
? Promise.reject(stream.errored)
: size === 0 || stream.closed || stream.readableEnded
? Promise.resolve(0)
: new Promise((resolve, reject) => {
let left = size
function onEnd() {
resolve(size - left)
removeListeners()
}
function onError(error) {
reject(error)
removeListeners()
}
function onReadable() {
const data = stream.read()
left -= data === null ? 0 : data.length
if (left > 0) {
// continue to read
} else {
// if more than wanted has been read, push back the rest
if (left < 0) {
stream.unshift(data.slice(left))
}
resolve(size)
removeListeners()
? Promise.resolve(0)
: new Promise((resolve, reject) => {
let left = size
function onEnd() {
resolve(size - left)
removeListeners()
}
function onError(error) {
reject(error)
removeListeners()
}
function onReadable() {
const data = stream.read()
left -= data === null ? 0 : data.length
if (left > 0) {
// continue to read
} else {
// if more than wanted has been read, push back the rest
if (left < 0) {
stream.unshift(data.slice(left))
}
resolve(size)
removeListeners()
}
function removeListeners() {
stream.removeListener('end', onEnd)
stream.removeListener('error', onError)
stream.removeListener('readable', onReadable)
}
stream.on('end', onEnd)
stream.on('error', onError)
stream.on('readable', onReadable)
onReadable()
})
}
function removeListeners() {
stream.removeListener('end', onEnd)
stream.removeListener('error', onError)
stream.removeListener('readable', onReadable)
}
stream.on('end', onEnd)
stream.on('error', onError)
stream.on('readable', onReadable)
onReadable()
})
}
exports.skip = skip

View File

@@ -102,37 +102,12 @@ describe('readChunkStrict', function () {
assert.strictEqual(error.chunk, undefined)
})
it('throws if stream ends with not enough data, utf8', async () => {
it('throws if stream ends with not enough data', async () => {
const error = await rejectionOf(readChunkStrict(makeStream(['foo', 'bar']), 10))
assert(error instanceof Error)
assert.strictEqual(error.message, 'stream has ended with not enough data (actual: 6, expected: 10)')
assert.strictEqual(error.text, 'foobar')
assert.deepEqual(error.chunk, Buffer.from('foobar'))
})
it('throws if stream ends with not enough data, non utf8 ', async () => {
const source = [Buffer.alloc(10, 128), Buffer.alloc(10, 128)]
const error = await rejectionOf(readChunkStrict(makeStream(source), 30))
assert(error instanceof Error)
assert.strictEqual(error.message, 'stream has ended with not enough data (actual: 20, expected: 30)')
assert.strictEqual(error.text, undefined)
assert.deepEqual(error.chunk, Buffer.concat(source))
})
it('throws if stream ends with not enough data, utf8 , long data', async () => {
const source = Buffer.from('a'.repeat(1500))
const error = await rejectionOf(readChunkStrict(makeStream([source]), 2000))
assert(error instanceof Error)
assert.strictEqual(error.message, `stream has ended with not enough data (actual: 1500, expected: 2000)`)
assert.strictEqual(error.text, undefined)
assert.deepEqual(error.chunk, source)
})
it('succeed', async () => {
const source = Buffer.from('a'.repeat(20))
const chunk = await readChunkStrict(makeStream([source]), 10)
assert.deepEqual(source.subarray(10), chunk)
})
})
describe('skip', function () {
@@ -159,16 +134,6 @@ describe('skip', function () {
it('returns less size if stream ends', async () => {
assert.deepEqual(await skip(makeStream('foo bar'), 10), 7)
})
it('put back if it read too much', async () => {
let source = makeStream(['foo', 'bar'])
await skip(source, 1) // read part of data chunk
const chunk = (await readChunkStrict(source, 2)).toString('utf-8')
assert.strictEqual(chunk, 'oo')
source = makeStream(['foo', 'bar'])
assert.strictEqual(await skip(source, 3), 3) // read aligned with data chunk
})
})
describe('skipStrict', function () {
@@ -179,9 +144,4 @@ describe('skipStrict', function () {
assert.strictEqual(error.message, 'stream has ended with not enough data (actual: 7, expected: 10)')
assert.deepEqual(error.bytesSkipped, 7)
})
it('succeed', async () => {
const source = makeStream(['foo', 'bar', 'baz'])
const res = await skipStrict(source, 4)
assert.strictEqual(res, undefined)
})
})

View File

@@ -19,7 +19,7 @@
"type": "git",
"url": "https://github.com/vatesfr/xen-orchestra.git"
},
"version": "1.2.0",
"version": "1.1.1",
"engines": {
"node": ">=8.10"
},
@@ -33,8 +33,5 @@
},
"devDependencies": {
"test": "^3.2.1"
},
"dependencies": {
"isutf8": "^4.0.0"
}
}

View File

@@ -27,7 +27,7 @@
"license": "ISC",
"version": "0.1.0",
"engines": {
"node": ">=12.3"
"node": ">=10"
},
"scripts": {
"postversion": "npm publish --access public",

View File

@@ -111,7 +111,7 @@ const onProgress = makeOnProgress({
// current status of the task as described in the previous section
taskLog.status
// undefined or a dictionary of properties attached to the task
// undefined or a dictionnary of properties attached to the task
taskLog.properties
// timestamp at which the abortion was requested, undefined otherwise

View File

@@ -35,7 +35,7 @@
"test": "node--test"
},
"devDependencies": {
"sinon": "^17.0.1",
"sinon": "^15.0.1",
"test": "^3.2.1"
}
}

View File

@@ -1,5 +1,5 @@
import { asyncMap } from '@xen-orchestra/async-map'
import { RemoteAdapter } from '@xen-orchestra/backups/RemoteAdapter.mjs'
import { RemoteAdapter } from '@xen-orchestra/backups/RemoteAdapter.js'
import { getSyncedHandler } from '@xen-orchestra/fs'
import getopts from 'getopts'
import { basename, dirname } from 'path'

View File

@@ -7,9 +7,9 @@
"bugs": "https://github.com/vatesfr/xen-orchestra/issues",
"dependencies": {
"@xen-orchestra/async-map": "^0.1.2",
"@xen-orchestra/backups": "^0.44.3",
"@xen-orchestra/fs": "^4.1.3",
"filenamify": "^6.0.0",
"@xen-orchestra/backups": "^0.39.0",
"@xen-orchestra/fs": "^4.0.1",
"filenamify": "^4.1.0",
"getopts": "^2.2.5",
"lodash": "^4.17.15",
"promise-toolbox": "^0.21.0"
@@ -27,7 +27,7 @@
"scripts": {
"postversion": "npm publish --access public"
},
"version": "1.0.14",
"version": "1.0.9",
"license": "AGPL-3.0-or-later",
"author": {
"name": "Vates SAS",

View File

@@ -1,8 +1,10 @@
import { Metadata } from './_runners/Metadata.mjs'
import { VmsRemote } from './_runners/VmsRemote.mjs'
import { VmsXapi } from './_runners/VmsXapi.mjs'
'use strict'
export function createRunner(opts) {
const { Metadata } = require('./_runners/Metadata.js')
const { VmsRemote } = require('./_runners/VmsRemote.js')
const { VmsXapi } = require('./_runners/VmsXapi.js')
exports.createRunner = function createRunner(opts) {
const { type } = opts.job
switch (type) {
case 'backup':

View File

@@ -1,6 +1,8 @@
import { asyncMap } from '@xen-orchestra/async-map'
'use strict'
export class DurablePartition {
const { asyncMap } = require('@xen-orchestra/async-map')
exports.DurablePartition = class DurablePartition {
// private resource API is used exceptionally to be able to separate resource creation and release
#partitionDisposers = {}

View File

@@ -1,6 +1,8 @@
import { Task } from './Task.mjs'
'use strict'
export class HealthCheckVmBackup {
const { Task } = require('./Task')
exports.HealthCheckVmBackup = class HealthCheckVmBackup {
#restoredVm
#timeout
#xapi

View File

@@ -0,0 +1,73 @@
'use strict'
const assert = require('assert')
const { formatFilenameDate } = require('./_filenameDate.js')
const { importIncrementalVm } = require('./_incrementalVm.js')
const { Task } = require('./Task.js')
const { watchStreamSize } = require('./_watchStreamSize.js')
exports.ImportVmBackup = class ImportVmBackup {
constructor({ adapter, metadata, srUuid, xapi, settings: { newMacAddresses, mapVdisSrs = {} } = {} }) {
this._adapter = adapter
this._importIncrementalVmSettings = { newMacAddresses, mapVdisSrs }
this._metadata = metadata
this._srUuid = srUuid
this._xapi = xapi
}
async run() {
const adapter = this._adapter
const metadata = this._metadata
const isFull = metadata.mode === 'full'
const sizeContainer = { size: 0 }
let backup
if (isFull) {
backup = await adapter.readFullVmBackup(metadata)
watchStreamSize(backup, sizeContainer)
} else {
assert.strictEqual(metadata.mode, 'delta')
const ignoredVdis = new Set(
Object.entries(this._importIncrementalVmSettings.mapVdisSrs)
.filter(([_, srUuid]) => srUuid === null)
.map(([vdiUuid]) => vdiUuid)
)
backup = await adapter.readIncrementalVmBackup(metadata, ignoredVdis)
Object.values(backup.streams).forEach(stream => watchStreamSize(stream, sizeContainer))
}
return Task.run(
{
name: 'transfer',
},
async () => {
const xapi = this._xapi
const srRef = await xapi.call('SR.get_by_uuid', this._srUuid)
const vmRef = isFull
? await xapi.VM_import(backup, srRef)
: await importIncrementalVm(backup, await xapi.getRecord('SR', srRef), {
...this._importIncrementalVmSettings,
detectBase: false,
})
await Promise.all([
xapi.call('VM.add_tags', vmRef, 'restored from backup'),
xapi.call(
'VM.set_name_label',
vmRef,
`${metadata.vm.name_label} (${formatFilenameDate(metadata.timestamp)})`
),
])
return {
size: sizeContainer.size,
id: await xapi.getField('VM', vmRef, 'uuid'),
}
}
)
}
}

View File

@@ -1,280 +0,0 @@
import assert from 'node:assert'
import { formatFilenameDate } from './_filenameDate.mjs'
import { importIncrementalVm } from './_incrementalVm.mjs'
import { Task } from './Task.mjs'
import { watchStreamSize } from './_watchStreamSize.mjs'
import { VhdNegative, VhdSynthetic } from 'vhd-lib'
import { decorateClass } from '@vates/decorate-with'
import { createLogger } from '@xen-orchestra/log'
import { dirname, join } from 'node:path'
import pickBy from 'lodash/pickBy.js'
import { defer } from 'golike-defer'
const { debug, info, warn } = createLogger('xo:backups:importVmBackup')
async function resolveUuid(xapi, cache, uuid, type) {
if (uuid == null) {
return uuid
}
const ref = cache.get(uuid)
if (ref === undefined) {
cache.set(uuid, xapi.call(`${type}.get_by_uuid`, uuid))
}
return cache.get(uuid)
}
export class ImportVmBackup {
constructor({
adapter,
metadata,
srUuid,
xapi,
settings: { additionnalVmTag, newMacAddresses, mapVdisSrs = {}, useDifferentialRestore = false } = {},
}) {
this._adapter = adapter
this._importIncrementalVmSettings = { additionnalVmTag, newMacAddresses, mapVdisSrs, useDifferentialRestore }
this._metadata = metadata
this._srUuid = srUuid
this._xapi = xapi
}
async #getPathOfVdiSnapshot(snapshotUuid) {
const metadata = this._metadata
if (this._pathToVdis === undefined) {
const backups = await this._adapter.listVmBackups(
this._metadata.vm.uuid,
({ mode, timestamp }) => mode === 'delta' && timestamp >= metadata.timestamp
)
const map = new Map()
for (const backup of backups) {
for (const [vdiRef, vdi] of Object.entries(backup.vdis)) {
map.set(vdi.uuid, backup.vhds[vdiRef])
}
}
this._pathToVdis = map
}
return this._pathToVdis.get(snapshotUuid)
}
async _reuseNearestSnapshot($defer, ignoredVdis) {
const metadata = this._metadata
const { mapVdisSrs } = this._importIncrementalVmSettings
const { vbds, vhds, vifs, vm, vmSnapshot } = metadata
const streams = {}
const metdataDir = dirname(metadata._filename)
const vdis = ignoredVdis === undefined ? metadata.vdis : pickBy(metadata.vdis, vdi => !ignoredVdis.has(vdi.uuid))
for (const [vdiRef, vdi] of Object.entries(vdis)) {
const vhdPath = join(metdataDir, vhds[vdiRef])
let xapiDisk
try {
xapiDisk = await this._xapi.getRecordByUuid('VDI', vdi.$snapshot_of$uuid)
} catch (err) {
// if this disk is not present anymore, fall back to default restore
warn(err)
}
let snapshotCandidate, backupCandidate
if (xapiDisk !== undefined) {
debug('found disks, wlll search its snapshots', { snapshots: xapiDisk.snapshots })
for (const snapshotRef of xapiDisk.snapshots) {
const snapshot = await this._xapi.getRecord('VDI', snapshotRef)
debug('handling snapshot', { snapshot })
// take only the first snapshot
if (snapshotCandidate && snapshotCandidate.snapshot_time < snapshot.snapshot_time) {
debug('already got a better candidate')
continue
}
// have a corresponding backup more recent than metadata ?
const pathToSnapshotData = await this.#getPathOfVdiSnapshot(snapshot.uuid)
if (pathToSnapshotData === undefined) {
debug('no backup linked to this snaphot')
continue
}
if (snapshot.$SR.uuid !== (mapVdisSrs[vdi.$snapshot_of$uuid] ?? this._srUuid)) {
debug('not restored on the same SR', { snapshotSr: snapshot.$SR.uuid, mapVdisSrs, srUuid: this._srUuid })
continue
}
debug('got a candidate', pathToSnapshotData)
snapshotCandidate = snapshot
backupCandidate = pathToSnapshotData
}
}
let stream
const backupWithSnapshotPath = join(metdataDir, backupCandidate ?? '')
if (vhdPath === backupWithSnapshotPath) {
// all the data are already on the host
debug('direct reuse of a snapshot')
stream = null
vdis[vdiRef].baseVdi = snapshotCandidate
// go next disk , we won't use this stream
continue
}
let disposableDescendants
const disposableSynthetic = await VhdSynthetic.fromVhdChain(this._adapter._handler, vhdPath)
// this will also clean if another disk of this VM backup fails
// if user really only need to restore non failing disks he can retry with ignoredVdis
let disposed = false
const disposeOnce = async () => {
if (!disposed) {
disposed = true
try {
await disposableDescendants?.dispose()
await disposableSynthetic?.dispose()
} catch (error) {
warn('openVhd: failed to dispose VHDs', { error })
}
}
}
$defer.onFailure(() => disposeOnce())
const parentVhd = disposableSynthetic.value
await parentVhd.readBlockAllocationTable()
debug('got vhd synthetic of parents', parentVhd.length)
if (snapshotCandidate !== undefined) {
try {
debug('will try to use differential restore', {
backupWithSnapshotPath,
vhdPath,
vdiRef,
})
disposableDescendants = await VhdSynthetic.fromVhdChain(this._adapter._handler, backupWithSnapshotPath, {
until: vhdPath,
})
const descendantsVhd = disposableDescendants.value
await descendantsVhd.readBlockAllocationTable()
debug('got vhd synthetic of descendants')
const negativeVhd = new VhdNegative(parentVhd, descendantsVhd)
debug('got vhd negative')
// update the stream with the negative vhd stream
stream = await negativeVhd.stream()
vdis[vdiRef].baseVdi = snapshotCandidate
} catch (err) {
// can be a broken VHD chain, a vhd chain with a key backup, ....
// not an irrecuperable error, don't dispose parentVhd, and fallback to full restore
warn(`can't use differential restore`, err)
disposableDescendants?.dispose()
}
}
// didn't make a negative stream : fallback to classic stream
if (stream === undefined) {
debug('use legacy restore')
stream = await parentVhd.stream()
}
stream.on('end', disposeOnce)
stream.on('close', disposeOnce)
stream.on('error', disposeOnce)
info('everything is ready, will transfer', stream.length)
streams[`${vdiRef}.vhd`] = stream
}
return {
streams,
vbds,
vdis,
version: '1.0.0',
vifs,
vm: { ...vm, suspend_VDI: vmSnapshot.suspend_VDI },
}
}
async #decorateIncrementalVmMetadata() {
const { additionnalVmTag, mapVdisSrs, useDifferentialRestore } = this._importIncrementalVmSettings
const ignoredVdis = new Set(
Object.entries(mapVdisSrs)
.filter(([_, srUuid]) => srUuid === null)
.map(([vdiUuid]) => vdiUuid)
)
let backup
if (useDifferentialRestore) {
backup = await this._reuseNearestSnapshot(ignoredVdis)
} else {
backup = await this._adapter.readIncrementalVmBackup(this._metadata, ignoredVdis)
}
const xapi = this._xapi
const cache = new Map()
const mapVdisSrRefs = {}
if (additionnalVmTag !== undefined) {
backup.vm.tags.push(additionnalVmTag)
}
for (const [vdiUuid, srUuid] of Object.entries(mapVdisSrs)) {
mapVdisSrRefs[vdiUuid] = await resolveUuid(xapi, cache, srUuid, 'SR')
}
const srRef = await resolveUuid(xapi, cache, this._srUuid, 'SR')
Object.values(backup.vdis).forEach(vdi => {
vdi.SR = mapVdisSrRefs[vdi.uuid] ?? srRef
})
return backup
}
async run() {
const adapter = this._adapter
const metadata = this._metadata
const isFull = metadata.mode === 'full'
const sizeContainer = { size: 0 }
const { newMacAddresses } = this._importIncrementalVmSettings
let backup
if (isFull) {
backup = await adapter.readFullVmBackup(metadata)
watchStreamSize(backup, sizeContainer)
} else {
assert.strictEqual(metadata.mode, 'delta')
backup = await this.#decorateIncrementalVmMetadata()
Object.values(backup.streams).forEach(stream => watchStreamSize(stream, sizeContainer))
}
return Task.run(
{
name: 'transfer',
},
async () => {
const xapi = this._xapi
const srRef = await xapi.call('SR.get_by_uuid', this._srUuid)
const vmRef = isFull
? await xapi.VM_import(backup, srRef)
: await importIncrementalVm(backup, await xapi.getRecord('SR', srRef), {
newMacAddresses,
})
await Promise.all([
xapi.call('VM.add_tags', vmRef, 'restored from backup'),
xapi.call(
'VM.set_name_label',
vmRef,
`${metadata.vm.name_label} (${formatFilenameDate(metadata.timestamp)})`
),
xapi.call(
'VM.set_name_description',
vmRef,
`Restored on ${formatFilenameDate(+new Date())} from ${adapter._handler._remote.name} -
${metadata.vm.name_description}
`
),
])
return {
size: sizeContainer.size,
id: await xapi.getField('VM', vmRef, 'uuid'),
}
}
)
}
}
decorateClass(ImportVmBackup, { _reuseNearestSnapshot: defer })

View File

@@ -1,39 +1,43 @@
import { asyncEach } from '@vates/async-each'
import { asyncMap, asyncMapSettled } from '@xen-orchestra/async-map'
import { compose } from '@vates/compose'
import { createLogger } from '@xen-orchestra/log'
import { createVhdDirectoryFromStream, openVhd, VhdAbstract, VhdDirectory, VhdSynthetic } from 'vhd-lib'
import { decorateMethodsWith } from '@vates/decorate-with'
import { deduped } from '@vates/disposable/deduped.js'
import { dirname, join, resolve } from 'node:path'
import { execFile } from 'child_process'
import { mount } from '@vates/fuse-vhd'
import { readdir, lstat } from 'node:fs/promises'
import { synchronized } from 'decorator-synchronized'
import { v4 as uuidv4 } from 'uuid'
import { ZipFile } from 'yazl'
import Disposable from 'promise-toolbox/Disposable'
import fromCallback from 'promise-toolbox/fromCallback'
import fromEvent from 'promise-toolbox/fromEvent'
import groupBy from 'lodash/groupBy.js'
import pDefer from 'promise-toolbox/defer'
import pickBy from 'lodash/pickBy.js'
import tar from 'tar'
import zlib from 'zlib'
'use strict'
import { BACKUP_DIR } from './_getVmBackupDir.mjs'
import { cleanVm } from './_cleanVm.mjs'
import { formatFilenameDate } from './_filenameDate.mjs'
import { getTmpDir } from './_getTmpDir.mjs'
import { isMetadataFile } from './_backupType.mjs'
import { isValidXva } from './_isValidXva.mjs'
import { listPartitions, LVM_PARTITION_TYPE } from './_listPartitions.mjs'
import { lvs, pvs } from './_lvm.mjs'
import { watchStreamSize } from './_watchStreamSize.mjs'
const { asyncMap, asyncMapSettled } = require('@xen-orchestra/async-map')
const { synchronized } = require('decorator-synchronized')
const Disposable = require('promise-toolbox/Disposable')
const fromCallback = require('promise-toolbox/fromCallback')
const fromEvent = require('promise-toolbox/fromEvent')
const pDefer = require('promise-toolbox/defer')
const groupBy = require('lodash/groupBy.js')
const pickBy = require('lodash/pickBy.js')
const { dirname, join, normalize, resolve } = require('path')
const { createLogger } = require('@xen-orchestra/log')
const { createVhdDirectoryFromStream, openVhd, VhdAbstract, VhdDirectory, VhdSynthetic } = require('vhd-lib')
const { deduped } = require('@vates/disposable/deduped.js')
const { decorateMethodsWith } = require('@vates/decorate-with')
const { compose } = require('@vates/compose')
const { execFile } = require('child_process')
const { readdir, lstat } = require('fs-extra')
const { v4: uuidv4 } = require('uuid')
const { ZipFile } = require('yazl')
const zlib = require('zlib')
export const DIR_XO_CONFIG_BACKUPS = 'xo-config-backups'
const { BACKUP_DIR } = require('./_getVmBackupDir.js')
const { cleanVm } = require('./_cleanVm.js')
const { formatFilenameDate } = require('./_filenameDate.js')
const { getTmpDir } = require('./_getTmpDir.js')
const { isMetadataFile } = require('./_backupType.js')
const { isValidXva } = require('./_isValidXva.js')
const { listPartitions, LVM_PARTITION_TYPE } = require('./_listPartitions.js')
const { lvs, pvs } = require('./_lvm.js')
const { watchStreamSize } = require('./_watchStreamSize')
// @todo : this import is marked extraneous , sould be fixed when lib is published
const { mount } = require('@vates/fuse-vhd')
const { asyncEach } = require('@vates/async-each')
export const DIR_XO_POOL_METADATA_BACKUPS = 'xo-pool-metadata-backups'
const DIR_XO_CONFIG_BACKUPS = 'xo-config-backups'
exports.DIR_XO_CONFIG_BACKUPS = DIR_XO_CONFIG_BACKUPS
const DIR_XO_POOL_METADATA_BACKUPS = 'xo-pool-metadata-backups'
exports.DIR_XO_POOL_METADATA_BACKUPS = DIR_XO_POOL_METADATA_BACKUPS
const { debug, warn } = createLogger('xo:backups:RemoteAdapter')
@@ -42,23 +46,20 @@ const compareTimestamp = (a, b) => a.timestamp - b.timestamp
const noop = Function.prototype
const resolveRelativeFromFile = (file, path) => resolve('/', dirname(file), path).slice(1)
const makeRelative = path => resolve('/', path).slice(1)
const resolveSubpath = (root, path) => resolve(root, makeRelative(path))
async function addZipEntries(zip, realBasePath, virtualBasePath, relativePaths) {
for (const relativePath of relativePaths) {
const realPath = join(realBasePath, relativePath)
const virtualPath = join(virtualBasePath, relativePath)
const resolveSubpath = (root, path) => resolve(root, `.${resolve('/', path)}`)
const stats = await lstat(realPath)
const { mode, mtime } = stats
const opts = { mode, mtime }
if (stats.isDirectory()) {
zip.addEmptyDirectory(virtualPath, opts)
await addZipEntries(zip, realPath, virtualPath, await readdir(realPath))
} else if (stats.isFile()) {
zip.addFile(realPath, virtualPath, opts)
}
async function addDirectory(files, realPath, metadataPath) {
const stats = await lstat(realPath)
if (stats.isDirectory()) {
await asyncMap(await readdir(realPath), file =>
addDirectory(files, realPath + '/' + file, metadataPath + '/' + file)
)
} else if (stats.isFile()) {
files.push({
realPath,
metadataPath,
})
}
}
@@ -75,7 +76,7 @@ const debounceResourceFactory = factory =>
return this._debounceResource(factory.apply(this, arguments))
}
export class RemoteAdapter {
class RemoteAdapter {
constructor(
handler,
{ debounceResource = res => res, dirMode, vhdDirectoryCompression, useGetDiskLegacy = false } = {}
@@ -186,6 +187,17 @@ export class RemoteAdapter {
})
}
async *_usePartitionFiles(diskId, partitionId, paths) {
const path = yield this.getPartition(diskId, partitionId)
const files = []
await asyncMap(paths, file =>
addDirectory(files, resolveSubpath(path, file), normalize('./' + file).replace(/\/+$/, ''))
)
return files
}
// check if we will be allowed to merge a a vhd created in this adapter
// with the vhd at path `path`
async isMergeableParent(packedParentUid, path) {
@@ -202,24 +214,15 @@ export class RemoteAdapter {
})
}
fetchPartitionFiles(diskId, partitionId, paths, format) {
fetchPartitionFiles(diskId, partitionId, paths) {
const { promise, reject, resolve } = pDefer()
Disposable.use(
async function* () {
const path = yield this.getPartition(diskId, partitionId)
let outputStream
if (format === 'tgz') {
outputStream = tar.c({ cwd: path, gzip: true }, paths.map(makeRelative))
} else if (format === 'zip') {
const zip = new ZipFile()
await addZipEntries(zip, path, '', paths.map(makeRelative))
zip.end()
;({ outputStream } = zip)
} else {
throw new Error('unsupported format ' + format)
}
const files = yield this._usePartitionFiles(diskId, partitionId, paths)
const zip = new ZipFile()
files.forEach(({ realPath, metadataPath }) => zip.addFile(realPath, metadataPath))
zip.end()
const { outputStream } = zip
resolve(outputStream)
await fromEvent(outputStream, 'end')
}.bind(this)
@@ -681,13 +684,11 @@ export class RemoteAdapter {
}
}
async outputStream(path, input, { checksum = true, maxStreamLength, streamLength, validator = noop } = {}) {
async outputStream(path, input, { checksum = true, validator = noop } = {}) {
const container = watchStreamSize(input)
await this._handler.outputStream(path, input, {
checksum,
dirMode: this._dirMode,
maxStreamLength,
streamLength,
async validator() {
await input.task
return validator.apply(this, arguments)
@@ -828,7 +829,11 @@ decorateMethodsWith(RemoteAdapter, {
debounceResourceFactory,
]),
_usePartitionFiles: Disposable.factory,
getDisk: compose([Disposable.factory, [deduped, diskId => [diskId]], debounceResourceFactory]),
getPartition: Disposable.factory,
})
exports.RemoteAdapter = RemoteAdapter

View File

@@ -0,0 +1,29 @@
'use strict'
const { join, resolve } = require('node:path/posix')
const { DIR_XO_POOL_METADATA_BACKUPS } = require('./RemoteAdapter.js')
const { PATH_DB_DUMP } = require('./_runners/_PoolMetadataBackup.js')
exports.RestoreMetadataBackup = class RestoreMetadataBackup {
constructor({ backupId, handler, xapi }) {
this._backupId = backupId
this._handler = handler
this._xapi = xapi
}
async run() {
const backupId = this._backupId
const handler = this._handler
const xapi = this._xapi
if (backupId.split('/')[0] === DIR_XO_POOL_METADATA_BACKUPS) {
return xapi.putResource(await handler.createReadStream(`${backupId}/data`), PATH_DB_DUMP, {
task: xapi.task_create('Import pool metadata'),
})
} else {
const metadata = JSON.parse(await handler.readFile(join(backupId, 'metadata.json')))
return String(await handler.readFile(resolve(backupId, metadata.data ?? 'data.json')))
}
}
}

View File

@@ -1,32 +0,0 @@
import { join, resolve } from 'node:path/posix'
import { DIR_XO_POOL_METADATA_BACKUPS } from './RemoteAdapter.mjs'
import { PATH_DB_DUMP } from './_runners/_PoolMetadataBackup.mjs'
export class RestoreMetadataBackup {
constructor({ backupId, handler, xapi }) {
this._backupId = backupId
this._handler = handler
this._xapi = xapi
}
async run() {
const backupId = this._backupId
const handler = this._handler
const xapi = this._xapi
if (backupId.split('/')[0] === DIR_XO_POOL_METADATA_BACKUPS) {
return xapi.putResource(await handler.createReadStream(`${backupId}/data`), PATH_DB_DUMP, {
task: xapi.task_create('Import pool metadata'),
})
} else {
const metadata = JSON.parse(await handler.readFile(join(backupId, 'metadata.json')))
const dataFileName = resolve(backupId, metadata.data ?? 'data.json')
const data = await handler.readFile(dataFileName)
// if data is JSON, sent it as a plain string, otherwise, consider the data as binary and encode it
const isJson = dataFileName.endsWith('.json')
return isJson ? data.toString() : { encoding: 'base64', data: data.toString('base64') }
}
}
}

View File

@@ -1,5 +1,7 @@
import CancelToken from 'promise-toolbox/CancelToken'
import Zone from 'node-zone'
'use strict'
const CancelToken = require('promise-toolbox/CancelToken')
const Zone = require('node-zone')
const logAfterEnd = log => {
const error = new Error('task has already ended')
@@ -28,7 +30,7 @@ const serializeError = error =>
const $$task = Symbol('@xen-orchestra/backups/Task')
export class Task {
class Task {
static get cancelToken() {
const task = Zone.current.data[$$task]
return task !== undefined ? task.#cancelToken : CancelToken.none
@@ -149,6 +151,7 @@ export class Task {
})
}
}
exports.Task = Task
for (const method of ['info', 'warning']) {
Task[method] = (...args) => Zone.current.data[$$task]?.[method](...args)

View File

@@ -0,0 +1,6 @@
'use strict'
exports.isMetadataFile = filename => filename.endsWith('.json')
exports.isVhdFile = filename => filename.endsWith('.vhd')
exports.isXvaFile = filename => filename.endsWith('.xva')
exports.isXvaSumFile = filename => filename.endsWith('.xva.checksum')

View File

@@ -1,4 +0,0 @@
export const isMetadataFile = filename => filename.endsWith('.json')
export const isVhdFile = filename => filename.endsWith('.vhd')
export const isXvaFile = filename => filename.endsWith('.xva')
export const isXvaSumFile = filename => filename.endsWith('.xva.checksum')

View File

@@ -1,25 +1,25 @@
import { createLogger } from '@xen-orchestra/log'
import { catchGlobalErrors } from '@xen-orchestra/log/configure'
'use strict'
import Disposable from 'promise-toolbox/Disposable'
import ignoreErrors from 'promise-toolbox/ignoreErrors'
import { compose } from '@vates/compose'
import { createCachedLookup } from '@vates/cached-dns.lookup'
import { createDebounceResource } from '@vates/disposable/debounceResource.js'
import { createRunner } from './Backup.mjs'
import { decorateMethodsWith } from '@vates/decorate-with'
import { deduped } from '@vates/disposable/deduped.js'
import { getHandler } from '@xen-orchestra/fs'
import { parseDuration } from '@vates/parse-duration'
import { Xapi } from '@xen-orchestra/xapi'
const logger = require('@xen-orchestra/log').createLogger('xo:backups:worker')
import { RemoteAdapter } from './RemoteAdapter.mjs'
import { Task } from './Task.mjs'
require('@xen-orchestra/log/configure').catchGlobalErrors(logger)
createCachedLookup().patchGlobal()
require('@vates/cached-dns.lookup').createCachedLookup().patchGlobal()
const Disposable = require('promise-toolbox/Disposable')
const ignoreErrors = require('promise-toolbox/ignoreErrors')
const { compose } = require('@vates/compose')
const { createDebounceResource } = require('@vates/disposable/debounceResource.js')
const { decorateMethodsWith } = require('@vates/decorate-with')
const { deduped } = require('@vates/disposable/deduped.js')
const { getHandler } = require('@xen-orchestra/fs')
const { createRunner } = require('./Backup.js')
const { parseDuration } = require('@vates/parse-duration')
const { Xapi } = require('@xen-orchestra/xapi')
const { RemoteAdapter } = require('./RemoteAdapter.js')
const { Task } = require('./Task.js')
const logger = createLogger('xo:backups:worker')
catchGlobalErrors(logger)
const { debug } = logger
class BackupWorker {

View File

@@ -1,11 +1,13 @@
import cancelable from 'promise-toolbox/cancelable'
import CancelToken from 'promise-toolbox/CancelToken'
'use strict'
const cancelable = require('promise-toolbox/cancelable')
const CancelToken = require('promise-toolbox/CancelToken')
// Similar to `Promise.all` + `map` but pass a cancel token to the callback
//
// If any of the executions fails, the cancel token will be triggered and the
// first reason will be rejected.
export const cancelableMap = cancelable(async function cancelableMap($cancelToken, iterable, callback) {
exports.cancelableMap = cancelable(async function cancelableMap($cancelToken, iterable, callback) {
const { cancel, token } = CancelToken.source([$cancelToken])
try {
return await Promise.all(

View File

@@ -1,19 +1,19 @@
import test from 'test'
import { strict as assert } from 'node:assert'
'use strict'
import tmp from 'tmp'
import fs from 'fs-extra'
import * as uuid from 'uuid'
import { getHandler } from '@xen-orchestra/fs'
import { pFromCallback } from 'promise-toolbox'
import { RemoteAdapter } from './RemoteAdapter.mjs'
import { VHDFOOTER, VHDHEADER } from './tests.fixtures.mjs'
import { VhdFile, Constants, VhdDirectory, VhdAbstract } from 'vhd-lib'
import { checkAliases } from './_cleanVm.mjs'
import { dirname, basename } from 'node:path'
import { rimraf } from 'rimraf'
const { beforeEach, afterEach, test, describe } = require('test')
const assert = require('assert').strict
const { beforeEach, afterEach, describe } = test
const tmp = require('tmp')
const fs = require('fs-extra')
const uuid = require('uuid')
const { getHandler } = require('@xen-orchestra/fs')
const { pFromCallback } = require('promise-toolbox')
const { RemoteAdapter } = require('./RemoteAdapter')
const { VHDFOOTER, VHDHEADER } = require('./tests.fixtures.js')
const { VhdFile, Constants, VhdDirectory, VhdAbstract } = require('vhd-lib')
const { checkAliases } = require('./_cleanVm')
const { dirname, basename } = require('path')
const { rimraf } = require('rimraf')
let tempDir, adapter, handler, jobId, vdiId, basePath, relativePath
const rootPath = 'xo-vm-backups/VMUUID/'
@@ -67,11 +67,6 @@ async function generateVhd(path, opts = {}) {
await VhdAbstract.createAlias(handler, path + '.alias.vhd', dataPath)
}
if (opts.blocks) {
for (const blockId of opts.blocks) {
await vhd.writeEntireBlock({ id: blockId, buffer: Buffer.alloc(2 * 1024 * 1024 + 512, blockId) })
}
}
await vhd.writeBlockAllocationTable()
await vhd.writeHeader()
await vhd.writeFooter()
@@ -235,7 +230,7 @@ test('it merges delta of non destroyed chain', async () => {
const metadata = JSON.parse(await handler.readFile(`${rootPath}/metadata.json`))
// size should be the size of children + grand children after the merge
assert.equal(metadata.size, 104960)
assert.equal(metadata.size, 209920)
// merging is already tested in vhd-lib, don't retest it here (and theses vhd are as empty as my stomach at 12h12)
// only check deletion
@@ -325,7 +320,6 @@ describe('tests multiple combination ', () => {
const ancestor = await generateVhd(`${basePath}/ancestor.vhd`, {
useAlias,
mode: vhdMode,
blocks: [1, 3],
})
const child = await generateVhd(`${basePath}/child.vhd`, {
useAlias,
@@ -334,7 +328,6 @@ describe('tests multiple combination ', () => {
parentUnicodeName: 'ancestor.vhd' + (useAlias ? '.alias.vhd' : ''),
parentUuid: ancestor.footer.uuid,
},
blocks: [1, 2],
})
// a grand child vhd in metadata
await generateVhd(`${basePath}/grandchild.vhd`, {
@@ -344,7 +337,6 @@ describe('tests multiple combination ', () => {
parentUnicodeName: 'child.vhd' + (useAlias ? '.alias.vhd' : ''),
parentUuid: child.footer.uuid,
},
blocks: [2, 3],
})
// an older parent that was merging in clean
@@ -403,7 +395,7 @@ describe('tests multiple combination ', () => {
const metadata = JSON.parse(await handler.readFile(`${rootPath}/metadata.json`))
// size should be the size of children + grand children + clean after the merge
assert.deepEqual(metadata.size, vhdMode === 'file' ? 6502400 : 6501888)
assert.deepEqual(metadata.size, vhdMode === 'file' ? 314880 : undefined)
// broken vhd, non referenced, abandonned should be deleted ( alias and data)
// ancestor and child should be merged

View File

@@ -1,18 +1,19 @@
import * as UUID from 'uuid'
import sum from 'lodash/sum.js'
import { asyncMap } from '@xen-orchestra/async-map'
import { Constants, openVhd, VhdAbstract, VhdFile } from 'vhd-lib'
import { isVhdAlias, resolveVhdAlias } from 'vhd-lib/aliases.js'
import { dirname, resolve } from 'node:path'
import { isMetadataFile, isVhdFile, isXvaFile, isXvaSumFile } from './_backupType.mjs'
import { limitConcurrency } from 'limit-concurrency-decorator'
import { mergeVhdChain } from 'vhd-lib/merge.js'
import { Task } from './Task.mjs'
import { Disposable } from 'promise-toolbox'
import handlerPath from '@xen-orchestra/fs/path'
'use strict'
const sum = require('lodash/sum')
const UUID = require('uuid')
const { asyncMap } = require('@xen-orchestra/async-map')
const { Constants, openVhd, VhdAbstract, VhdFile } = require('vhd-lib')
const { isVhdAlias, resolveVhdAlias } = require('vhd-lib/aliases')
const { dirname, resolve } = require('path')
const { DISK_TYPES } = Constants
const { isMetadataFile, isVhdFile, isXvaFile, isXvaSumFile } = require('./_backupType.js')
const { limitConcurrency } = require('limit-concurrency-decorator')
const { mergeVhdChain } = require('vhd-lib/merge')
const { Task } = require('./Task.js')
const { Disposable } = require('promise-toolbox')
const handlerPath = require('@xen-orchestra/fs/path')
// checking the size of a vhd directory is costly
// 1 Http Query per 1000 blocks
@@ -36,32 +37,34 @@ const computeVhdsSize = (handler, vhdPaths) =>
)
// chain is [ ancestor, child_1, ..., child_n ]
async function _mergeVhdChain(handler, chain, { logInfo, remove, mergeBlockConcurrency }) {
logInfo(`merging VHD chain`, { chain })
async function _mergeVhdChain(handler, chain, { logInfo, remove, merge, mergeBlockConcurrency }) {
if (merge) {
logInfo(`merging VHD chain`, { chain })
let done, total
const handle = setInterval(() => {
if (done !== undefined) {
logInfo('merge in progress', {
done,
parent: chain[0],
progress: Math.round((100 * done) / total),
total,
let done, total
const handle = setInterval(() => {
if (done !== undefined) {
logInfo('merge in progress', {
done,
parent: chain[0],
progress: Math.round((100 * done) / total),
total,
})
}
}, 10e3)
try {
return await mergeVhdChain(handler, chain, {
logInfo,
mergeBlockConcurrency,
onProgress({ done: d, total: t }) {
done = d
total = t
},
removeUnused: remove,
})
} finally {
clearInterval(handle)
}
}, 10e3)
try {
return await mergeVhdChain(handler, chain, {
logInfo,
mergeBlockConcurrency,
onProgress({ done: d, total: t }) {
done = d
total = t
},
removeUnused: remove,
})
} finally {
clearInterval(handle)
}
}
@@ -114,7 +117,7 @@ const listVhds = async (handler, vmDir, logWarn) => {
return { vhds, interruptedVhds, aliases }
}
export async function checkAliases(
async function checkAliases(
aliasPaths,
targetDataRepository,
{ handler, logInfo = noop, logWarn = console.warn, remove = false }
@@ -173,9 +176,11 @@ export async function checkAliases(
})
}
exports.checkAliases = checkAliases
const defaultMergeLimiter = limitConcurrency(1)
export async function cleanVm(
exports.cleanVm = async function cleanVm(
vmDir,
{
fixMetadata,
@@ -469,20 +474,23 @@ export async function cleanVm(
const metadataWithMergedVhd = {}
const doMerge = async () => {
await asyncMap(toMerge, async chain => {
const { finalVhdSize } = await limitedMergeVhdChain(handler, chain, {
const merged = await limitedMergeVhdChain(handler, chain, {
logInfo,
logWarn,
remove,
merge,
mergeBlockConcurrency,
})
const metadataPath = vhdsToJSons[chain[chain.length - 1]] // all the chain should have the same metada file
metadataWithMergedVhd[metadataPath] = (metadataWithMergedVhd[metadataPath] ?? 0) + finalVhdSize
if (merged !== undefined) {
const metadataPath = vhdsToJSons[chain[chain.length - 1]] // all the chain should have the same metada file
metadataWithMergedVhd[metadataPath] = true
}
})
}
await Promise.all([
...unusedVhdsDeletion,
toMerge.length !== 0 && (merge ? Task.run({ name: 'merge' }, doMerge) : () => Promise.resolve()),
toMerge.length !== 0 && (merge ? Task.run({ name: 'merge' }, doMerge) : doMerge()),
asyncMap(unusedXvas, path => {
logWarn('unused XVA', { path })
if (remove) {
@@ -504,11 +512,12 @@ export async function cleanVm(
// update size for delta metadata with merged VHD
// check for the other that the size is the same as the real file size
await asyncMap(jsons, async metadataPath => {
const metadata = backups.get(metadataPath)
let fileSystemSize
const mergedSize = metadataWithMergedVhd[metadataPath]
const merged = metadataWithMergedVhd[metadataPath] !== undefined
const { mode, size, vhds, xva } = metadata
@@ -518,29 +527,26 @@ export async function cleanVm(
const linkedXva = resolve('/', vmDir, xva)
try {
fileSystemSize = await handler.getSize(linkedXva)
if (fileSystemSize !== size && fileSystemSize !== undefined) {
logWarn('cleanVm: incorrect backup size in metadata', {
path: metadataPath,
actual: size ?? 'none',
expected: fileSystemSize,
})
}
} catch (error) {
// can fail with encrypted remote
}
} else if (mode === 'delta') {
const linkedVhds = Object.keys(vhds).map(key => resolve('/', vmDir, vhds[key]))
fileSystemSize = await computeVhdsSize(handler, linkedVhds)
// the size is not computed in some cases (e.g. VhdDirectory)
if (fileSystemSize === undefined) {
return
}
// don't warn if the size has changed after a merge
if (mergedSize === undefined) {
const linkedVhds = Object.keys(vhds).map(key => resolve('/', vmDir, vhds[key]))
fileSystemSize = await computeVhdsSize(handler, linkedVhds)
// the size is not computed in some cases (e.g. VhdDirectory)
if (fileSystemSize !== undefined && fileSystemSize !== size) {
logWarn('cleanVm: incorrect backup size in metadata', {
path: metadataPath,
actual: size ?? 'none',
expected: fileSystemSize,
})
}
if (!merged && fileSystemSize !== size) {
// FIXME: figure out why it occurs so often and, once fixed, log the real problems with `logWarn`
console.warn('cleanVm: incorrect backup size in metadata', {
path: metadataPath,
actual: size ?? 'none',
expected: fileSystemSize,
})
}
}
} catch (error) {
@@ -548,19 +554,9 @@ export async function cleanVm(
return
}
// systematically update size and differentials after a merge
// @todo : after 2024-04-01 remove the fixmetadata options since the size computation is fixed
if (mergedSize || (fixMetadata && fileSystemSize !== size)) {
metadata.size = mergedSize ?? fileSystemSize ?? size
if (mergedSize) {
// all disks are now key disk
metadata.isVhdDifferencing = {}
for (const id of Object.values(metadata.vdis ?? {})) {
metadata.isVhdDifferencing[`${id}.vhd`] = false
}
}
// systematically update size after a merge
if ((merged || fixMetadata) && size !== fileSystemSize) {
metadata.size = fileSystemSize
mustRegenerateCache = true
try {
await handler.writeFile(metadataPath, JSON.stringify(metadata), { flags: 'w' })

View File

@@ -0,0 +1,8 @@
'use strict'
const { utcFormat, utcParse } = require('d3-time-format')
// Format a date in ISO 8601 in a safe way to be used in filenames
// (even on Windows).
exports.formatFilenameDate = utcFormat('%Y%m%dT%H%M%SZ')
exports.parseFilenameDate = utcParse('%Y%m%dT%H%M%SZ')

View File

@@ -1,6 +0,0 @@
import { utcFormat, utcParse } from 'd3-time-format'
// Format a date in ISO 8601 in a safe way to be used in filenames
// (even on Windows).
export const formatFilenameDate = utcFormat('%Y%m%dT%H%M%SZ')
export const parseFilenameDate = utcParse('%Y%m%dT%H%M%SZ')

View File

@@ -1,4 +1,6 @@
'use strict'
// returns all entries but the last retention-th
export function getOldEntries(retention, entries) {
exports.getOldEntries = function getOldEntries(retention, entries) {
return entries === undefined ? [] : retention > 0 ? entries.slice(0, -retention) : entries
}

View File

@@ -1,11 +1,13 @@
import Disposable from 'promise-toolbox/Disposable'
import { join } from 'node:path'
import { mkdir, rmdir } from 'node:fs/promises'
import { tmpdir } from 'os'
'use strict'
const Disposable = require('promise-toolbox/Disposable')
const { join } = require('path')
const { mkdir, rmdir } = require('fs-extra')
const { tmpdir } = require('os')
const MAX_ATTEMPTS = 3
export async function getTmpDir() {
exports.getTmpDir = async function getTmpDir() {
for (let i = 0; true; ++i) {
const path = join(tmpdir(), Math.random().toString(36).slice(2))
try {

View File

@@ -0,0 +1,8 @@
'use strict'
const BACKUP_DIR = 'xo-vm-backups'
exports.BACKUP_DIR = BACKUP_DIR
exports.getVmBackupDir = function getVmBackupDir(uuid) {
return `${BACKUP_DIR}/${uuid}`
}

View File

@@ -1,5 +0,0 @@
export const BACKUP_DIR = 'xo-vm-backups'
export function getVmBackupDir(uuid) {
return `${BACKUP_DIR}/${uuid}`
}

View File

@@ -1,30 +1,39 @@
import groupBy from 'lodash/groupBy.js'
import ignoreErrors from 'promise-toolbox/ignoreErrors'
import omit from 'lodash/omit.js'
import { asyncMap } from '@xen-orchestra/async-map'
import { CancelToken } from 'promise-toolbox'
import { compareVersions } from 'compare-versions'
import { createVhdStreamWithLength } from 'vhd-lib'
import { defer } from 'golike-defer'
'use strict'
import { cancelableMap } from './_cancelableMap.mjs'
import { Task } from './Task.mjs'
import pick from 'lodash/pick.js'
const find = require('lodash/find.js')
const groupBy = require('lodash/groupBy.js')
const ignoreErrors = require('promise-toolbox/ignoreErrors')
const omit = require('lodash/omit.js')
const { asyncMap } = require('@xen-orchestra/async-map')
const { CancelToken } = require('promise-toolbox')
const { compareVersions } = require('compare-versions')
const { createVhdStreamWithLength } = require('vhd-lib')
const { defer } = require('golike-defer')
// in `other_config` of an incrementally replicated VM, contains the UUID of the source VM
export const TAG_BASE_DELTA = 'xo:base_delta'
const { cancelableMap } = require('./_cancelableMap.js')
const { Task } = require('./Task.js')
const pick = require('lodash/pick.js')
// in `other_config` of an incrementally replicated VM, contains the UUID of the target SR used for replication
//
// added after the complete replication
export const TAG_BACKUP_SR = 'xo:backup:sr'
const TAG_BASE_DELTA = 'xo:base_delta'
exports.TAG_BASE_DELTA = TAG_BASE_DELTA
// in other_config of VDIs of an incrementally replicated VM, contains the UUID of the source VDI
export const TAG_COPY_SRC = 'xo:copy_of'
const TAG_COPY_SRC = 'xo:copy_of'
exports.TAG_COPY_SRC = TAG_COPY_SRC
const ensureArray = value => (value === undefined ? [] : Array.isArray(value) ? value : [value])
const resolveUuid = async (xapi, cache, uuid, type) => {
if (uuid == null) {
return uuid
}
let ref = cache.get(uuid)
if (ref === undefined) {
ref = await xapi.call(`${type}.get_by_uuid`, uuid)
cache.set(uuid, ref)
}
return ref
}
export async function exportIncrementalVm(
exports.exportIncrementalVm = async function exportIncrementalVm(
vm,
baseVm,
{
@@ -34,8 +43,6 @@ export async function exportIncrementalVm(
fullVdisRequired = new Set(),
disableBaseTags = false,
nbdConcurrency = 1,
preferNbd,
} = {}
) {
// refs of VM's VDIs → base's VDIs.
@@ -83,8 +90,6 @@ export async function exportIncrementalVm(
baseRef: baseVdi?.$ref,
cancelToken,
format: 'vhd',
nbdConcurrency,
preferNbd,
})
})
@@ -138,11 +143,11 @@ export async function exportIncrementalVm(
)
}
export const importIncrementalVm = defer(async function importIncrementalVm(
exports.importIncrementalVm = defer(async function importIncrementalVm(
$defer,
incrementalVm,
sr,
{ cancelToken = CancelToken.none, newMacAddresses = false } = {}
{ cancelToken = CancelToken.none, detectBase = true, mapVdisSrs = {}, newMacAddresses = false } = {}
) {
const { version } = incrementalVm
if (compareVersions(version, '1.0.0') < 0) {
@@ -152,6 +157,32 @@ export const importIncrementalVm = defer(async function importIncrementalVm(
const vmRecord = incrementalVm.vm
const xapi = sr.$xapi
let baseVm
if (detectBase) {
const remoteBaseVmUuid = vmRecord.other_config[TAG_BASE_DELTA]
if (remoteBaseVmUuid) {
baseVm = find(xapi.objects.all, obj => (obj = obj.other_config) && obj[TAG_COPY_SRC] === remoteBaseVmUuid)
if (!baseVm) {
throw new Error(`could not find the base VM (copy of ${remoteBaseVmUuid})`)
}
}
}
const cache = new Map()
const mapVdisSrRefs = {}
for (const [vdiUuid, srUuid] of Object.entries(mapVdisSrs)) {
mapVdisSrRefs[vdiUuid] = await resolveUuid(xapi, cache, srUuid, 'SR')
}
const baseVdis = {}
baseVm &&
baseVm.$VBDs.forEach(vbd => {
const vdi = vbd.$VDI
if (vdi !== undefined) {
baseVdis[vbd.VDI] = vbd.$VDI
}
})
const vdiRecords = incrementalVm.vdis
// 0. Create suspend_VDI
@@ -163,7 +194,18 @@ export const importIncrementalVm = defer(async function importIncrementalVm(
vm: pick(vmRecord, 'uuid', 'name_label', 'suspend_VDI'),
})
} else {
suspendVdi = await xapi.getRecord('VDI', await xapi.VDI_create(vdi))
suspendVdi = await xapi.getRecord(
'VDI',
await xapi.VDI_create({
...vdi,
other_config: {
...vdi.other_config,
[TAG_BASE_DELTA]: undefined,
[TAG_COPY_SRC]: vdi.uuid,
},
sr: mapVdisSrRefs[vdi.uuid] ?? sr.$ref,
})
)
$defer.onFailure(() => suspendVdi.$destroy())
}
}
@@ -181,6 +223,10 @@ export const importIncrementalVm = defer(async function importIncrementalVm(
ha_always_run: false,
is_a_template: false,
name_label: '[Importing…] ' + vmRecord.name_label,
other_config: {
...vmRecord.other_config,
[TAG_COPY_SRC]: vmRecord.uuid,
},
},
{
bios_strings: vmRecord.bios_strings,
@@ -201,8 +247,14 @@ export const importIncrementalVm = defer(async function importIncrementalVm(
const vdi = vdiRecords[vdiRef]
let newVdi
if (vdi.baseVdi !== undefined) {
newVdi = await xapi.getRecord('VDI', await vdi.baseVdi.$clone())
const remoteBaseVdiUuid = detectBase && vdi.other_config[TAG_BASE_DELTA]
if (remoteBaseVdiUuid) {
const baseVdi = find(baseVdis, vdi => vdi.other_config[TAG_COPY_SRC] === remoteBaseVdiUuid)
if (!baseVdi) {
throw new Error(`missing base VDI (copy of ${remoteBaseVdiUuid})`)
}
newVdi = await xapi.getRecord('VDI', await baseVdi.$clone())
$defer.onFailure(() => newVdi.$destroy())
await newVdi.update_other_config(TAG_COPY_SRC, vdi.uuid)
@@ -213,7 +265,18 @@ export const importIncrementalVm = defer(async function importIncrementalVm(
// suspendVDI has already created
newVdi = suspendVdi
} else {
newVdi = await xapi.getRecord('VDI', await xapi.VDI_create(vdi))
newVdi = await xapi.getRecord(
'VDI',
await xapi.VDI_create({
...vdi,
other_config: {
...vdi.other_config,
[TAG_BASE_DELTA]: undefined,
[TAG_COPY_SRC]: vdi.uuid,
},
SR: mapVdisSrRefs[vdi.uuid] ?? sr.$ref,
})
)
$defer.onFailure(() => newVdi.$destroy())
}
@@ -252,19 +315,13 @@ export const importIncrementalVm = defer(async function importIncrementalVm(
// Import VDI contents.
cancelableMap(cancelToken, Object.entries(newVdis), async (cancelToken, [id, vdi]) => {
for (let stream of ensureArray(streams[`${id}.vhd`])) {
if (stream === null) {
// we restore a backup and reuse completly a local snapshot
continue
}
if (typeof stream === 'function') {
stream = await stream()
}
if (stream.length === undefined) {
stream = await createVhdStreamWithLength(stream)
}
await xapi.setField('VDI', vdi.$ref, 'name_label', `[Importing] ${vdiRecords[id].name_label}`)
await vdi.$importContent(stream, { cancelToken, format: 'vhd' })
await xapi.setField('VDI', vdi.$ref, 'name_label', vdiRecords[id].name_label)
}
}),

View File

@@ -1,4 +1,6 @@
import assert from 'node:assert'
'use strict'
const assert = require('assert')
const COMPRESSED_MAGIC_NUMBERS = [
// https://tools.ietf.org/html/rfc1952.html#page-5
@@ -45,7 +47,7 @@ const isValidTar = async (handler, size, fd) => {
}
// TODO: find an heuristic for compressed files
export async function isValidXva(path) {
async function isValidXva(path) {
const handler = this._handler
// size is longer when encrypted + reading part of an encrypted file is not implemented
@@ -72,5 +74,6 @@ export async function isValidXva(path) {
return true
}
}
exports.isValidXva = isValidXva
const noop = Function.prototype

View File

@@ -1,7 +1,9 @@
import fromCallback from 'promise-toolbox/fromCallback'
import { createLogger } from '@xen-orchestra/log'
import { createParser } from 'parse-pairs'
import { execFile } from 'child_process'
'use strict'
const fromCallback = require('promise-toolbox/fromCallback')
const { createLogger } = require('@xen-orchestra/log')
const { createParser } = require('parse-pairs')
const { execFile } = require('child_process')
const { debug } = createLogger('xo:backups:listPartitions')
@@ -22,7 +24,8 @@ const IGNORED_PARTITION_TYPES = {
0x82: true, // swap
}
export const LVM_PARTITION_TYPE = 0x8e
const LVM_PARTITION_TYPE = 0x8e
exports.LVM_PARTITION_TYPE = LVM_PARTITION_TYPE
const parsePartxLine = createParser({
keyTransform: key => (key === 'UUID' ? 'id' : key.toLowerCase()),
@@ -30,7 +33,7 @@ const parsePartxLine = createParser({
})
// returns an empty array in case of a non-partitioned disk
export async function listPartitions(devicePath) {
exports.listPartitions = async function listPartitions(devicePath) {
const parts = await fromCallback(execFile, 'partx', [
'--bytes',
'--output=NR,START,SIZE,NAME,UUID,TYPE',

View File

@@ -1,6 +1,8 @@
import fromCallback from 'promise-toolbox/fromCallback'
import { createParser } from 'parse-pairs'
import { execFile } from 'child_process'
'use strict'
const fromCallback = require('promise-toolbox/fromCallback')
const { createParser } = require('parse-pairs')
const { execFile } = require('child_process')
// ===================================================================
@@ -27,5 +29,5 @@ const makeFunction =
.map(Array.isArray(fields) ? parse : line => parse(line)[fields])
}
export const lvs = makeFunction('lvs')
export const pvs = makeFunction('pvs')
exports.lvs = makeFunction('lvs')
exports.pvs = makeFunction('pvs')

View File

@@ -1,20 +1,22 @@
import { asyncMap } from '@xen-orchestra/async-map'
import Disposable from 'promise-toolbox/Disposable'
import ignoreErrors from 'promise-toolbox/ignoreErrors'
'use strict'
import { extractIdsFromSimplePattern } from '../extractIdsFromSimplePattern.mjs'
import { PoolMetadataBackup } from './_PoolMetadataBackup.mjs'
import { XoMetadataBackup } from './_XoMetadataBackup.mjs'
import { DEFAULT_SETTINGS, Abstract } from './_Abstract.mjs'
import { runTask } from './_runTask.mjs'
import { getAdaptersByRemote } from './_getAdaptersByRemote.mjs'
const { asyncMap } = require('@xen-orchestra/async-map')
const Disposable = require('promise-toolbox/Disposable')
const ignoreErrors = require('promise-toolbox/ignoreErrors')
const { extractIdsFromSimplePattern } = require('../extractIdsFromSimplePattern.js')
const { PoolMetadataBackup } = require('./_PoolMetadataBackup.js')
const { XoMetadataBackup } = require('./_XoMetadataBackup.js')
const { DEFAULT_SETTINGS, Abstract } = require('./_Abstract.js')
const { runTask } = require('./_runTask.js')
const { getAdaptersByRemote } = require('./_getAdaptersByRemote.js')
const DEFAULT_METADATA_SETTINGS = {
retentionPoolMetadata: 0,
retentionXoMetadata: 0,
}
export const Metadata = class MetadataBackupRunner extends Abstract {
exports.Metadata = class MetadataBackupRunner extends Abstract {
_computeBaseSettings(config, job) {
const baseSettings = { ...DEFAULT_SETTINGS }
Object.assign(baseSettings, DEFAULT_METADATA_SETTINGS, config.defaultSettings, config.metadata?.defaultSettings)

View File

@@ -1,15 +1,17 @@
import { asyncMapSettled } from '@xen-orchestra/async-map'
import Disposable from 'promise-toolbox/Disposable'
import { limitConcurrency } from 'limit-concurrency-decorator'
'use strict'
import { extractIdsFromSimplePattern } from '../extractIdsFromSimplePattern.mjs'
import { Task } from '../Task.mjs'
import createStreamThrottle from './_createStreamThrottle.mjs'
import { DEFAULT_SETTINGS, Abstract } from './_Abstract.mjs'
import { runTask } from './_runTask.mjs'
import { getAdaptersByRemote } from './_getAdaptersByRemote.mjs'
import { FullRemote } from './_vmRunners/FullRemote.mjs'
import { IncrementalRemote } from './_vmRunners/IncrementalRemote.mjs'
const { asyncMapSettled } = require('@xen-orchestra/async-map')
const Disposable = require('promise-toolbox/Disposable')
const { limitConcurrency } = require('limit-concurrency-decorator')
const { extractIdsFromSimplePattern } = require('../extractIdsFromSimplePattern.js')
const { Task } = require('../Task.js')
const createStreamThrottle = require('./_createStreamThrottle.js')
const { DEFAULT_SETTINGS, Abstract } = require('./_Abstract.js')
const { runTask } = require('./_runTask.js')
const { getAdaptersByRemote } = require('./_getAdaptersByRemote.js')
const { FullRemote } = require('./_vmRunners/FullRemote.js')
const { IncrementalRemote } = require('./_vmRunners/IncrementalRemote.js')
const DEFAULT_REMOTE_VM_SETTINGS = {
concurrency: 2,
@@ -25,7 +27,7 @@ const DEFAULT_REMOTE_VM_SETTINGS = {
vmTimeout: 0,
}
export const VmsRemote = class RemoteVmsBackupRunner extends Abstract {
exports.VmsRemote = class RemoteVmsBackupRunner extends Abstract {
_computeBaseSettings(config, job) {
const baseSettings = { ...DEFAULT_SETTINGS }
Object.assign(baseSettings, DEFAULT_REMOTE_VM_SETTINGS, config.defaultSettings, config.vm?.defaultSettings)

View File

@@ -1,15 +1,17 @@
import { asyncMapSettled } from '@xen-orchestra/async-map'
import Disposable from 'promise-toolbox/Disposable'
import { limitConcurrency } from 'limit-concurrency-decorator'
'use strict'
import { extractIdsFromSimplePattern } from '../extractIdsFromSimplePattern.mjs'
import { Task } from '../Task.mjs'
import createStreamThrottle from './_createStreamThrottle.mjs'
import { DEFAULT_SETTINGS, Abstract } from './_Abstract.mjs'
import { runTask } from './_runTask.mjs'
import { getAdaptersByRemote } from './_getAdaptersByRemote.mjs'
import { IncrementalXapi } from './_vmRunners/IncrementalXapi.mjs'
import { FullXapi } from './_vmRunners/FullXapi.mjs'
const { asyncMapSettled } = require('@xen-orchestra/async-map')
const Disposable = require('promise-toolbox/Disposable')
const { limitConcurrency } = require('limit-concurrency-decorator')
const { extractIdsFromSimplePattern } = require('../extractIdsFromSimplePattern.js')
const { Task } = require('../Task.js')
const createStreamThrottle = require('./_createStreamThrottle.js')
const { DEFAULT_SETTINGS, Abstract } = require('./_Abstract.js')
const { runTask } = require('./_runTask.js')
const { getAdaptersByRemote } = require('./_getAdaptersByRemote.js')
const { IncrementalXapi } = require('./_vmRunners/IncrementalXapi.js')
const { FullXapi } = require('./_vmRunners/FullXapi.js')
const DEFAULT_XAPI_VM_SETTINGS = {
bypassVdiChainsCheck: false,
@@ -34,7 +36,7 @@ const DEFAULT_XAPI_VM_SETTINGS = {
vmTimeout: 0,
}
export const VmsXapi = class VmsXapiBackupRunner extends Abstract {
exports.VmsXapi = class VmsXapiBackupRunner extends Abstract {
_computeBaseSettings(config, job) {
const baseSettings = { ...DEFAULT_SETTINGS }
Object.assign(baseSettings, DEFAULT_XAPI_VM_SETTINGS, config.defaultSettings, config.vm?.defaultSettings)

View File

@@ -1,15 +1,17 @@
import Disposable from 'promise-toolbox/Disposable'
import pTimeout from 'promise-toolbox/timeout'
import { compileTemplate } from '@xen-orchestra/template'
import { runTask } from './_runTask.mjs'
import { RemoteTimeoutError } from './_RemoteTimeoutError.mjs'
'use strict'
export const DEFAULT_SETTINGS = {
const Disposable = require('promise-toolbox/Disposable')
const pTimeout = require('promise-toolbox/timeout')
const { compileTemplate } = require('@xen-orchestra/template')
const { runTask } = require('./_runTask.js')
const { RemoteTimeoutError } = require('./_RemoteTimeoutError.js')
exports.DEFAULT_SETTINGS = {
getRemoteTimeout: 300e3,
reportWhen: 'failure',
}
export const Abstract = class AbstractRunner {
exports.Abstract = class AbstractRunner {
constructor({ config, getAdapter, getConnectedRecord, job, schedule }) {
this._config = config
this._getRecord = getConnectedRecord

View File

@@ -1,13 +1,16 @@
import { asyncMap } from '@xen-orchestra/async-map'
'use strict'
import { DIR_XO_POOL_METADATA_BACKUPS } from '../RemoteAdapter.mjs'
import { forkStreamUnpipe } from './_forkStreamUnpipe.mjs'
import { formatFilenameDate } from '../_filenameDate.mjs'
import { Task } from '../Task.mjs'
const { asyncMap } = require('@xen-orchestra/async-map')
export const PATH_DB_DUMP = '/pool/xmldbdump'
const { DIR_XO_POOL_METADATA_BACKUPS } = require('../RemoteAdapter.js')
const { forkStreamUnpipe } = require('./_forkStreamUnpipe.js')
const { formatFilenameDate } = require('../_filenameDate.js')
const { Task } = require('../Task.js')
export class PoolMetadataBackup {
const PATH_DB_DUMP = '/pool/xmldbdump'
exports.PATH_DB_DUMP = PATH_DB_DUMP
exports.PoolMetadataBackup = class PoolMetadataBackup {
constructor({ config, job, pool, remoteAdapters, schedule, settings }) {
this._config = config
this._job = job

View File

@@ -1,6 +1,8 @@
export class RemoteTimeoutError extends Error {
'use strict'
class RemoteTimeoutError extends Error {
constructor(remoteId) {
super('timeout while getting the remote ' + remoteId)
this.remoteId = remoteId
}
}
exports.RemoteTimeoutError = RemoteTimeoutError

View File

@@ -1,11 +1,13 @@
import { asyncMap } from '@xen-orchestra/async-map'
import { join } from '@xen-orchestra/fs/path'
'use strict'
import { DIR_XO_CONFIG_BACKUPS } from '../RemoteAdapter.mjs'
import { formatFilenameDate } from '../_filenameDate.mjs'
import { Task } from '../Task.mjs'
const { asyncMap } = require('@xen-orchestra/async-map')
const { join } = require('@xen-orchestra/fs/path')
export class XoMetadataBackup {
const { DIR_XO_CONFIG_BACKUPS } = require('../RemoteAdapter.js')
const { formatFilenameDate } = require('../_filenameDate.js')
const { Task } = require('../Task.js')
exports.XoMetadataBackup = class XoMetadataBackup {
constructor({ config, job, remoteAdapters, schedule, settings }) {
this._config = config
this._job = job
@@ -22,13 +24,7 @@ export class XoMetadataBackup {
const dir = `${scheduleDir}/${formatFilenameDate(timestamp)}`
const data = job.xoMetadata
let dataBaseName = './data'
// JSON data is sent as plain string, binary data is sent as an object with `data` and `encoding properties
const isJson = typeof data === 'string'
if (isJson) {
dataBaseName += '.json'
}
const dataBaseName = './data.json'
const metadata = JSON.stringify(
{
@@ -60,7 +56,7 @@ export class XoMetadataBackup {
async () => {
const handler = adapter.handler
const dirMode = this._config.dirMode
await handler.outputFile(dataFileName, isJson ? data : Buffer.from(data.data, data.encoding), { dirMode })
await handler.outputFile(dataFileName, data, { dirMode })
await handler.outputFile(metaDataFileName, metadata, {
dirMode,
})

View File

@@ -1,10 +1,12 @@
import { pipeline } from 'node:stream'
import { ThrottleGroup } from '@kldzj/stream-throttle'
import identity from 'lodash/identity.js'
'use strict'
const { pipeline } = require('node:stream')
const { ThrottleGroup } = require('@kldzj/stream-throttle')
const identity = require('lodash/identity.js')
const noop = Function.prototype
export default function createStreamThrottle(rate) {
module.exports = function createStreamThrottle(rate) {
if (rate === 0) {
return identity
}

View File

@@ -1,13 +1,14 @@
import { createLogger } from '@xen-orchestra/log'
import { finished, PassThrough } from 'node:stream'
'use strict'
const { debug } = createLogger('xo:backups:forkStreamUnpipe')
const { finished, PassThrough } = require('node:stream')
const { debug } = require('@xen-orchestra/log').createLogger('xo:backups:forkStreamUnpipe')
// create a new readable stream from an existing one which may be piped later
//
// in case of error in the new readable stream, it will simply be unpiped
// from the original one
export function forkStreamUnpipe(source) {
exports.forkStreamUnpipe = function forkStreamUnpipe(source) {
const { forks = 0 } = source
source.forks = forks + 1

View File

@@ -1,7 +1,9 @@
export function getAdaptersByRemote(adapters) {
'use strict'
const getAdaptersByRemote = adapters => {
const adaptersByRemote = {}
adapters.forEach(({ adapter, remoteId }) => {
adaptersByRemote[remoteId] = adapter
})
return adaptersByRemote
}
exports.getAdaptersByRemote = getAdaptersByRemote

View File

@@ -0,0 +1,6 @@
'use strict'
const { Task } = require('../Task.js')
const noop = Function.prototype
const runTask = (...args) => Task.run(...args).catch(noop) // errors are handled by logs
exports.runTask = runTask

View File

@@ -1,5 +0,0 @@
import { Task } from '../Task.mjs'
const noop = Function.prototype
export const runTask = (...args) => Task.run(...args).catch(noop) // errors are handled by logs

View File

@@ -1,12 +1,14 @@
import { decorateMethodsWith } from '@vates/decorate-with'
import { defer } from 'golike-defer'
import { AbstractRemote } from './_AbstractRemote.mjs'
import { FullRemoteWriter } from '../_writers/FullRemoteWriter.mjs'
import { forkStreamUnpipe } from '../_forkStreamUnpipe.mjs'
import { watchStreamSize } from '../../_watchStreamSize.mjs'
import { Task } from '../../Task.mjs'
'use strict'
export const FullRemote = class FullRemoteVmBackupRunner extends AbstractRemote {
const { decorateMethodsWith } = require('@vates/decorate-with')
const { defer } = require('golike-defer')
const { AbstractRemote } = require('./_AbstractRemote')
const { FullRemoteWriter } = require('../_writers/FullRemoteWriter')
const { forkStreamUnpipe } = require('../_forkStreamUnpipe')
const { watchStreamSize } = require('../../_watchStreamSize')
const { Task } = require('../../Task')
class FullRemoteVmBackupRunner extends AbstractRemote {
_getRemoteWriter() {
return FullRemoteWriter
}
@@ -29,8 +31,6 @@ export const FullRemote = class FullRemoteVmBackupRunner extends AbstractRemote
writer =>
writer.run({
stream: forkStreamUnpipe(stream),
// stream will be forked and transformed, it's not safe to attach additionnal properties to it
streamLength: stream.length,
timestamp: metadata.timestamp,
vm: metadata.vm,
vmSnapshot: metadata.vmSnapshot,
@@ -47,6 +47,7 @@ export const FullRemote = class FullRemoteVmBackupRunner extends AbstractRemote
}
}
decorateMethodsWith(FullRemote, {
exports.FullRemote = FullRemoteVmBackupRunner
decorateMethodsWith(FullRemoteVmBackupRunner, {
_run: defer,
})

View File

@@ -1,14 +1,16 @@
import { createLogger } from '@xen-orchestra/log'
'use strict'
import { forkStreamUnpipe } from '../_forkStreamUnpipe.mjs'
import { FullRemoteWriter } from '../_writers/FullRemoteWriter.mjs'
import { FullXapiWriter } from '../_writers/FullXapiWriter.mjs'
import { watchStreamSize } from '../../_watchStreamSize.mjs'
import { AbstractXapi } from './_AbstractXapi.mjs'
const { createLogger } = require('@xen-orchestra/log')
const { forkStreamUnpipe } = require('../_forkStreamUnpipe.js')
const { FullRemoteWriter } = require('../_writers/FullRemoteWriter.js')
const { FullXapiWriter } = require('../_writers/FullXapiWriter.js')
const { watchStreamSize } = require('../../_watchStreamSize.js')
const { AbstractXapi } = require('./_AbstractXapi.js')
const { debug } = createLogger('xo:backups:FullXapiVmBackup')
export const FullXapi = class FullXapiVmBackupRunner extends AbstractXapi {
exports.FullXapi = class FullXapiVmBackupRunner extends AbstractXapi {
_getWriters() {
return [FullRemoteWriter, FullXapiWriter]
}
@@ -35,25 +37,13 @@ export const FullXapi = class FullXapiVmBackupRunner extends AbstractXapi {
useSnapshot: false,
})
)
const vdis = await exportedVm.$getDisks()
let maxStreamLength = 1024 * 1024 // Ovf file and tar headers are a few KB, let's stay safe
for (const vdiRef of vdis) {
const vdi = await this._xapi.getRecord('VDI', vdiRef)
// the size a of fully allocated vdi will be virtual_size exaclty, it's a gross over evaluation
// of the real stream size in general, since a disk is never completly full
// vdi.physical_size seems to underevaluate a lot the real disk usage of a VDI, as of 2023-10-30
maxStreamLength += vdi.virtual_size
}
const sizeContainer = watchStreamSize(stream)
const timestamp = Date.now()
await this._callWriters(
writer =>
writer.run({
maxStreamLength,
sizeContainer,
stream: forkStreamUnpipe(stream),
timestamp,

View File

@@ -1,14 +1,15 @@
import { asyncEach } from '@vates/async-each'
import { decorateMethodsWith } from '@vates/decorate-with'
import { defer } from 'golike-defer'
import assert from 'node:assert'
import isVhdDifferencingDisk from 'vhd-lib/isVhdDifferencingDisk.js'
import mapValues from 'lodash/mapValues.js'
'use strict'
const assert = require('node:assert')
import { AbstractRemote } from './_AbstractRemote.mjs'
import { forkDeltaExport } from './_forkDeltaExport.mjs'
import { IncrementalRemoteWriter } from '../_writers/IncrementalRemoteWriter.mjs'
import { Task } from '../../Task.mjs'
const { decorateMethodsWith } = require('@vates/decorate-with')
const { defer } = require('golike-defer')
const { mapValues } = require('lodash')
const { Task } = require('../../Task')
const { AbstractRemote } = require('./_AbstractRemote')
const { IncrementalRemoteWriter } = require('../_writers/IncrementalRemoteWriter')
const { forkDeltaExport } = require('./_forkDeltaExport')
const isVhdDifferencingDisk = require('vhd-lib/isVhdDifferencingDisk')
const { asyncEach } = require('@vates/async-each')
class IncrementalRemoteVmBackupRunner extends AbstractRemote {
_getRemoteWriter() {
@@ -32,10 +33,10 @@ class IncrementalRemoteVmBackupRunner extends AbstractRemote {
useChain: false,
})
const isVhdDifferencing = {}
const differentialVhds = {}
await asyncEach(Object.entries(incrementalExport.streams), async ([key, stream]) => {
isVhdDifferencing[key] = await isVhdDifferencingDisk(stream)
differentialVhds[key] = await isVhdDifferencingDisk(stream)
})
incrementalExport.streams = mapValues(incrementalExport.streams, this._throttleStream)
@@ -43,7 +44,7 @@ class IncrementalRemoteVmBackupRunner extends AbstractRemote {
writer =>
writer.transfer({
deltaExport: forkDeltaExport(incrementalExport),
isVhdDifferencing,
differentialVhds,
timestamp: metadata.timestamp,
vm: metadata.vm,
vmSnapshot: metadata.vmSnapshot,
@@ -60,7 +61,7 @@ class IncrementalRemoteVmBackupRunner extends AbstractRemote {
}
}
export const IncrementalRemote = IncrementalRemoteVmBackupRunner
exports.IncrementalRemote = IncrementalRemoteVmBackupRunner
decorateMethodsWith(IncrementalRemoteVmBackupRunner, {
_run: defer,
})

View File

@@ -1,26 +1,28 @@
import { asyncEach } from '@vates/async-each'
import { asyncMap } from '@xen-orchestra/async-map'
import { createLogger } from '@xen-orchestra/log'
import { pipeline } from 'node:stream'
import findLast from 'lodash/findLast.js'
import isVhdDifferencingDisk from 'vhd-lib/isVhdDifferencingDisk.js'
import keyBy from 'lodash/keyBy.js'
import mapValues from 'lodash/mapValues.js'
import vhdStreamValidator from 'vhd-lib/vhdStreamValidator.js'
'use strict'
import { AbstractXapi } from './_AbstractXapi.mjs'
import { exportIncrementalVm } from '../../_incrementalVm.mjs'
import { forkDeltaExport } from './_forkDeltaExport.mjs'
import { IncrementalRemoteWriter } from '../_writers/IncrementalRemoteWriter.mjs'
import { IncrementalXapiWriter } from '../_writers/IncrementalXapiWriter.mjs'
import { Task } from '../../Task.mjs'
import { watchStreamSize } from '../../_watchStreamSize.mjs'
const findLast = require('lodash/findLast.js')
const keyBy = require('lodash/keyBy.js')
const mapValues = require('lodash/mapValues.js')
const vhdStreamValidator = require('vhd-lib/vhdStreamValidator.js')
const { asyncMap } = require('@xen-orchestra/async-map')
const { createLogger } = require('@xen-orchestra/log')
const { pipeline } = require('node:stream')
const { IncrementalRemoteWriter } = require('../_writers/IncrementalRemoteWriter.js')
const { IncrementalXapiWriter } = require('../_writers/IncrementalXapiWriter.js')
const { exportIncrementalVm } = require('../../_incrementalVm.js')
const { Task } = require('../../Task.js')
const { watchStreamSize } = require('../../_watchStreamSize.js')
const { AbstractXapi } = require('./_AbstractXapi.js')
const { forkDeltaExport } = require('./_forkDeltaExport.js')
const isVhdDifferencingDisk = require('vhd-lib/isVhdDifferencingDisk')
const { asyncEach } = require('@vates/async-each')
const { debug } = createLogger('xo:backups:IncrementalXapiVmBackup')
const noop = Function.prototype
export const IncrementalXapi = class IncrementalXapiVmBackupRunner extends AbstractXapi {
exports.IncrementalXapi = class IncrementalXapiVmBackupRunner extends AbstractXapi {
_getWriters() {
return [IncrementalRemoteWriter, IncrementalXapiWriter]
}
@@ -41,8 +43,6 @@ export const IncrementalXapi = class IncrementalXapiVmBackupRunner extends Abstr
const deltaExport = await exportIncrementalVm(exportedVm, baseVm, {
fullVdisRequired,
nbdConcurrency: this._settings.nbdConcurrency,
preferNbd: this._settings.preferNbd,
})
// since NBD is network based, if one disk use nbd , all the disk use them
// except the suspended VDI
@@ -50,11 +50,11 @@ export const IncrementalXapi = class IncrementalXapiVmBackupRunner extends Abstr
Task.info('Transfer data using NBD')
}
const isVhdDifferencing = {}
const differentialVhds = {}
// since isVhdDifferencingDisk is reading and unshifting data in stream
// it should be done BEFORE any other stream transform
await asyncEach(Object.entries(deltaExport.streams), async ([key, stream]) => {
isVhdDifferencing[key] = await isVhdDifferencingDisk(stream)
differentialVhds[key] = await isVhdDifferencingDisk(stream)
})
const sizeContainers = mapValues(deltaExport.streams, stream => watchStreamSize(stream))
@@ -69,7 +69,7 @@ export const IncrementalXapi = class IncrementalXapiVmBackupRunner extends Abstr
writer =>
writer.transfer({
deltaExport: forkDeltaExport(deltaExport),
isVhdDifferencing,
differentialVhds,
sizeContainers,
timestamp,
vm,

View File

@@ -1,6 +1,8 @@
import { asyncMap } from '@xen-orchestra/async-map'
import { createLogger } from '@xen-orchestra/log'
import { Task } from '../../Task.mjs'
'use strict'
const { asyncMap } = require('@xen-orchestra/async-map')
const { createLogger } = require('@xen-orchestra/log')
const { Task } = require('../../Task.js')
const { debug, warn } = createLogger('xo:backups:AbstractVmRunner')
@@ -17,7 +19,7 @@ const asyncEach = async (iterable, fn, thisArg = iterable) => {
}
}
export const Abstract = class AbstractVmBackupRunner {
exports.Abstract = class AbstractVmBackupRunner {
// calls fn for each function, warns of any errors, and throws only if there are no writers left
async _callWriters(fn, step, parallel = true) {
const writers = this._writers

View File

@@ -1,12 +1,11 @@
import { asyncEach } from '@vates/async-each'
import { Disposable } from 'promise-toolbox'
'use strict'
const { Abstract } = require('./_Abstract')
import { getVmBackupDir } from '../../_getVmBackupDir.mjs'
const { getVmBackupDir } = require('../../_getVmBackupDir')
const { asyncEach } = require('@vates/async-each')
const { Disposable } = require('promise-toolbox')
import { Abstract } from './_Abstract.mjs'
import { extractIdsFromSimplePattern } from '../../extractIdsFromSimplePattern.mjs'
export const AbstractRemote = class AbstractRemoteVmBackupRunner extends Abstract {
exports.AbstractRemote = class AbstractRemoteVmBackupRunner extends Abstract {
constructor({
config,
job,
@@ -35,8 +34,7 @@ export const AbstractRemote = class AbstractRemoteVmBackupRunner extends Abstrac
this._writers = writers
const RemoteWriter = this._getRemoteWriter()
extractIdsFromSimplePattern(job.remotes).forEach(remoteId => {
const adapter = remoteAdapters[remoteId]
Object.entries(remoteAdapters).forEach(([remoteId, adapter]) => {
const targetSettings = {
...settings,
...allSettings[remoteId],

View File

@@ -1,16 +1,18 @@
import assert from 'node:assert'
import groupBy from 'lodash/groupBy.js'
import ignoreErrors from 'promise-toolbox/ignoreErrors'
import { asyncMap } from '@xen-orchestra/async-map'
import { decorateMethodsWith } from '@vates/decorate-with'
import { defer } from 'golike-defer'
import { formatDateTime } from '@xen-orchestra/xapi'
'use strict'
import { getOldEntries } from '../../_getOldEntries.mjs'
import { Task } from '../../Task.mjs'
import { Abstract } from './_Abstract.mjs'
const assert = require('assert')
const groupBy = require('lodash/groupBy.js')
const ignoreErrors = require('promise-toolbox/ignoreErrors')
const { asyncMap } = require('@xen-orchestra/async-map')
const { decorateMethodsWith } = require('@vates/decorate-with')
const { defer } = require('golike-defer')
const { formatDateTime } = require('@xen-orchestra/xapi')
export const AbstractXapi = class AbstractXapiVmBackupRunner extends Abstract {
const { getOldEntries } = require('../../_getOldEntries.js')
const { Task } = require('../../Task.js')
const { Abstract } = require('./_Abstract.js')
class AbstractXapiVmBackupRunner extends Abstract {
constructor({
config,
getSnapshotNameLabel,
@@ -31,11 +33,6 @@ export const AbstractXapi = class AbstractXapiVmBackupRunner extends Abstract {
throw new Error('cannot backup a VM created by this very job')
}
const currentOperations = Object.values(vm.current_operations)
if (currentOperations.some(_ => _ === 'migrate_send' || _ === 'pool_migrate')) {
throw new Error('cannot backup a VM currently being migrated')
}
this.config = config
this.job = job
this.remoteAdapters = remoteAdapters
@@ -261,15 +258,7 @@ export const AbstractXapi = class AbstractXapiVmBackupRunner extends Abstract {
}
if (this._writers.size !== 0) {
const { pool_migrate = null, migrate_send = null } = this._exportedVm.blocked_operations
const reason = 'VM migration is blocked during backup'
await this._exportedVm.update_blocked_operations({ pool_migrate: reason, migrate_send: reason })
try {
await this._copy()
} finally {
await this._exportedVm.update_blocked_operations({ pool_migrate, migrate_send })
}
await this._copy()
}
} finally {
if (startAfter) {
@@ -282,7 +271,8 @@ export const AbstractXapi = class AbstractXapiVmBackupRunner extends Abstract {
await this._healthCheck()
}
}
exports.AbstractXapi = AbstractXapiVmBackupRunner
decorateMethodsWith(AbstractXapi, {
decorateMethodsWith(AbstractXapiVmBackupRunner, {
run: defer,
})

View File

@@ -0,0 +1,12 @@
'use strict'
const { mapValues } = require('lodash')
const { forkStreamUnpipe } = require('../_forkStreamUnpipe')
exports.forkDeltaExport = function forkDeltaExport(deltaExport) {
return Object.create(deltaExport, {
streams: {
value: mapValues(deltaExport.streams, forkStreamUnpipe),
},
})
}

View File

@@ -1,11 +0,0 @@
import cloneDeep from 'lodash/cloneDeep.js'
import mapValues from 'lodash/mapValues.js'
import { forkStreamUnpipe } from '../_forkStreamUnpipe.mjs'
export function forkDeltaExport(deltaExport) {
const { streams, ...rest } = deltaExport
const newMetadata = cloneDeep(rest)
newMetadata.streams = mapValues(streams, forkStreamUnpipe)
return newMetadata
}

View File

@@ -1,11 +1,13 @@
import { formatFilenameDate } from '../../_filenameDate.mjs'
import { getOldEntries } from '../../_getOldEntries.mjs'
import { Task } from '../../Task.mjs'
'use strict'
import { MixinRemoteWriter } from './_MixinRemoteWriter.mjs'
import { AbstractFullWriter } from './_AbstractFullWriter.mjs'
const { formatFilenameDate } = require('../../_filenameDate.js')
const { getOldEntries } = require('../../_getOldEntries.js')
const { Task } = require('../../Task.js')
export class FullRemoteWriter extends MixinRemoteWriter(AbstractFullWriter) {
const { MixinRemoteWriter } = require('./_MixinRemoteWriter.js')
const { AbstractFullWriter } = require('./_AbstractFullWriter.js')
exports.FullRemoteWriter = class FullRemoteWriter extends MixinRemoteWriter(AbstractFullWriter) {
constructor(props) {
super(props)
@@ -24,7 +26,7 @@ export class FullRemoteWriter extends MixinRemoteWriter(AbstractFullWriter) {
)
}
async _run({ maxStreamLength, timestamp, sizeContainer, stream, streamLength, vm, vmSnapshot }) {
async _run({ timestamp, sizeContainer, stream, vm, vmSnapshot }) {
const settings = this._settings
const job = this._job
const scheduleId = this._scheduleId
@@ -65,8 +67,6 @@ export class FullRemoteWriter extends MixinRemoteWriter(AbstractFullWriter) {
await Task.run({ name: 'transfer' }, async () => {
await adapter.outputStream(dataFilename, stream, {
maxStreamLength,
streamLength,
validator: tmpPath => adapter.isValidXva(tmpPath),
})
return { size: sizeContainer.size }

View File

@@ -1,16 +1,18 @@
import ignoreErrors from 'promise-toolbox/ignoreErrors'
import { asyncMap, asyncMapSettled } from '@xen-orchestra/async-map'
import { formatDateTime } from '@xen-orchestra/xapi'
'use strict'
import { formatFilenameDate } from '../../_filenameDate.mjs'
import { getOldEntries } from '../../_getOldEntries.mjs'
import { Task } from '../../Task.mjs'
const ignoreErrors = require('promise-toolbox/ignoreErrors')
const { asyncMap, asyncMapSettled } = require('@xen-orchestra/async-map')
const { formatDateTime } = require('@xen-orchestra/xapi')
import { AbstractFullWriter } from './_AbstractFullWriter.mjs'
import { MixinXapiWriter } from './_MixinXapiWriter.mjs'
import { listReplicatedVms } from './_listReplicatedVms.mjs'
const { formatFilenameDate } = require('../../_filenameDate.js')
const { getOldEntries } = require('../../_getOldEntries.js')
const { Task } = require('../../Task.js')
export class FullXapiWriter extends MixinXapiWriter(AbstractFullWriter) {
const { AbstractFullWriter } = require('./_AbstractFullWriter.js')
const { MixinXapiWriter } = require('./_MixinXapiWriter.js')
const { listReplicatedVms } = require('./_listReplicatedVms.js')
exports.FullXapiWriter = class FullXapiWriter extends MixinXapiWriter(AbstractFullWriter) {
constructor(props) {
super(props)

View File

@@ -1,28 +1,29 @@
import assert from 'node:assert'
import mapValues from 'lodash/mapValues.js'
import ignoreErrors from 'promise-toolbox/ignoreErrors'
import { asyncEach } from '@vates/async-each'
import { asyncMap } from '@xen-orchestra/async-map'
import { chainVhd, checkVhdChain, openVhd, VhdAbstract } from 'vhd-lib'
import { createLogger } from '@xen-orchestra/log'
import { decorateClass } from '@vates/decorate-with'
import { defer } from 'golike-defer'
import { dirname } from 'node:path'
'use strict'
import { formatFilenameDate } from '../../_filenameDate.mjs'
import { getOldEntries } from '../../_getOldEntries.mjs'
import { TAG_BASE_DELTA } from '../../_incrementalVm.mjs'
import { Task } from '../../Task.mjs'
const assert = require('assert')
const mapValues = require('lodash/mapValues.js')
const ignoreErrors = require('promise-toolbox/ignoreErrors')
const { asyncEach } = require('@vates/async-each')
const { asyncMap } = require('@xen-orchestra/async-map')
const { chainVhd, checkVhdChain, openVhd, VhdAbstract } = require('vhd-lib')
const { createLogger } = require('@xen-orchestra/log')
const { decorateClass } = require('@vates/decorate-with')
const { defer } = require('golike-defer')
const { dirname } = require('path')
import { MixinRemoteWriter } from './_MixinRemoteWriter.mjs'
import { AbstractIncrementalWriter } from './_AbstractIncrementalWriter.mjs'
import { checkVhd } from './_checkVhd.mjs'
import { packUuid } from './_packUuid.mjs'
import { Disposable } from 'promise-toolbox'
const { formatFilenameDate } = require('../../_filenameDate.js')
const { getOldEntries } = require('../../_getOldEntries.js')
const { Task } = require('../../Task.js')
const { MixinRemoteWriter } = require('./_MixinRemoteWriter.js')
const { AbstractIncrementalWriter } = require('./_AbstractIncrementalWriter.js')
const { checkVhd } = require('./_checkVhd.js')
const { packUuid } = require('./_packUuid.js')
const { Disposable } = require('promise-toolbox')
const { warn } = createLogger('xo:backups:DeltaBackupWriter')
export class IncrementalRemoteWriter extends MixinRemoteWriter(AbstractIncrementalWriter) {
class IncrementalRemoteWriter extends MixinRemoteWriter(AbstractIncrementalWriter) {
async checkBaseVdis(baseUuidToSrcVdi) {
const { handler } = this._adapter
const adapter = this._adapter
@@ -133,7 +134,7 @@ export class IncrementalRemoteWriter extends MixinRemoteWriter(AbstractIncrement
}
}
async _transfer($defer, { isVhdDifferencing, timestamp, deltaExport, vm, vmSnapshot }) {
async _transfer($defer, { differentialVhds, timestamp, deltaExport, vm, vmSnapshot }) {
const adapter = this._adapter
const job = this._job
const scheduleId = this._scheduleId
@@ -161,7 +162,6 @@ export class IncrementalRemoteWriter extends MixinRemoteWriter(AbstractIncrement
)
metadataContent = {
isVhdDifferencing,
jobId,
mode: job.mode,
scheduleId,
@@ -181,9 +181,9 @@ export class IncrementalRemoteWriter extends MixinRemoteWriter(AbstractIncrement
async ([id, vdi]) => {
const path = `${this._vmBackupDir}/${vhds[id]}`
const isDifferencing = isVhdDifferencing[`${id}.vhd`]
const isDelta = differentialVhds[`${id}.vhd`]
let parentPath
if (isDifferencing) {
if (isDelta) {
const vdiDir = dirname(path)
parentPath = (
await handler.list(vdiDir, {
@@ -197,7 +197,7 @@ export class IncrementalRemoteWriter extends MixinRemoteWriter(AbstractIncrement
assert.notStrictEqual(
parentPath,
undefined,
`missing parent of ${id} in ${dirname(path)}, looking for ${vdi.other_config[TAG_BASE_DELTA]}`
`missing parent of ${id} in ${dirname(path)}, looking for ${vdi.other_config['xo:base_delta']}`
)
parentPath = parentPath.slice(1) // remove leading slash
@@ -205,20 +205,16 @@ export class IncrementalRemoteWriter extends MixinRemoteWriter(AbstractIncrement
// TODO remove when this has been done before the export
await checkVhd(handler, parentPath)
}
// don't write it as transferSize += await async function
// since i += await asyncFun lead to race condition
// as explained : https://eslint.org/docs/latest/rules/require-atomic-updates
const transferSizeOneDisk = await adapter.writeVhd(path, deltaExport.streams[`${id}.vhd`], {
transferSize += await adapter.writeVhd(path, deltaExport.streams[`${id}.vhd`], {
// no checksum for VHDs, because they will be invalidated by
// merges and chainings
checksum: false,
validator: tmpPath => checkVhd(handler, tmpPath),
writeBlockConcurrency: this._config.writeBlockConcurrency,
})
transferSize += transferSizeOneDisk
if (isDifferencing) {
if (isDelta) {
await chainVhd(handler, parentPath, handler, path)
}
@@ -242,6 +238,6 @@ export class IncrementalRemoteWriter extends MixinRemoteWriter(AbstractIncrement
// TODO: run cleanup?
}
}
decorateClass(IncrementalRemoteWriter, {
exports.IncrementalRemoteWriter = decorateClass(IncrementalRemoteWriter, {
_transfer: defer,
})

View File

@@ -1,18 +1,19 @@
import { asyncMap, asyncMapSettled } from '@xen-orchestra/async-map'
import ignoreErrors from 'promise-toolbox/ignoreErrors'
import { formatDateTime } from '@xen-orchestra/xapi'
'use strict'
import { formatFilenameDate } from '../../_filenameDate.mjs'
import { getOldEntries } from '../../_getOldEntries.mjs'
import { importIncrementalVm, TAG_BACKUP_SR, TAG_BASE_DELTA, TAG_COPY_SRC } from '../../_incrementalVm.mjs'
import { Task } from '../../Task.mjs'
const { asyncMap, asyncMapSettled } = require('@xen-orchestra/async-map')
const ignoreErrors = require('promise-toolbox/ignoreErrors')
const { formatDateTime } = require('@xen-orchestra/xapi')
import { AbstractIncrementalWriter } from './_AbstractIncrementalWriter.mjs'
import { MixinXapiWriter } from './_MixinXapiWriter.mjs'
import { listReplicatedVms } from './_listReplicatedVms.mjs'
import find from 'lodash/find.js'
const { formatFilenameDate } = require('../../_filenameDate.js')
const { getOldEntries } = require('../../_getOldEntries.js')
const { importIncrementalVm, TAG_COPY_SRC } = require('../../_incrementalVm.js')
const { Task } = require('../../Task.js')
export class IncrementalXapiWriter extends MixinXapiWriter(AbstractIncrementalWriter) {
const { AbstractIncrementalWriter } = require('./_AbstractIncrementalWriter.js')
const { MixinXapiWriter } = require('./_MixinXapiWriter.js')
const { listReplicatedVms } = require('./_listReplicatedVms.js')
exports.IncrementalXapiWriter = class IncrementalXapiWriter extends MixinXapiWriter(AbstractIncrementalWriter) {
async checkBaseVdis(baseUuidToSrcVdi, baseVm) {
const sr = this._sr
const replicatedVm = listReplicatedVms(sr.$xapi, this._job.id, sr.uuid, this._vmUuid).find(
@@ -82,54 +83,6 @@ export class IncrementalXapiWriter extends MixinXapiWriter(AbstractIncrementalWr
return asyncMapSettled(this._oldEntries, vm => vm.$destroy())
}
#decorateVmMetadata(backup) {
const { _warmMigration } = this._settings
const sr = this._sr
const xapi = sr.$xapi
const vm = backup.vm
vm.other_config[TAG_COPY_SRC] = vm.uuid
const remoteBaseVmUuid = vm.other_config[TAG_BASE_DELTA]
let baseVm
if (remoteBaseVmUuid) {
baseVm = find(
xapi.objects.all,
obj => (obj = obj.other_config) && obj[TAG_COPY_SRC] === remoteBaseVmUuid && obj[TAG_BACKUP_SR] === sr.$id
)
if (!baseVm) {
throw new Error(`could not find the base VM (copy of ${remoteBaseVmUuid})`)
}
}
const baseVdis = {}
baseVm?.$VBDs.forEach(vbd => {
const vdi = vbd.$VDI
if (vdi !== undefined) {
baseVdis[vbd.VDI] = vbd.$VDI
}
})
vm.other_config[TAG_COPY_SRC] = vm.uuid
if (!_warmMigration) {
vm.tags.push('Continuous Replication')
}
Object.values(backup.vdis).forEach(vdi => {
vdi.other_config[TAG_COPY_SRC] = vdi.uuid
vdi.SR = sr.$ref
// vdi.other_config[TAG_BASE_DELTA] is never defined on a suspend vdi
if (vdi.other_config[TAG_BASE_DELTA]) {
const remoteBaseVdiUuid = vdi.other_config[TAG_BASE_DELTA]
const baseVdi = find(baseVdis, vdi => vdi.other_config[TAG_COPY_SRC] === remoteBaseVdiUuid)
if (!baseVdi) {
throw new Error(`missing base VDI (copy of ${remoteBaseVdiUuid})`)
}
vdi.baseVdi = baseVdi
}
})
return backup
}
async _transfer({ timestamp, deltaExport, sizeContainers, vm }) {
const { _warmMigration } = this._settings
const sr = this._sr
@@ -140,7 +93,16 @@ export class IncrementalXapiWriter extends MixinXapiWriter(AbstractIncrementalWr
let targetVmRef
await Task.run({ name: 'transfer' }, async () => {
targetVmRef = await importIncrementalVm(this.#decorateVmMetadata(deltaExport), sr)
targetVmRef = await importIncrementalVm(
{
__proto__: deltaExport,
vm: {
...deltaExport.vm,
tags: _warmMigration ? deltaExport.vm.tags : [...deltaExport.vm.tags, 'Continuous Replication'],
},
},
sr
)
return {
size: Object.values(sizeContainers).reduce((sum, { size }) => sum + size, 0),
}
@@ -161,13 +123,13 @@ export class IncrementalXapiWriter extends MixinXapiWriter(AbstractIncrementalWr
)
),
targetVm.update_other_config({
[TAG_BACKUP_SR]: srUuid,
'xo:backup:sr': srUuid,
// these entries need to be added in case of offline backup
'xo:backup:datetime': formatDateTime(timestamp),
'xo:backup:job': job.id,
'xo:backup:schedule': scheduleId,
[TAG_BASE_DELTA]: vm.uuid,
'xo:backup:vm': vm.uuid,
}),
])
}

View File

@@ -0,0 +1,14 @@
'use strict'
const { AbstractWriter } = require('./_AbstractWriter.js')
exports.AbstractFullWriter = class AbstractFullWriter extends AbstractWriter {
async run({ timestamp, sizeContainer, stream, vm, vmSnapshot }) {
try {
return await this._run({ timestamp, sizeContainer, stream, vm, vmSnapshot })
} finally {
// ensure stream is properly closed
stream.destroy()
}
}
}

View File

@@ -1,12 +0,0 @@
import { AbstractWriter } from './_AbstractWriter.mjs'
export class AbstractFullWriter extends AbstractWriter {
async run({ maxStreamLength, timestamp, sizeContainer, stream, streamLength, vm, vmSnapshot }) {
try {
return await this._run({ maxStreamLength, timestamp, sizeContainer, stream, streamLength, vm, vmSnapshot })
} finally {
// ensure stream is properly closed
stream.destroy()
}
}
}

View File

@@ -1,6 +1,8 @@
import { AbstractWriter } from './_AbstractWriter.mjs'
'use strict'
export class AbstractIncrementalWriter extends AbstractWriter {
const { AbstractWriter } = require('./_AbstractWriter.js')
exports.AbstractIncrementalWriter = class AbstractIncrementalWriter extends AbstractWriter {
checkBaseVdis(baseUuidToSrcVdi, baseVm) {
throw new Error('Not implemented')
}

View File

@@ -1,7 +1,9 @@
import { formatFilenameDate } from '../../_filenameDate.mjs'
import { getVmBackupDir } from '../../_getVmBackupDir.mjs'
'use strict'
export class AbstractWriter {
const { formatFilenameDate } = require('../../_filenameDate')
const { getVmBackupDir } = require('../../_getVmBackupDir')
exports.AbstractWriter = class AbstractWriter {
constructor({ config, healthCheckSr, job, vmUuid, scheduleId, settings }) {
this._config = config
this._healthCheckSr = healthCheckSr

View File

@@ -1,17 +1,19 @@
import { createLogger } from '@xen-orchestra/log'
import { join } from 'node:path'
import assert from 'node:assert'
'use strict'
import { formatFilenameDate } from '../../_filenameDate.mjs'
import { getVmBackupDir } from '../../_getVmBackupDir.mjs'
import { HealthCheckVmBackup } from '../../HealthCheckVmBackup.mjs'
import { ImportVmBackup } from '../../ImportVmBackup.mjs'
import { Task } from '../../Task.mjs'
import * as MergeWorker from '../../merge-worker/index.mjs'
const { createLogger } = require('@xen-orchestra/log')
const { join } = require('path')
const assert = require('assert')
const { formatFilenameDate } = require('../../_filenameDate.js')
const { getVmBackupDir } = require('../../_getVmBackupDir.js')
const { HealthCheckVmBackup } = require('../../HealthCheckVmBackup.js')
const { ImportVmBackup } = require('../../ImportVmBackup.js')
const { Task } = require('../../Task.js')
const MergeWorker = require('../../merge-worker/index.js')
const { info, warn } = createLogger('xo:backups:MixinBackupWriter')
export const MixinRemoteWriter = (BaseClass = Object) =>
exports.MixinRemoteWriter = (BaseClass = Object) =>
class MixinRemoteWriter extends BaseClass {
#lock
@@ -96,9 +98,6 @@ export const MixinRemoteWriter = (BaseClass = Object) =>
metadata,
srUuid,
xapi,
settings: {
additionnalVmTag: 'xo:no-bak=Health Check',
},
}).run()
const restoredVm = xapi.getObject(restoredId)
try {

View File

@@ -1,10 +1,12 @@
import { extractOpaqueRef } from '@xen-orchestra/xapi'
import assert from 'node:assert/strict'
'use strict'
import { HealthCheckVmBackup } from '../../HealthCheckVmBackup.mjs'
import { Task } from '../../Task.mjs'
const { extractOpaqueRef } = require('@xen-orchestra/xapi')
export const MixinXapiWriter = (BaseClass = Object) =>
const { Task } = require('../../Task')
const assert = require('node:assert/strict')
const { HealthCheckVmBackup } = require('../../HealthCheckVmBackup')
exports.MixinXapiWriter = (BaseClass = Object) =>
class MixinXapiWriter extends BaseClass {
constructor({ sr, ...rest }) {
super(rest)
@@ -18,7 +20,7 @@ export const MixinXapiWriter = (BaseClass = Object) =>
const vdiRefs = await xapi.VM_getDisks(baseVm.$ref)
for (const vdiRef of vdiRefs) {
const vdi = xapi.getObject(vdiRef)
if (vdi.$SR.uuid !== this._healthCheckSr.uuid) {
if (vdi.$SR.uuid !== this._heathCheckSr.uuid) {
return false
}
}
@@ -58,7 +60,7 @@ export const MixinXapiWriter = (BaseClass = Object) =>
)
}
const healthCheckVm = xapi.getObject(healthCheckVmRef) ?? (await xapi.waitObject(healthCheckVmRef))
await healthCheckVm.add_tag('xo:no-bak=Health Check')
await new HealthCheckVmBackup({
restoredVm: healthCheckVm,
xapi,

View File

@@ -0,0 +1,8 @@
'use strict'
const openVhd = require('vhd-lib').openVhd
const Disposable = require('promise-toolbox/Disposable')
exports.checkVhd = async function checkVhd(handler, path) {
await Disposable.use(openVhd(handler, path), () => {})
}

View File

@@ -1,6 +0,0 @@
import { openVhd } from 'vhd-lib'
import Disposable from 'promise-toolbox/Disposable'
export async function checkVhd(handler, path) {
await Disposable.use(openVhd(handler, path), () => {})
}

View File

@@ -1,3 +1,5 @@
'use strict'
const getReplicatedVmDatetime = vm => {
const { 'xo:backup:datetime': datetime = vm.name_label.slice(-17, -1) } = vm.other_config
return datetime
@@ -5,7 +7,7 @@ const getReplicatedVmDatetime = vm => {
const compareReplicatedVmDatetime = (a, b) => (getReplicatedVmDatetime(a) < getReplicatedVmDatetime(b) ? -1 : 1)
export function listReplicatedVms(xapi, scheduleOrJobId, srUuid, vmUuid) {
exports.listReplicatedVms = function listReplicatedVms(xapi, scheduleOrJobId, srUuid, vmUuid) {
const { all } = xapi.objects
const vms = {}
for (const key in all) {

View File

@@ -1,5 +1,7 @@
'use strict'
const PARSE_UUID_RE = /-/g
export function packUuid(uuid) {
exports.packUuid = function packUuid(uuid) {
return Buffer.from(uuid.replace(PARSE_UUID_RE, ''), 'hex')
}

View File

@@ -1,4 +1,6 @@
export function watchStreamSize(stream, container = { size: 0 }) {
'use strict'
exports.watchStreamSize = function watchStreamSize(stream, container = { size: 0 }) {
stream.on('data', data => {
container.size += data.length
})

View File

@@ -221,7 +221,7 @@ For multiple objects:
### Settings
Settings are described in [`@xen-orchestra/backups/\_runners/VmsXapi.mjs``](https://github.com/vatesfr/xen-orchestra/blob/master/%40xen-orchestra/backups/_runners/VmsXapi.mjs).
Settings are described in [`@xen-orchestra/backups/Backup.js](https://github.com/vatesfr/xen-orchestra/blob/master/%40xen-orchestra/backups/Backup.js).
## Writer API

View File

@@ -1,4 +1,6 @@
export function extractIdsFromSimplePattern(pattern) {
'use strict'
exports.extractIdsFromSimplePattern = function extractIdsFromSimplePattern(pattern) {
if (pattern === undefined) {
return []
}

View File

@@ -1,9 +1,9 @@
import mapValues from 'lodash/mapValues.js'
import { dirname } from 'node:path'
'use strict'
const mapValues = require('lodash/mapValues.js')
const { dirname } = require('path')
function formatVmBackup(backup) {
const { isVhdDifferencing } = backup
return {
disks:
backup.vhds === undefined
@@ -27,14 +27,10 @@ function formatVmBackup(backup) {
name_description: backup.vm.name_description,
name_label: backup.vm.name_label,
},
// isVhdDifferencing is either undefined or an object
differencingVhds: isVhdDifferencing && Object.values(isVhdDifferencing).filter(t => t).length,
dynamicVhds: isVhdDifferencing && Object.values(isVhdDifferencing).filter(t => !t).length,
}
}
// format all backups as returned by RemoteAdapter#listAllVmBackups()
export function formatVmBackups(backupsByVM) {
exports.formatVmBackups = function formatVmBackups(backupsByVM) {
return mapValues(backupsByVM, backups => backups.map(formatVmBackup))
}

View File

@@ -0,0 +1,92 @@
#!/usr/bin/env node
// eslint-disable-next-line eslint-comments/disable-enable-pair
/* eslint-disable n/shebang */
'use strict'
const { catchGlobalErrors } = require('@xen-orchestra/log/configure')
const { createLogger } = require('@xen-orchestra/log')
const { getSyncedHandler } = require('@xen-orchestra/fs')
const { join } = require('path')
const Disposable = require('promise-toolbox/Disposable')
const min = require('lodash/min')
const { getVmBackupDir } = require('../_getVmBackupDir.js')
const { RemoteAdapter } = require('../RemoteAdapter.js')
const { CLEAN_VM_QUEUE } = require('./index.js')
// -------------------------------------------------------------------
catchGlobalErrors(createLogger('xo:backups:mergeWorker'))
const { fatal, info, warn } = createLogger('xo:backups:mergeWorker')
// -------------------------------------------------------------------
const main = Disposable.wrap(async function* main(args) {
const handler = yield getSyncedHandler({ url: 'file://' + process.cwd() })
yield handler.lock(CLEAN_VM_QUEUE)
const adapter = new RemoteAdapter(handler)
const listRetry = async () => {
const timeoutResolver = resolve => setTimeout(resolve, 10e3)
for (let i = 0; i < 10; ++i) {
const entries = await handler.list(CLEAN_VM_QUEUE)
if (entries.length !== 0) {
return entries
}
await new Promise(timeoutResolver)
}
}
let taskFiles
while ((taskFiles = await listRetry()) !== undefined) {
const taskFileBasename = min(taskFiles)
const previousTaskFile = join(CLEAN_VM_QUEUE, taskFileBasename)
const taskFile = join(CLEAN_VM_QUEUE, '_' + taskFileBasename)
// move this task to the end
try {
await handler.rename(previousTaskFile, taskFile)
} catch (error) {
// this error occurs if the task failed too many times (i.e. too many `_` prefixes)
// there is nothing more that can be done
if (error.code === 'ENAMETOOLONG') {
await handler.unlink(previousTaskFile)
}
throw error
}
try {
const vmDir = getVmBackupDir(String(await handler.readFile(taskFile)))
try {
await adapter.cleanVm(vmDir, { merge: true, logInfo: info, logWarn: warn, remove: true })
} catch (error) {
// consider the clean successful if the VM dir is missing
if (error.code !== 'ENOENT') {
throw error
}
}
handler.unlink(taskFile).catch(error => warn('deleting task failure', { error }))
} catch (error) {
warn('failure handling task', { error })
}
}
})
info('starting')
main(process.argv.slice(2)).then(
() => {
info('bye :-)')
},
error => {
fatal(error)
process.exit(1)
}
)

View File

@@ -1,103 +0,0 @@
#!/usr/bin/env node
// eslint-disable-next-line eslint-comments/disable-enable-pair
/* eslint-disable n/shebang */
import { asyncEach } from '@vates/async-each'
import { catchGlobalErrors } from '@xen-orchestra/log/configure'
import { createLogger } from '@xen-orchestra/log'
import { getSyncedHandler } from '@xen-orchestra/fs'
import { join } from 'node:path'
import { load as loadConfig } from 'app-conf'
import Disposable from 'promise-toolbox/Disposable'
import { getVmBackupDir } from '../_getVmBackupDir.mjs'
import { RemoteAdapter } from '../RemoteAdapter.mjs'
import { CLEAN_VM_QUEUE } from './index.mjs'
const APP_NAME = 'xo-merge-worker'
const APP_DIR = new URL('.', import.meta.url).pathname
// -------------------------------------------------------------------
catchGlobalErrors(createLogger('xo:backups:mergeWorker'))
const { fatal, info, warn } = createLogger('xo:backups:mergeWorker')
// -------------------------------------------------------------------
const main = Disposable.wrap(async function* main(args) {
const handler = yield getSyncedHandler({ url: 'file://' + process.cwd() })
yield handler.lock(CLEAN_VM_QUEUE)
const adapter = new RemoteAdapter(handler)
const listRetry = async () => {
const timeoutResolver = resolve => setTimeout(resolve, 10e3)
for (let i = 0; i < 10; ++i) {
const entries = await handler.list(CLEAN_VM_QUEUE)
if (entries.length !== 0) {
entries.sort()
return entries
}
await new Promise(timeoutResolver)
}
}
let taskFiles
while ((taskFiles = await listRetry()) !== undefined) {
const { concurrency } = await loadConfig(APP_NAME, {
appDir: APP_DIR,
ignoreUnknownFormats: true,
})
await asyncEach(
taskFiles,
async taskFileBasename => {
const previousTaskFile = join(CLEAN_VM_QUEUE, taskFileBasename)
const taskFile = join(CLEAN_VM_QUEUE, '_' + taskFileBasename)
// move this task to the end
try {
await handler.rename(previousTaskFile, taskFile)
} catch (error) {
// this error occurs if the task failed too many times (i.e. too many `_` prefixes)
// there is nothing more that can be done
if (error.code === 'ENAMETOOLONG') {
await handler.unlink(previousTaskFile)
}
throw error
}
try {
const vmDir = getVmBackupDir(String(await handler.readFile(taskFile)))
try {
await adapter.cleanVm(vmDir, { merge: true, logInfo: info, logWarn: warn, remove: true })
} catch (error) {
// consider the clean successful if the VM dir is missing
if (error.code !== 'ENOENT') {
throw error
}
}
handler.unlink(taskFile).catch(error => warn('deleting task failure', { error }))
} catch (error) {
warn('failure handling task', { error })
}
},
{ concurrency }
)
}
})
info('starting')
main(process.argv.slice(2)).then(
() => {
info('bye :-)')
},
error => {
fatal(error)
process.exit(1)
}
)

View File

@@ -1 +0,0 @@
concurrency = 1

View File

@@ -1,12 +1,13 @@
import { join } from 'node:path'
import { spawn } from 'child_process'
import { check } from 'proper-lockfile'
'use strict'
export const CLEAN_VM_QUEUE = '/xo-vm-backups/.queue/clean-vm/'
const { join, resolve } = require('path')
const { spawn } = require('child_process')
const { check } = require('proper-lockfile')
const CLI_PATH = new URL('cli.mjs', import.meta.url).pathname
const CLEAN_VM_QUEUE = (exports.CLEAN_VM_QUEUE = '/xo-vm-backups/.queue/clean-vm/')
export const run = async function runMergeWorker(remotePath) {
const CLI_PATH = resolve(__dirname, 'cli.js')
exports.run = async function runMergeWorker(remotePath) {
try {
// TODO: find a way to pass the acquire the lock and then pass it down the worker
if (await check(join(remotePath, CLEAN_VM_QUEUE))) {

View File

@@ -8,33 +8,32 @@
"type": "git",
"url": "https://github.com/vatesfr/xen-orchestra.git"
},
"version": "0.44.3",
"version": "0.39.0",
"engines": {
"node": ">=14.18"
},
"scripts": {
"postversion": "npm publish --access public",
"test-integration": "node--test *.integ.mjs"
"test-integration": "node--test *.integ.js"
},
"dependencies": {
"@iarna/toml": "^2.2.5",
"@kldzj/stream-throttle": "^1.1.1",
"@vates/async-each": "^1.0.0",
"@vates/cached-dns.lookup": "^1.0.0",
"@vates/compose": "^2.1.0",
"@vates/decorate-with": "^2.0.0",
"@vates/disposable": "^0.1.5",
"@vates/fuse-vhd": "^2.0.0",
"@vates/nbd-client": "^3.0.0",
"@vates/disposable": "^0.1.4",
"@vates/fuse-vhd": "^1.0.0",
"@vates/nbd-client": "^1.2.1",
"@vates/parse-duration": "^0.1.1",
"@xen-orchestra/async-map": "^0.1.2",
"@xen-orchestra/fs": "^4.1.3",
"@xen-orchestra/fs": "^4.0.1",
"@xen-orchestra/log": "^0.6.0",
"@xen-orchestra/template": "^0.1.0",
"app-conf": "^2.3.0",
"compare-versions": "^6.0.0",
"d3-time-format": "^4.1.0",
"compare-versions": "^5.0.1",
"d3-time-format": "^3.0.0",
"decorator-synchronized": "^0.6.0",
"fs-extra": "^11.1.0",
"golike-defer": "^0.5.1",
"limit-concurrency-decorator": "^0.5.0",
"lodash": "^4.17.20",
@@ -42,21 +41,19 @@
"parse-pairs": "^2.0.0",
"promise-toolbox": "^0.21.0",
"proper-lockfile": "^4.1.2",
"tar": "^6.1.15",
"uuid": "^9.0.0",
"vhd-lib": "^4.8.0",
"xen-api": "^2.0.0",
"vhd-lib": "^4.5.0",
"xen-api": "^1.3.3",
"yazl": "^2.5.1"
},
"devDependencies": {
"fs-extra": "^11.1.0",
"rimraf": "^5.0.1",
"sinon": "^17.0.1",
"sinon": "^15.0.1",
"test": "^3.2.1",
"tmp": "^0.2.1"
},
"peerDependencies": {
"@xen-orchestra/xapi": "^4.1.0"
"@xen-orchestra/xapi": "^2.2.1"
},
"license": "AGPL-3.0-or-later",
"author": {

Some files were not shown because too many files have changed in this diff Show More