feat(delta NG): check VDIs before export (#3069)
This commit is contained in:
@@ -18,6 +18,7 @@
|
||||
- Fix Nagios backup reports [#2991](https://github.com/vatesfr/xen-orchestra/issues/2991)
|
||||
- Fix the retry of a single failed/interrupted VM backup [#2912](https://github.com/vatesfr/xen-orchestra/issues/2912#issuecomment-395480321)
|
||||
- New VM with Self: filter out networks that are not in the template's pool [#3011](https://github.com/vatesfr/xen-orchestra/issues/3011)
|
||||
- [Backup NG] Auto-detect when a full export is necessary.
|
||||
|
||||
## **5.20.0** (2018-05-31)
|
||||
|
||||
|
||||
4
flow-typed/lodash.js
vendored
4
flow-typed/lodash.js
vendored
@@ -1,4 +1,8 @@
|
||||
declare module 'lodash' {
|
||||
declare export function countBy<K, V>(
|
||||
object: { [K]: V },
|
||||
iteratee: K | ((V, K) => string)
|
||||
): { [string]: number }
|
||||
declare export function forEach<K, V>(
|
||||
object: { [K]: V },
|
||||
iteratee: (V, K) => void
|
||||
|
||||
@@ -70,7 +70,7 @@ import {
|
||||
// ===================================================================
|
||||
|
||||
const TAG_BASE_DELTA = 'xo:base_delta'
|
||||
const TAG_COPY_SRC = 'xo:copy_of'
|
||||
export const TAG_COPY_SRC = 'xo:copy_of'
|
||||
|
||||
// ===================================================================
|
||||
|
||||
|
||||
@@ -41,12 +41,18 @@ declare export class Vbd extends XapiObject {
|
||||
VDI: string;
|
||||
}
|
||||
|
||||
declare export class Vdi extends XapiObject {
|
||||
$snapshot_of: Vdi;
|
||||
uuid: string;
|
||||
}
|
||||
|
||||
declare export class Vm extends XapiObject {
|
||||
$snapshots: Vm[];
|
||||
$VBDs: Vbd[];
|
||||
is_a_snapshot: boolean;
|
||||
is_a_template: boolean;
|
||||
name_label: string;
|
||||
power_state: 'Running' | 'Halted' | 'Paused' | 'Suspended';
|
||||
other_config: $Dict<string>;
|
||||
snapshot_time: number;
|
||||
uuid: string;
|
||||
@@ -74,21 +80,24 @@ declare export class Xapi {
|
||||
_snapshotVm(cancelToken: mixed, vm: Vm, nameLabel?: string): Promise<Vm>;
|
||||
|
||||
addTag(object: Id, tag: string): Promise<void>;
|
||||
barrier(): void;
|
||||
barrier(ref: string): XapiObject;
|
||||
barrier(): Promise<void>;
|
||||
barrier(ref: string): Promise<XapiObject>;
|
||||
deleteVm(vm: Id): Promise<void>;
|
||||
editVm(vm: Id, $Dict<mixed>): Promise<void>;
|
||||
exportDeltaVm(
|
||||
cancelToken: mixed,
|
||||
snapshot: Id,
|
||||
baseSnapshot ?: Id
|
||||
): Promise<DeltaVmExport>;
|
||||
exportVm(
|
||||
cancelToken: mixed,
|
||||
vm: Vm,
|
||||
options ?: Object
|
||||
): Promise<AugmentedReadable>;
|
||||
getObject(object: Id): XapiObject;
|
||||
importDeltaVm(data: DeltaVmImport, options: Object): Promise<{ vm: Vm }>;
|
||||
importVm(stream: AugmentedReadable, options: Object): Promise<Vm>;
|
||||
exportDeltaVm(
|
||||
cancelToken: mixed,
|
||||
snapshot: Id,
|
||||
baseSnapshot ?: Id,
|
||||
opts?: { fullVdisRequired?: string[] }
|
||||
): Promise<DeltaVmExport>;
|
||||
exportVm(
|
||||
cancelToken: mixed,
|
||||
vm: Vm,
|
||||
options ?: Object
|
||||
): Promise<AugmentedReadable>;
|
||||
getObject(object: Id): XapiObject;
|
||||
importDeltaVm(data: DeltaVmImport, options: Object): Promise<{ vm: Vm }>;
|
||||
importVm(stream: AugmentedReadable, options: Object): Promise<Vm>;
|
||||
shutdownVm(object: Id): Promise<void>;
|
||||
startVm(object: Id): Promise<void>;
|
||||
}
|
||||
|
||||
@@ -6,8 +6,19 @@ import defer from 'golike-defer'
|
||||
import limitConcurrency from 'limit-concurrency-decorator'
|
||||
import { type Pattern, createPredicate } from 'value-matcher'
|
||||
import { type Readable, PassThrough } from 'stream'
|
||||
import { AssertionError } from 'assert'
|
||||
import { basename, dirname } from 'path'
|
||||
import { isEmpty, last, mapValues, noop, some, sum, values } from 'lodash'
|
||||
import {
|
||||
countBy,
|
||||
forEach,
|
||||
isEmpty,
|
||||
last,
|
||||
mapValues,
|
||||
noop,
|
||||
some,
|
||||
sum,
|
||||
values,
|
||||
} from 'lodash'
|
||||
import {
|
||||
fromEvent as pFromEvent,
|
||||
ignoreErrors,
|
||||
@@ -25,9 +36,12 @@ import createSizeStream from '../../size-stream'
|
||||
import {
|
||||
type DeltaVmExport,
|
||||
type DeltaVmImport,
|
||||
type Vdi,
|
||||
type Vm,
|
||||
type Xapi,
|
||||
TAG_COPY_SRC,
|
||||
} from '../../xapi'
|
||||
import { getVmDisks } from '../../xapi/utils'
|
||||
import {
|
||||
asyncMap,
|
||||
resolveRelativeFromFile,
|
||||
@@ -419,7 +433,8 @@ export default class BackupNg {
|
||||
}
|
||||
|
||||
const job: BackupJob = (job_: any)
|
||||
let vms: $Dict<Vm>
|
||||
|
||||
let vms: $Dict<Vm> | void
|
||||
if (vmId === undefined) {
|
||||
vms = app.getObjects({
|
||||
filter: createPredicate({
|
||||
@@ -433,6 +448,20 @@ export default class BackupNg {
|
||||
}
|
||||
const jobId = job.id
|
||||
const scheduleId = schedule.id
|
||||
const srs = unboxIds(job.srs).map(id => {
|
||||
const xapi = app.getXapi(id)
|
||||
return {
|
||||
__proto__: xapi.getObject(id),
|
||||
xapi,
|
||||
}
|
||||
})
|
||||
const remotes = await Promise.all(
|
||||
unboxIds(job.remotes).map(async id => ({
|
||||
id,
|
||||
handler: await app.getRemoteHandler(id),
|
||||
}))
|
||||
)
|
||||
|
||||
let handleVm = async vm => {
|
||||
const { name_label: name, uuid } = vm
|
||||
const taskId: string = logger.notice(
|
||||
@@ -455,7 +484,9 @@ export default class BackupNg {
|
||||
job,
|
||||
schedule,
|
||||
logger,
|
||||
taskId
|
||||
taskId,
|
||||
srs,
|
||||
remotes
|
||||
)
|
||||
const vmTimeout: number = getSetting(job.settings, 'vmTimeout', [
|
||||
uuid,
|
||||
@@ -483,15 +514,13 @@ export default class BackupNg {
|
||||
}
|
||||
}
|
||||
|
||||
if (vmId !== undefined) {
|
||||
if (vms === undefined) {
|
||||
return handleVm(await app.getObject(vmId))
|
||||
}
|
||||
|
||||
const concurrency: number | void = getSetting(
|
||||
job.settings,
|
||||
'concurrency',
|
||||
['']
|
||||
)
|
||||
const concurrency: number = getSetting(job.settings, 'concurrency', [
|
||||
'',
|
||||
])
|
||||
if (concurrency !== 0) {
|
||||
handleVm = limitConcurrency(concurrency)(handleVm)
|
||||
}
|
||||
@@ -692,7 +721,9 @@ export default class BackupNg {
|
||||
job: BackupJob,
|
||||
schedule: Schedule,
|
||||
logger: any,
|
||||
taskId: string
|
||||
taskId: string,
|
||||
srs: any[],
|
||||
remotes: any[]
|
||||
): Promise<void> {
|
||||
const app = this._app
|
||||
const xapi = app.getXapi(vmUuid)
|
||||
@@ -725,11 +756,10 @@ export default class BackupNg {
|
||||
scheduleId,
|
||||
])
|
||||
|
||||
const remotes = unboxIds(job.remotes)
|
||||
if (copyRetention === undefined) {
|
||||
// if copyRetention is not defined, it uses exportRetention's value due to
|
||||
// previous implementation which did not support copyRetention
|
||||
copyRetention = exportRetention
|
||||
copyRetention = srs.length === 0 ? 0 : exportRetention
|
||||
|
||||
if (remotes.length === 0) {
|
||||
exportRetention = 0
|
||||
@@ -738,7 +768,6 @@ export default class BackupNg {
|
||||
throw new Error('export retention must be 0 without remotes')
|
||||
}
|
||||
|
||||
const srs = unboxIds(job.srs)
|
||||
if (copyRetention !== 0 && srs.length === 0) {
|
||||
throw new Error('copy retention must be 0 without SRs')
|
||||
}
|
||||
@@ -848,7 +877,7 @@ export default class BackupNg {
|
||||
xapi.barrier(snapshot.$ref)
|
||||
): any): Vm)
|
||||
|
||||
if (exportRetention === 0) {
|
||||
if (copyRetention === 0 && exportRetention === 0) {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -910,17 +939,15 @@ export default class BackupNg {
|
||||
[
|
||||
...remotes.map(
|
||||
wrapTaskFn(
|
||||
id => ({
|
||||
({ id }) => ({
|
||||
data: { id, type: 'remote' },
|
||||
logger,
|
||||
message: 'export',
|
||||
parentId: taskId,
|
||||
}),
|
||||
async (taskId, remoteId) => {
|
||||
async (taskId, { handler, id: remoteId }) => {
|
||||
const fork = forkExport()
|
||||
|
||||
const handler = await app.getRemoteHandler(remoteId)
|
||||
|
||||
const oldBackups: MetadataFull[] = (getOldEntries(
|
||||
exportRetention,
|
||||
await this._listVmBackups(
|
||||
@@ -957,17 +984,16 @@ export default class BackupNg {
|
||||
),
|
||||
...srs.map(
|
||||
wrapTaskFn(
|
||||
id => ({
|
||||
({ $id: id }) => ({
|
||||
data: { id, type: 'SR' },
|
||||
logger,
|
||||
message: 'export',
|
||||
parentId: taskId,
|
||||
}),
|
||||
async (taskId, srId) => {
|
||||
async (taskId, sr) => {
|
||||
const fork = forkExport()
|
||||
|
||||
const xapi = app.getXapi(srId)
|
||||
const sr = xapi.getObject(srId)
|
||||
const { $id: srId, xapi } = sr
|
||||
|
||||
const oldVms = getOldEntries(
|
||||
copyRetention,
|
||||
@@ -1023,12 +1049,78 @@ export default class BackupNg {
|
||||
$defer.onFailure.call(xapi, 'deleteVm', snapshot)
|
||||
}
|
||||
|
||||
const baseSnapshot = last(snapshots)
|
||||
if (baseSnapshot !== undefined) {
|
||||
console.log(baseSnapshot.$id) // TODO: remove
|
||||
// check current state
|
||||
// await Promise.all([asyncMap(remotes, remoteId => {})])
|
||||
}
|
||||
let baseSnapshot, fullVdisRequired
|
||||
await (async () => {
|
||||
baseSnapshot = (last(snapshots): Vm | void)
|
||||
if (baseSnapshot === undefined) {
|
||||
return
|
||||
}
|
||||
|
||||
const fullRequired = { __proto__: null }
|
||||
const vdis: $Dict<Vdi> = getVmDisks(baseSnapshot)
|
||||
|
||||
for (const { $id: srId, xapi } of srs) {
|
||||
const replicatedVm = listReplicatedVms(
|
||||
xapi,
|
||||
scheduleId,
|
||||
srId,
|
||||
vmUuid
|
||||
).find(vm => vm.other_config[TAG_COPY_SRC] === baseSnapshot.uuid)
|
||||
if (replicatedVm === undefined) {
|
||||
baseSnapshot = undefined
|
||||
return
|
||||
}
|
||||
|
||||
const replicatedVdis = countBy(
|
||||
getVmDisks(replicatedVm),
|
||||
vdi => vdi.other_config[TAG_COPY_SRC]
|
||||
)
|
||||
forEach(vdis, vdi => {
|
||||
if (!(vdi.uuid in replicatedVdis)) {
|
||||
fullRequired[vdi.$snapshot_of.$id] = true
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
await asyncMap(remotes, ({ handler }) => {
|
||||
return asyncMap(vdis, async vdi => {
|
||||
const snapshotOf = vdi.$snapshot_of
|
||||
const dir = `${vmDir}/vdis/${jobId}/${snapshotOf.uuid}`
|
||||
const files = await handler
|
||||
.list(dir, { filter: isVhd })
|
||||
.catch(_ => [])
|
||||
let full = true
|
||||
await asyncMap(files, async file => {
|
||||
if (file[0] !== '.') {
|
||||
try {
|
||||
const vhd = new Vhd(handler, `${dir}/${file}`)
|
||||
await vhd.readHeaderAndFooter()
|
||||
|
||||
if (
|
||||
Buffer.from(vhd.footer.uuid).toString('hex') ===
|
||||
vdi.uuid.split('-').join('')
|
||||
) {
|
||||
full = false
|
||||
}
|
||||
|
||||
return
|
||||
} catch (error) {
|
||||
if (!(error instanceof AssertionError)) {
|
||||
throw error
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// either a temporary file or an invalid VHD
|
||||
await handler.unlink(`${dir}/${file}`)
|
||||
})
|
||||
if (full) {
|
||||
fullRequired[snapshotOf.$id] = true
|
||||
}
|
||||
})
|
||||
})
|
||||
fullVdisRequired = Object.keys(fullRequired)
|
||||
})()
|
||||
|
||||
const deltaExport = await wrapTask(
|
||||
{
|
||||
@@ -1036,7 +1128,9 @@ export default class BackupNg {
|
||||
message: 'start snapshot export',
|
||||
parentId: taskId,
|
||||
},
|
||||
xapi.exportDeltaVm($cancelToken, snapshot, baseSnapshot)
|
||||
xapi.exportDeltaVm($cancelToken, snapshot, baseSnapshot, {
|
||||
fullVdisRequired,
|
||||
})
|
||||
)
|
||||
|
||||
const metadata: MetadataDelta = {
|
||||
@@ -1099,17 +1193,15 @@ export default class BackupNg {
|
||||
[
|
||||
...remotes.map(
|
||||
wrapTaskFn(
|
||||
id => ({
|
||||
({ id }) => ({
|
||||
data: { id, isFull, type: 'remote' },
|
||||
logger,
|
||||
message: 'export',
|
||||
parentId: taskId,
|
||||
}),
|
||||
async (taskId, remoteId) => {
|
||||
async (taskId, { handler, id: remoteId }) => {
|
||||
const fork = forkExport()
|
||||
|
||||
const handler = await app.getRemoteHandler(remoteId)
|
||||
|
||||
const oldBackups: MetadataDelta[] = (getOldEntries(
|
||||
exportRetention,
|
||||
await this._listVmBackups(
|
||||
@@ -1182,6 +1274,16 @@ export default class BackupNg {
|
||||
await chainVhd(handler, parentPath, handler, path)
|
||||
}
|
||||
|
||||
// set the correct UUID in the VHD
|
||||
const vhd = new Vhd(handler, path)
|
||||
await vhd.readHeaderAndFooter()
|
||||
vhd.footer.uuid = Buffer.from(
|
||||
vdi.uuid.split('-').join(''),
|
||||
'hex'
|
||||
)
|
||||
await vhd.readBlockAllocationTable() // required by writeFooter()
|
||||
await vhd.writeFooter()
|
||||
|
||||
return handler.getSize(path)
|
||||
})
|
||||
).then(sum)
|
||||
@@ -1196,17 +1298,16 @@ export default class BackupNg {
|
||||
),
|
||||
...srs.map(
|
||||
wrapTaskFn(
|
||||
id => ({
|
||||
({ $id: id }) => ({
|
||||
data: { id, isFull, type: 'SR' },
|
||||
logger,
|
||||
message: 'export',
|
||||
parentId: taskId,
|
||||
}),
|
||||
async (taskId, srId) => {
|
||||
async (taskId, sr) => {
|
||||
const fork = forkExport()
|
||||
|
||||
const xapi = app.getXapi(srId)
|
||||
const sr = xapi.getObject(srId)
|
||||
const { $id: srId, xapi } = sr
|
||||
|
||||
const oldVms = getOldEntries(
|
||||
copyRetention,
|
||||
@@ -1230,7 +1331,7 @@ export default class BackupNg {
|
||||
name_label: `${metadata.vm.name_label} (${safeDateFormat(
|
||||
metadata.timestamp
|
||||
)})`,
|
||||
srId: sr.$id,
|
||||
srId,
|
||||
})
|
||||
)
|
||||
|
||||
|
||||
@@ -60,6 +60,7 @@ export type CallJob = {|
|
||||
export type Executor = ({|
|
||||
app: Object,
|
||||
cancelToken: any,
|
||||
data: any,
|
||||
job: Job,
|
||||
logger: Logger,
|
||||
runJobId: string,
|
||||
|
||||
Reference in New Issue
Block a user