Compare commits
1 Commits
pool-autop
...
jft-wip
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7174499228 |
@@ -18,6 +18,17 @@ const wrapCall = (fn, arg, thisArg) => {
|
||||
* @returns {Promise<Item[]>}
|
||||
*/
|
||||
exports.asyncMap = function asyncMap(iterable, mapFn, thisArg = iterable) {
|
||||
let onError
|
||||
if (onError !== undefined) {
|
||||
const original = mapFn
|
||||
mapFn = async function () {
|
||||
try {
|
||||
return await original.apply(this, arguments)
|
||||
} catch (error) {
|
||||
return onError.call(this, error, ...arguments)
|
||||
}
|
||||
}
|
||||
}
|
||||
return Promise.all(Array.from(iterable, mapFn, thisArg))
|
||||
}
|
||||
|
||||
|
||||
@@ -543,6 +543,40 @@ class RemoteAdapter {
|
||||
async readVmBackupMetadata(path) {
|
||||
return Object.defineProperty(JSON.parse(await this._handler.readFile(path)), '_filename', { value: path })
|
||||
}
|
||||
|
||||
async writeFullVmBackup({ jobId, mode, scheduleId, timestamp, vm, vmSnapshot, xva }, sizeContainer, stream) {
|
||||
const basename = formatFilenameDate(timestamp)
|
||||
|
||||
const dataBasename = basename + '.xva'
|
||||
const dataFilename = backupDir + '/' + dataBasename
|
||||
|
||||
const metadataFilename = `${backupDir}/${basename}.json`
|
||||
const metadata = {
|
||||
jobId: job.id,
|
||||
mode: job.mode,
|
||||
scheduleId,
|
||||
timestamp,
|
||||
version: '2.0.0',
|
||||
vm,
|
||||
vmSnapshot: this._backup.exportedVm,
|
||||
xva: './' + dataBasename,
|
||||
}
|
||||
|
||||
const { deleteFirst } = settings
|
||||
if (deleteFirst) {
|
||||
await deleteOldBackups()
|
||||
}
|
||||
|
||||
await adapter.outputStream(stream, dataFilename, {
|
||||
validator: tmpPath => {
|
||||
if (handler._getFilePath !== undefined) {
|
||||
return isValidXva(handler._getFilePath('/' + tmpPath))
|
||||
}
|
||||
},
|
||||
})
|
||||
metadata.size = sizeContainer.size
|
||||
await handler.outputFile(metadataFilename, JSON.stringify(metadata))
|
||||
}
|
||||
}
|
||||
|
||||
Object.assign(RemoteAdapter.prototype, {
|
||||
|
||||
@@ -1,8 +1,11 @@
|
||||
const CancelToken = require('promise-toolbox/CancelToken.js')
|
||||
const Zone = require('node-zone')
|
||||
|
||||
const logAfterEnd = () => {
|
||||
throw new Error('task has already ended')
|
||||
const logAfterEnd = function (log) {
|
||||
const error = new Error('task has already ended:' + this.id)
|
||||
error.result = log.result
|
||||
error.log = log
|
||||
throw error
|
||||
}
|
||||
|
||||
const noop = Function.prototype
|
||||
@@ -44,11 +47,19 @@ class Task {
|
||||
}
|
||||
}
|
||||
|
||||
get id() {
|
||||
return this.#id
|
||||
}
|
||||
|
||||
#cancelToken
|
||||
#id = Math.random().toString(36).slice(2)
|
||||
#onLog
|
||||
#zone
|
||||
|
||||
get id() {
|
||||
return this.#id
|
||||
}
|
||||
|
||||
constructor({ name, data, onLog }) {
|
||||
let parentCancelToken, parentId
|
||||
if (onLog === undefined) {
|
||||
@@ -100,6 +111,8 @@ class Task {
|
||||
run(fn, last = false) {
|
||||
return this.#zone.run(() => {
|
||||
try {
|
||||
this.#cancelToken.throwIfRequested()
|
||||
|
||||
const result = fn()
|
||||
let then
|
||||
if (result != null && typeof (then = result.then) === 'function') {
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
const assert = require('assert')
|
||||
// const asyncFn = require('promise-toolbox/asyncFn')
|
||||
const findLast = require('lodash/findLast.js')
|
||||
const ignoreErrors = require('promise-toolbox/ignoreErrors.js')
|
||||
const keyBy = require('lodash/keyBy.js')
|
||||
@@ -143,6 +144,7 @@ exports.VmBackup = class VmBackup {
|
||||
|
||||
const doSnapshot =
|
||||
this._isDelta || (!settings.offlineBackup && vm.power_state === 'Running') || settings.snapshotRetention !== 0
|
||||
console.log({ doSnapshot })
|
||||
if (doSnapshot) {
|
||||
await Task.run({ name: 'snapshot' }, async () => {
|
||||
if (!settings.bypassVdiChainsCheck) {
|
||||
@@ -181,6 +183,7 @@ exports.VmBackup = class VmBackup {
|
||||
await this._callWriters(writer => writer.prepare({ isFull }), 'writer.prepare()')
|
||||
|
||||
const deltaExport = await exportDeltaVm(exportedVm, baseVm, {
|
||||
cancelToken: Task.cancelToken,
|
||||
fullVdisRequired,
|
||||
})
|
||||
const sizeContainers = mapValues(deltaExport.streams, stream => watchStreamSize(stream))
|
||||
@@ -226,6 +229,7 @@ exports.VmBackup = class VmBackup {
|
||||
async _copyFull() {
|
||||
const { compression } = this.job
|
||||
const stream = await this._xapi.VM_export(this.exportedVm.$ref, {
|
||||
cancelToken: Task.cancelToken,
|
||||
compress: Boolean(compression) && (compression === 'native' ? 'gzip' : 'zstd'),
|
||||
useSnapshot: false,
|
||||
})
|
||||
@@ -330,10 +334,22 @@ exports.VmBackup = class VmBackup {
|
||||
|
||||
this._baseVm = baseVm
|
||||
this._fullVdisRequired = fullVdisRequired
|
||||
|
||||
Task.info('base data', {
|
||||
vm: baseVm.uuid,
|
||||
fullVdisRequired: Array.from(fullVdisRequired),
|
||||
})
|
||||
}
|
||||
|
||||
run = defer(this.run)
|
||||
async run($defer) {
|
||||
this.exportedVm = this.vm
|
||||
this.timestamp = Date.now()
|
||||
|
||||
const doSnapshot = this._isDelta || vm.power_state === 'Running' || settings.snapshotRetention !== 0
|
||||
if (!this._isDelta) {
|
||||
}
|
||||
|
||||
const settings = this._settings
|
||||
assert(
|
||||
!settings.offlineBackup || settings.snapshotRetention === 0,
|
||||
@@ -380,3 +396,6 @@ exports.VmBackup = class VmBackup {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// const { prototype } = exports.VmBackup
|
||||
// prototype.run = asyncFn.cancelable(prototype.run)
|
||||
|
||||
@@ -140,6 +140,16 @@ ${pkg.name} v${pkg.version}`
|
||||
}
|
||||
}
|
||||
|
||||
const $import = ({ $import: path }) => {
|
||||
const data = fs.readFileSync(path, 'utf8')
|
||||
const ext = extname(path).slice(1).toLowerCase()
|
||||
const parse = FORMATS[ext]
|
||||
if (parse === undefined) {
|
||||
throw new Error(`unsupported file: ${path}`)
|
||||
}
|
||||
return visit(parse(data))
|
||||
}
|
||||
|
||||
const seq = async seq => {
|
||||
const j = callPath.length
|
||||
for (let i = 0, n = seq.length; i < n; ++i) {
|
||||
@@ -153,17 +163,13 @@ ${pkg.name} v${pkg.version}`
|
||||
if (Array.isArray(node)) {
|
||||
return seq(node)
|
||||
}
|
||||
return call(node)
|
||||
const keys = Object.keys(node)
|
||||
return keys.length === 1 && keys[0] === '$import' ? $import(node) : call(node)
|
||||
}
|
||||
|
||||
let node
|
||||
if (file !== '') {
|
||||
const data = fs.readFileSync(file, 'utf8')
|
||||
const ext = extname(file).slice(1).toLowerCase()
|
||||
const parse = FORMATS[ext]
|
||||
if (parse === undefined) {
|
||||
throw new Error(`unsupported file: ${file}`)
|
||||
}
|
||||
await visit(parse(data))
|
||||
node = { $import: file }
|
||||
} else {
|
||||
const method = args[0]
|
||||
const params = {}
|
||||
@@ -176,8 +182,9 @@ ${pkg.name} v${pkg.version}`
|
||||
params[param.slice(0, j)] = parseValue(param.slice(j + 1))
|
||||
}
|
||||
|
||||
await call({ method, params })
|
||||
node = { method, params }
|
||||
}
|
||||
await visit(node)
|
||||
}
|
||||
main(process.argv.slice(2)).then(
|
||||
() => {
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
import Cancel from 'promise-toolbox/Cancel'
|
||||
import CancelToken from 'promise-toolbox/CancelToken'
|
||||
import Disposable from 'promise-toolbox/Disposable.js'
|
||||
import fromCallback from 'promise-toolbox/fromCallback.js'
|
||||
import { asyncMap } from '@xen-orchestra/async-map'
|
||||
@@ -95,7 +97,8 @@ export default class Backups {
|
||||
error.jobId = jobId
|
||||
throw error
|
||||
}
|
||||
runningJobs[jobId] = true
|
||||
const source = CancelToken.source()
|
||||
runningJobs[jobId] = source.cancel
|
||||
try {
|
||||
return await run.apply(this, arguments)
|
||||
} finally {
|
||||
|
||||
38
@xen-orchestra/proxy/src/app/mixins/task.mjs
Normal file
38
@xen-orchestra/proxy/src/app/mixins/task.mjs
Normal file
@@ -0,0 +1,38 @@
|
||||
import { asyncMapSettled } from '@xen-orchestra/async-map'
|
||||
|
||||
export default class Task {
|
||||
#tasks = new Map()
|
||||
|
||||
constructor(app) {
|
||||
const tasks = new Map()
|
||||
this.#tasks = tasks
|
||||
|
||||
app.api.addMethods({
|
||||
task: {
|
||||
*list() {
|
||||
for (const id of tasks.keys()) {
|
||||
yield { id }
|
||||
}
|
||||
},
|
||||
cancel: [
|
||||
({ taskId }) => this.cancel(taskId),
|
||||
{
|
||||
params: {
|
||||
taskId: { type: 'string' },
|
||||
},
|
||||
},
|
||||
],
|
||||
},
|
||||
})
|
||||
|
||||
app.hooks.on('stop', () => asyncMapSettled(tasks.values(), task => task.cancel()))
|
||||
}
|
||||
|
||||
async cancel(taskId) {
|
||||
await this.tasks.get(taskId).cancel()
|
||||
}
|
||||
|
||||
register(task) {
|
||||
this.#tasks.set(task.id, task)
|
||||
}
|
||||
}
|
||||
@@ -22,7 +22,13 @@
|
||||
> Packages will be released in the order they are here, therefore, they should
|
||||
> be listed by inverse order of dependency.
|
||||
>
|
||||
> Rule of thumb: add packages on top.
|
||||
> Global order:
|
||||
>
|
||||
> - @vates/...
|
||||
> - @xen-orchestra/...
|
||||
> - xo-server-...
|
||||
> - xo-server
|
||||
> - xo-web
|
||||
>
|
||||
> The format is the following: - `$packageName` `$version`
|
||||
>
|
||||
|
||||
@@ -291,6 +291,16 @@ export class Xapi extends EventEmitter {
|
||||
return this._roCall(`${type}.get_${field}`, [ref])
|
||||
}
|
||||
|
||||
async getFields(type, ref, fields) {
|
||||
const values = {}
|
||||
await Promise.all(
|
||||
fields.map(async field => {
|
||||
values[field] = await this._sessionCall(`${type}.get_${field}`, [ref])
|
||||
})
|
||||
)
|
||||
return this._wrapRecord(type, ref, values)
|
||||
}
|
||||
|
||||
setField(type, ref, field, value) {
|
||||
return this.call(`${type}.set_${field}`, ref, value).then(noop)
|
||||
}
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
.*
|
||||
gitignore-
|
||||
|
||||
/benchmark/
|
||||
/benchmarks/
|
||||
|
||||
Reference in New Issue
Block a user