Compare commits

...

1 Commit

Author SHA1 Message Date
Julien Fontanet
7174499228 WiP 2021-05-25 16:25:39 +02:00
10 changed files with 155 additions and 13 deletions

View File

@@ -18,6 +18,17 @@ const wrapCall = (fn, arg, thisArg) => {
* @returns {Promise<Item[]>}
*/
exports.asyncMap = function asyncMap(iterable, mapFn, thisArg = iterable) {
let onError
if (onError !== undefined) {
const original = mapFn
mapFn = async function () {
try {
return await original.apply(this, arguments)
} catch (error) {
return onError.call(this, error, ...arguments)
}
}
}
return Promise.all(Array.from(iterable, mapFn, thisArg))
}

View File

@@ -543,6 +543,40 @@ class RemoteAdapter {
async readVmBackupMetadata(path) {
return Object.defineProperty(JSON.parse(await this._handler.readFile(path)), '_filename', { value: path })
}
async writeFullVmBackup({ jobId, mode, scheduleId, timestamp, vm, vmSnapshot, xva }, sizeContainer, stream) {
const basename = formatFilenameDate(timestamp)
const dataBasename = basename + '.xva'
const dataFilename = backupDir + '/' + dataBasename
const metadataFilename = `${backupDir}/${basename}.json`
const metadata = {
jobId: job.id,
mode: job.mode,
scheduleId,
timestamp,
version: '2.0.0',
vm,
vmSnapshot: this._backup.exportedVm,
xva: './' + dataBasename,
}
const { deleteFirst } = settings
if (deleteFirst) {
await deleteOldBackups()
}
await adapter.outputStream(stream, dataFilename, {
validator: tmpPath => {
if (handler._getFilePath !== undefined) {
return isValidXva(handler._getFilePath('/' + tmpPath))
}
},
})
metadata.size = sizeContainer.size
await handler.outputFile(metadataFilename, JSON.stringify(metadata))
}
}
Object.assign(RemoteAdapter.prototype, {

View File

@@ -1,8 +1,11 @@
const CancelToken = require('promise-toolbox/CancelToken.js')
const Zone = require('node-zone')
const logAfterEnd = () => {
throw new Error('task has already ended')
const logAfterEnd = function (log) {
  // Emitting a log entry on a task that has already ended is a programming
  // error: fail loudly and attach the offending entry (and its result) so the
  // culprit can be identified. `this` is expected to be the task.
  throw Object.assign(new Error('task has already ended:' + this.id), {
    result: log.result,
    log,
  })
}
const noop = Function.prototype
@@ -44,11 +47,19 @@ class Task {
}
}
get id() {
return this.#id
}
#cancelToken
#id = Math.random().toString(36).slice(2)
#onLog
#zone
get id() {
return this.#id
}
constructor({ name, data, onLog }) {
let parentCancelToken, parentId
if (onLog === undefined) {
@@ -100,6 +111,8 @@ class Task {
run(fn, last = false) {
return this.#zone.run(() => {
try {
this.#cancelToken.throwIfRequested()
const result = fn()
let then
if (result != null && typeof (then = result.then) === 'function') {

View File

@@ -1,4 +1,5 @@
const assert = require('assert')
// const asyncFn = require('promise-toolbox/asyncFn')
const findLast = require('lodash/findLast.js')
const ignoreErrors = require('promise-toolbox/ignoreErrors.js')
const keyBy = require('lodash/keyBy.js')
@@ -143,6 +144,7 @@ exports.VmBackup = class VmBackup {
const doSnapshot =
this._isDelta || (!settings.offlineBackup && vm.power_state === 'Running') || settings.snapshotRetention !== 0
console.log({ doSnapshot })
if (doSnapshot) {
await Task.run({ name: 'snapshot' }, async () => {
if (!settings.bypassVdiChainsCheck) {
@@ -181,6 +183,7 @@ exports.VmBackup = class VmBackup {
await this._callWriters(writer => writer.prepare({ isFull }), 'writer.prepare()')
const deltaExport = await exportDeltaVm(exportedVm, baseVm, {
cancelToken: Task.cancelToken,
fullVdisRequired,
})
const sizeContainers = mapValues(deltaExport.streams, stream => watchStreamSize(stream))
@@ -226,6 +229,7 @@ exports.VmBackup = class VmBackup {
async _copyFull() {
const { compression } = this.job
const stream = await this._xapi.VM_export(this.exportedVm.$ref, {
cancelToken: Task.cancelToken,
compress: Boolean(compression) && (compression === 'native' ? 'gzip' : 'zstd'),
useSnapshot: false,
})
@@ -330,10 +334,22 @@ exports.VmBackup = class VmBackup {
this._baseVm = baseVm
this._fullVdisRequired = fullVdisRequired
Task.info('base data', {
vm: baseVm.uuid,
fullVdisRequired: Array.from(fullVdisRequired),
})
}
run = defer(this.run)
async run($defer) {
this.exportedVm = this.vm
this.timestamp = Date.now()
const doSnapshot = this._isDelta || vm.power_state === 'Running' || settings.snapshotRetention !== 0
if (!this._isDelta) {
}
const settings = this._settings
assert(
!settings.offlineBackup || settings.snapshotRetention === 0,
@@ -380,3 +396,6 @@ exports.VmBackup = class VmBackup {
}
}
}
// const { prototype } = exports.VmBackup
// prototype.run = asyncFn.cancelable(prototype.run)

View File

@@ -140,6 +140,16 @@ ${pkg.name} v${pkg.version}`
}
}
const $import = ({ $import: path }) => {
  // Load the referenced file, parse it according to its extension and
  // evaluate the result like an inline node.
  const raw = fs.readFileSync(path, 'utf8')
  const format = extname(path).slice(1).toLowerCase()
  const parse = FORMATS[format]
  if (parse === undefined) {
    throw new Error(`unsupported file: ${path}`)
  }
  return visit(parse(raw))
}
const seq = async seq => {
const j = callPath.length
for (let i = 0, n = seq.length; i < n; ++i) {
@@ -153,17 +163,13 @@ ${pkg.name} v${pkg.version}`
if (Array.isArray(node)) {
return seq(node)
}
return call(node)
const keys = Object.keys(node)
return keys.length === 1 && keys[0] === '$import' ? $import(node) : call(node)
}
let node
if (file !== '') {
const data = fs.readFileSync(file, 'utf8')
const ext = extname(file).slice(1).toLowerCase()
const parse = FORMATS[ext]
if (parse === undefined) {
throw new Error(`unsupported file: ${file}`)
}
await visit(parse(data))
node = { $import: file }
} else {
const method = args[0]
const params = {}
@@ -176,8 +182,9 @@ ${pkg.name} v${pkg.version}`
params[param.slice(0, j)] = parseValue(param.slice(j + 1))
}
await call({ method, params })
node = { method, params }
}
await visit(node)
}
main(process.argv.slice(2)).then(
() => {

View File

@@ -1,3 +1,5 @@
import Cancel from 'promise-toolbox/Cancel'
import CancelToken from 'promise-toolbox/CancelToken'
import Disposable from 'promise-toolbox/Disposable.js'
import fromCallback from 'promise-toolbox/fromCallback.js'
import { asyncMap } from '@xen-orchestra/async-map'
@@ -95,7 +97,8 @@ export default class Backups {
error.jobId = jobId
throw error
}
runningJobs[jobId] = true
const source = CancelToken.source()
runningJobs[jobId] = source.cancel
try {
return await run.apply(this, arguments)
} finally {

View File

@@ -0,0 +1,38 @@
import { asyncMapSettled } from '@xen-orchestra/async-map'
export default class Task {
  // id → registered task; single allocation (the original also created a
  // second Map in the constructor that immediately replaced this one)
  #tasks = new Map()

  constructor(app) {
    const tasks = this.#tasks
    app.api.addMethods({
      task: {
        // lazily yields the ids of currently registered tasks
        *list() {
          for (const id of tasks.keys()) {
            yield { id }
          }
        },
        cancel: [
          ({ taskId }) => this.cancel(taskId),
          {
            params: {
              taskId: { type: 'string' },
            },
          },
        ],
      },
    })
    // best effort: cancel every remaining task on shutdown
    app.hooks.on('stop', () => asyncMapSettled(tasks.values(), task => task.cancel()))
  }

  async cancel(taskId) {
    // BUG FIX: was `this.tasks` (undefined) instead of the `#tasks` field,
    // which made every cancel throw a TypeError
    const task = this.#tasks.get(taskId)
    if (task === undefined) {
      throw new Error(`no such task: ${taskId}`)
    }
    await task.cancel()
  }

  register(task) {
    this.#tasks.set(task.id, task)
  }
}

View File

@@ -22,7 +22,13 @@
> Packages will be released in the order they are here, therefore, they should
> be listed by inverse order of dependency.
>
> Rule of thumb: add packages on top.
> Global order:
>
> - @vates/...
> - @xen-orchestra/...
> - xo-server-...
> - xo-server
> - xo-web
>
> Each entry uses the following format: `- $packageName $version`
>

View File

@@ -291,6 +291,16 @@ export class Xapi extends EventEmitter {
return this._roCall(`${type}.get_${field}`, [ref])
}
async getFields(type, ref, fields) {
const values = {}
await Promise.all(
fields.map(async field => {
values[field] = await this._sessionCall(`${type}.get_${field}`, [ref])
})
)
return this._wrapRecord(type, ref, values)
}
setField(type, ref, field, value) {
return this.call(`${type}.set_${field}`, ref, value).then(noop)
}

View File

@@ -1,4 +1,5 @@
.*
gitignore-
/benchmark/
/benchmarks/