feat(xo-server#importVmBackupNg): use @xen-orchestra/backups lib (#5630)

This commit is contained in:
badrAZ 2021-03-01 13:36:23 +01:00 committed by GitHub
parent d166073b16
commit d9ce1b3a97
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 86 additions and 164 deletions

View File

@ -27,3 +27,5 @@
> - major: if the change breaks compatibility
>
> In case of conflict, the highest (lowest in previous list) `$version` wins.
- xo-server minor

View File

@ -138,6 +138,11 @@ timeout = 600e3
ignoreVmSnapshotResources = false
[xapiOptions]
# VDIs with `[NOBAK]` flag can be ignored while snapshotting a halted VM.
#
# This is disabled by default for the time being but will be turned on after enough testing.
ignoreNobakVdis = false
# The duration XO will wait for a host to be live before assuming it failed to
# restart
restartHostTimeout = '20 minutes'

View File

@ -48,6 +48,7 @@
"@xen-orchestra/mixin": "^0.0.0",
"@xen-orchestra/self-signed": "^0.1.0",
"@xen-orchestra/template": "^0.1.0",
"@xen-orchestra/xapi": "^0.4.1",
"ajv": "^6.1.1",
"app-conf": "^0.9.0",
"archiver": "^5.0.0",

View File

@ -0,0 +1,24 @@
// it records logs generated by `@xen-orchestra/backups/Task#run`
export const handleBackupLog = (log, { logger, localTaskIds, handleRootTaskId }) => {
const { event, message, taskId } = log
const common = {
data: log.data,
event: 'task.' + event,
result: log.result,
status: log.status,
}
if (event === 'start') {
const { parentId } = log
if (parentId === undefined) {
handleRootTaskId((localTaskIds[taskId] = logger.notice(message, common)))
} else {
common.parentId = localTaskIds[parentId]
localTaskIds[taskId] = logger.notice(message, common)
}
} else {
common.taskId = localTaskIds[taskId]
logger.notice(message, common)
}
}

View File

@ -14,7 +14,8 @@ import { cancelable, defer, fromEvents, ignoreErrors, pCatch, pRetry } from 'pro
import { parseDuration } from '@vates/parse-duration'
import { PassThrough } from 'stream'
import { forbiddenOperation } from 'xo-common/api-errors'
import { Xapi as XapiBase, NULL_REF } from 'xen-api'
import { NULL_REF } from 'xen-api'
import { Xapi as XapiBase } from '@xen-orchestra/xapi'
import {
every,
filter,

View File

@ -13,9 +13,11 @@ import { AssertionError } from 'assert'
import { basename, dirname } from 'path'
import { decorateWith } from '@vates/decorate-with'
import { formatVmBackups } from '@xen-orchestra/backups/formatVmBackups'
import { ImportVmBackup } from '@xen-orchestra/backups/ImportVmBackup'
import { invalidParameters } from 'xo-common/api-errors'
import { isValidXva } from '@xen-orchestra/backups/isValidXva'
import { parseDuration } from '@vates/parse-duration'
import { Task } from '@xen-orchestra/backups/Task'
import {
countBy,
findLast,
@ -32,7 +34,7 @@ import {
values,
} from 'lodash'
import { CancelToken, ignoreErrors, timeout, using } from 'promise-toolbox'
import Vhd, { chainVhd, checkVhdChain, createSyntheticStream as createVhdReadStream } from 'vhd-lib'
import Vhd, { chainVhd, checkVhdChain } from 'vhd-lib'
import type Logger from '../logs/loggers/abstract'
import { type CallJob, type Executor, type Job } from '../jobs'
@ -40,8 +42,9 @@ import { type Schedule } from '../scheduling'
import createSizeStream from '../../size-stream'
import { debounceWithKey, REMOVE_CACHE_ENTRY } from '../../_pDebounceWithKey'
import { handleBackupLog } from '../../_handleBackupLog'
import { waitAll } from '../../_waitAll'
import { type DeltaVmExport, type DeltaVmImport, type Vdi, type Vm, type Xapi, TAG_COPY_SRC } from '../../xapi'
import { type DeltaVmExport, type Vdi, type Vm, type Xapi, TAG_COPY_SRC } from '../../xapi'
import { formatDateTime, getVmDisks } from '../../xapi/utils'
import {
resolveRelativeFromFile,
@ -197,86 +200,6 @@ const listReplicatedVms = (xapi: Xapi, scheduleOrJobId: string, srUuid?: string,
return values(vms).sort(compareReplicatedVmDatetime)
}
// Dispatch table: backup `mode` → importer function. Each importer restores
// a VM backup read through `handler` onto the target SR and resolves with
// the `$id` of the restored VM.
// NOTE(review): legacy path — this commit replaces it with
// `@xen-orchestra/backups/ImportVmBackup`.
const importers: $Dict<
(
handler: RemoteHandler,
metadataFilename: string,
metadata: Metadata,
xapi: Xapi,
sr: { $id: string },
taskId: string,
logger: Logger
) => Promise<string>,
Mode
> = {
// Restores a delta (incremental) backup: opens a synthetic VHD read stream
// per VDI, then imports everything as a delta VM.
async delta(handler, metadataFilename, metadata, xapi, sr, taskId, logger) {
metadata = ((metadata: any): MetadataDelta)
const { vdis, vhds, vm } = metadata
// one VHD stream per VDI, keyed as `<vdiId>.vhd` as expected by importDeltaVm
const streams = {}
await asyncMap(vdis, async (vdi, id) => {
streams[`${id}.vhd`] = await createVhdReadStream(handler, resolveRelativeFromFile(metadataFilename, vhds[id]))
})
const delta: DeltaVmImport = {
streams,
vbds: metadata.vbds,
vdis,
version: '1.0.0',
vifs: metadata.vifs,
vm: {
// restored VM is renamed with the backup timestamp and tagged so it can
// be told apart from the original
...vm,
name_label: `${vm.name_label} (${safeDateFormat(metadata.timestamp)})`,
tags: [...vm.tags, 'restored from backup'],
},
}
// wrap the import in a `transfer` subtask reporting size and new VM id
const { vm: newVm } = await wrapTask(
{
logger,
message: 'transfer',
parentId: taskId,
result: ({ transferSize, vm: { $id: id } }) => ({
size: transferSize,
id,
}),
},
xapi.importDeltaVm(delta, {
detectBase: false,
disableStartAfterImport: false,
srId: sr,
// TODO: support mapVdisSrs
})
)
return newVm.$id
},
// Restores a full backup: streams the XVA archive and imports it as-is.
async full(handler, metadataFilename, metadata, xapi, sr, taskId, logger) {
metadata = ((metadata: any): MetadataFull)
const xva = await handler.createReadStream(resolveRelativeFromFile(metadataFilename, metadata.xva), {
checksum: true,
ignoreMissingChecksum: true, // provide an easy way to opt-out
})
// wrap the import in a `transfer` subtask reporting size and new VM id
const vm = await wrapTask(
{
logger,
message: 'transfer',
parentId: taskId,
result: ({ $id: id }) => ({ size: xva.length, id }),
},
xapi.importVm(xva, { srId: sr.$id })
)
// tag and rename after import (full imports keep the original name_label)
await Promise.all([
vm.add_tags('restored from backup'),
xapi.editVm(vm.$id, {
name_label: `${metadata.vm.name_label} (${safeDateFormat(metadata.timestamp)})`,
}),
])
return vm.$id
},
}
const PARSE_UUID_RE = /-/g
const parseUuid = (uuid: string) => Buffer.from(uuid.replace(PARSE_UUID_RE, ''), 'hex')
@ -810,12 +733,12 @@ export default class BackupNg {
const sr = xapi.getObject(srId)
const { metadataFilename, remoteId } = parseVmBackupId(id)
const { proxy, url, options } = await app.getRemoteWithCredentials(remoteId)
const remote = await app.getRemoteWithCredentials(remoteId)
let rootTaskId
const logger = this._logger
try {
if (proxy !== undefined) {
if (remote.proxy !== undefined) {
const { allowUnauthorized, host, password, username } = await app.getXenServer(
app.getXenServerIdByObject(sr.$id)
)
@ -823,8 +746,8 @@ export default class BackupNg {
const params = {
backupId: metadataFilename,
remote: {
url,
options,
url: remote.url,
options: remote.options,
},
srUuid: sr.uuid,
streamLogs: true,
@ -839,71 +762,60 @@ export default class BackupNg {
}
try {
const logsStream = await app.callProxyMethod(proxy, 'backup.importVmBackup', params, {
const logsStream = await app.callProxyMethod(remote.proxy, 'backup.importVmBackup', params, {
assertType: 'iterator',
})
const localTaskIds = { __proto__: null }
for await (const log of logsStream) {
const { event, message, taskId } = log
const common = {
data: log.data,
event: 'task.' + event,
result: log.result,
status: log.status,
handleBackupLog(log, {
logger,
localTaskIds,
handleRootTaskId: id => {
this._runningRestores.add(id)
rootTaskId = id
},
})
}
if (event === 'start') {
const { parentId } = log
if (parentId === undefined) {
rootTaskId = localTaskIds[taskId] = logger.notice(message, common)
this._runningRestores.add(rootTaskId)
} else {
common.parentId = localTaskIds[parentId]
localTaskIds[taskId] = logger.notice(message, common)
}
} else {
common.taskId = localTaskIds[taskId]
logger.notice(message, common)
}
}
return
} catch (error) {
if (invalidParameters.is(error)) {
delete params.streamLogs
return app.callProxyMethod(proxy, 'backup.importVmBackup', params)
return app.callProxyMethod(remote.proxy, 'backup.importVmBackup', params)
}
throw error
}
}
const handler = await app.getRemoteHandler(remoteId)
const metadata: Metadata = JSON.parse(String(await handler.readFile(metadataFilename)))
const importer = importers[metadata.mode]
if (importer === undefined) {
throw new Error(`no importer for backup mode ${metadata.mode}`)
}
const { jobId, timestamp: time } = metadata
return wrapTaskFn(
} else {
await using(app.getBackupsRemoteAdapter(remote), async adapter => {
const metadata: Metadata = await adapter.readVmBackupMetadata(metadataFilename)
const localTaskIds = { __proto__: null }
return Task.run(
{
data: {
jobId,
jobId: metadata.jobId,
srId,
time,
time: metadata.timestamp,
},
name: 'restore',
onLog: log =>
handleBackupLog(log, {
logger,
message: 'restore',
localTaskIds,
handleRootTaskId: id => {
this._runningRestores.add(id)
rootTaskId = id
},
taskId => {
rootTaskId = taskId
this._runningRestores.add(taskId)
return importer(handler, metadataFilename, metadata, xapi, sr, taskId, logger)
}),
},
async () =>
new ImportVmBackup({
adapter,
metadata,
srUuid: srId,
xapi: await app.getXapi(srId),
}).run()
)
})
}
)()
} finally {
this._runningRestores.delete(rootTaskId)
}

View File

@ -8,6 +8,7 @@ import { RestoreMetadataBackup } from '@xen-orchestra/backups/RestoreMetadataBac
import { Task } from '@xen-orchestra/backups/Task'
import { debounceWithKey, REMOVE_CACHE_ENTRY } from '../_pDebounceWithKey'
import { handleBackupLog } from '../_handleBackupLog'
import { waitAll } from '../_waitAll'
import { type Xapi } from '../xapi'
import { safeDateFormat, serializeError, type SimpleIdPattern, unboxIdsFromPattern } from '../utils'
@ -67,30 +68,6 @@ const deleteOldBackups = (handler, dir, retention, handleError) =>
)
}, handleError)
// Records a log emitted by `@xen-orchestra/backups/Task#run` via `logger`,
// mapping the task ids found in the logs to locally generated ids
// (`localTaskIds`); `handleRootTaskId` receives the local id of the root task.
// NOTE(review): pre-existing private copy of this helper — this commit removes
// it in favor of the shared `handleBackupLog` from `../_handleBackupLog`.
const handleLog = (log, { logger, localTaskIds, handleRootTaskId }) => {
const { event, message, taskId } = log
const common = {
data: log.data,
event: 'task.' + event,
result: log.result,
status: log.status,
}
if (event === 'start') {
const { parentId } = log
if (parentId === undefined) {
// root task: record it and report its local id to the caller
handleRootTaskId((localTaskIds[taskId] = logger.notice(message, common)))
} else {
// child task: link it to its parent's local id
common.parentId = localTaskIds[parentId]
localTaskIds[taskId] = logger.notice(message, common)
}
} else {
// follow-up event: reference the previously recorded local task id
common.taskId = localTaskIds[taskId]
logger.notice(message, common)
}
}
// metadata.json
//
// {
@ -805,7 +782,7 @@ export default class metadataBackup {
}
}
handleLog(log, {
handleBackupLog(log, {
logger,
localTaskIds,
handleRootTaskId: id => {