Compare commits

..

1 Commit

Author SHA1 Message Date
Florent Beauchamp
698086f4b2 fix(backups): clean snapshot correctly on failure 2024-02-21 14:38:53 +00:00
9 changed files with 42 additions and 148 deletions

View File

@@ -61,23 +61,22 @@ export default class MultiNbdClient {
async *readBlocks(indexGenerator) {
// default : read all blocks
const readAhead = []
const makeReadBlockPromise = (index, buffer, size) => {
// pass through any pre loaded buffer
const promise = buffer ? Promise.resolve(buffer) : this.readBlock(index, size)
const makeReadBlockPromise = (index, size) => {
const promise = this.readBlock(index, size)
// error is handled during unshift
promise.catch(() => {})
return promise
}
// read all blocks, but try to keep readAheadMaxLength promise waiting ahead
for (const { index, buffer, size } of indexGenerator()) {
for (const { index, size } of indexGenerator()) {
// stack readAheadMaxLength promises before starting to handle the results
if (readAhead.length === this.#readAhead) {
// any error will stop reading blocks
yield readAhead.shift()
}
readAhead.push(makeReadBlockPromise(index, buffer, size))
readAhead.push(makeReadBlockPromise(index, size))
}
while (readAhead.length > 0) {
yield readAhead.shift()

View File

@@ -79,16 +79,9 @@ export async function exportIncrementalVm(
$SR$uuid: vdi.$SR.uuid,
}
let changedBlocks
console.log('CBT ? ', vdi.cbt_enabled,vdiRef,baseVdi?.$ref)
if (vdi.cbt_enabled && baseVdi?.$ref) {
// @todo log errors and fallback to default mode
changedBlocks = await vdi.$listChangedBlock(baseVdi?.$ref)
}
streams[`${vdiRef}.vhd`] = await vdi.$exportContent({
baseRef: baseVdi?.$ref,
cancelToken,
changedBlocks,
format: 'vhd',
nbdConcurrency,
preferNbd,

View File

@@ -2,6 +2,7 @@ import assert from 'node:assert'
import groupBy from 'lodash/groupBy.js'
import ignoreErrors from 'promise-toolbox/ignoreErrors'
import { asyncMap } from '@xen-orchestra/async-map'
import { createLogger } from '@xen-orchestra/log'
import { decorateMethodsWith } from '@vates/decorate-with'
import { defer } from 'golike-defer'
import { formatDateTime } from '@xen-orchestra/xapi'
@@ -10,6 +11,8 @@ import { getOldEntries } from '../../_getOldEntries.mjs'
import { Task } from '../../Task.mjs'
import { Abstract } from './_Abstract.mjs'
const { info, warn } = createLogger('xo:backups:AbstractXapi')
export const AbstractXapi = class AbstractXapiVmBackupRunner extends Abstract {
constructor({
config,
@@ -171,6 +174,20 @@ export const AbstractXapi = class AbstractXapiVmBackupRunner extends Abstract {
}
}
// this will delete current snapshot in case of failure
// to ensure any retry will start with a clean state, especially in the case of rolling snapshots
// Deletes the snapshot taken during this (failed) run so that a retry
// starts from a clean state — important for rolling-snapshot modes.
// No-op when this run was not supposed to snapshot or no snapshot
// (this._exportedVm) was actually created.
#removeCurrentSnapshotOnFailure() {
if (this._mustDoSnapshot() && this._exportedVm !== undefined) {
info('will delete snapshot on failure', { vm: this._vm, snapshot: this._exportedVm })
// safety guard: never destroy the VM itself — the exported object must
// be a distinct snapshot, not the source VM
assert.notStrictEqual(
this._vm.$ref,
this._exportedVm.$ref,
'there should have a snapshot, but vm and snapshot have the same ref'
)
// return the destroy promise so the caller can await/handle its errors
return this._xapi.VM_destroy(this._exportedVm.$ref)
}
}
async _fetchJobSnapshots() {
const jobId = this._jobId
const vmRef = this._vm.$ref
@@ -193,17 +210,6 @@ export const AbstractXapi = class AbstractXapiVmBackupRunner extends Abstract {
const allSettings = this.job.settings
const baseSettings = this._baseSettings
const baseVmRef = this._baseVm?.$ref
if (this._settings.deltaComputeMode === 'CBT' && this._exportedVm?.$ref && this._exportedVm?.$ref != this._vm.$ref) {
console.log('WILL PURGE',this._exportedVm?.$ref)
const xapi = this._xapi
const vdiRefs = await this._xapi.VM_getDisks(this._exportedVm?.$ref)
await xapi.call('VM.destroy',this._exportedVm.$ref)
// @todo: ensure it is really the snapshot
for (const vdiRef of vdiRefs) {
// @todo handle error
await xapi.VDI_dataDestroy(vdiRef)
}
}
const snapshotsPerSchedule = groupBy(this._jobSnapshots, _ => _.other_config['xo:backup:schedule'])
const xapi = this._xapi
@@ -219,8 +225,6 @@ export const AbstractXapi = class AbstractXapiVmBackupRunner extends Abstract {
}
})
})
}
async copy() {
@@ -239,22 +243,6 @@ export const AbstractXapi = class AbstractXapiVmBackupRunner extends Abstract {
throw new Error('Not implemented')
}
// Enables Changed Block Tracking (CBT) on every disk of the VM, but only
// when the job is configured with deltaComputeMode === 'CBT'; otherwise
// returns immediately without touching the VDIs.
async enableCbt() {
// for each disk of the VM , enable CBT
if (this._settings.deltaComputeMode !== 'CBT') {
return
}
const vm = this._vm
const xapi = this._xapi
console.log(vm.VBDs) // FIXME: leftover debug logging, should be removed
const vdiRefs = await vm.$getDisks(vm.VBDs)
// enable sequentially on each disk; failures currently abort the loop
for (const vdiRef of vdiRefs) {
// @todo handle error
await xapi.VDI_enableChangeBlockTracking(vdiRef)
}
// @todo : when do we disable CBT ?
}
async run($defer) {
const settings = this._settings
assert(
@@ -275,7 +263,7 @@ export const AbstractXapi = class AbstractXapiVmBackupRunner extends Abstract {
await this._cleanMetadata()
await this._removeUnusedSnapshots()
await this.enableCbt()
const vm = this._vm
const isRunning = vm.power_state === 'Running'
const startAfter = isRunning && (settings.offlineBackup ? 'backup' : settings.offlineSnapshot && 'snapshot')
@@ -296,16 +284,21 @@ export const AbstractXapi = class AbstractXapiVmBackupRunner extends Abstract {
await this._exportedVm.update_blocked_operations({ pool_migrate: reason, migrate_send: reason })
try {
await this._copy()
// @todo if CBT is enabled : should call vdi.datadestroy on snapshot here
} finally {
await this._exportedVm.update_blocked_operations({ pool_migrate, migrate_send })
}
}
} catch (error) {
try {
await this.#removeCurrentSnapshotOnFailure()
} catch (removeSnapshotError) {
warn('fail removing current snapshot', { error: removeSnapshotError })
}
throw error
} finally {
if (startAfter) {
ignoreErrors.call(vm.$callAsync('start', false, false))
}
await this._fetchJobSnapshots()
await this._removeUnusedSnapshots()
}

View File

@@ -83,31 +83,9 @@ class Vdi {
}
}
// Returns a Buffer acting as a bitmap: one bit per 64KB block of the raw
// VDI, set when that block changed between baseRef and ref.
// Wraps XAPI VDI.list_changed_blocks, whose response is base64-encoded.
// NOTE(review): bit ordering within each byte is not established here —
// confirm against the consumer (cbtContainsBlock) and the XAPI docs.
async listChangedBlock(ref, baseRef){
console.log('listchanged blocks', ref, baseRef) // FIXME: leftover debug logging
const encoded = await this.call('VDI.list_changed_blocks', baseRef, ref)
console.log({encoded}) // FIXME: leftover debug logging
const buf = Buffer.from(encoded, 'base64')
console.log({buf}) // FIXME: leftover debug logging
return buf
}
// Enables Changed Block Tracking on the VDI identified by ref
// (XAPI VDI.enable_cbt).
async enableChangeBlockTracking(ref){
return this.call('VDI.enable_cbt', ref)
}
// Disables Changed Block Tracking on the VDI identified by ref
// (XAPI VDI.disable_cbt).
async disableChangeBlockTracking(ref){
return this.call('VDI.disable_cbt', ref)
}
// Deletes the data of a snapshot VDI while keeping its metadata/CBT
// information (XAPI VDI.data_destroy), freeing the bulk of its storage.
async dataDestroy(ref){
return this.call('VDI.data_destroy', ref)
}
async exportContent(
ref,
{ baseRef, cancelToken = CancelToken.none, changedBlocks, format, nbdConcurrency = 1, preferNbd = this._preferNbd }
{ baseRef, cancelToken = CancelToken.none, format, nbdConcurrency = 1, preferNbd = this._preferNbd }
) {
const query = {
format,
@@ -136,7 +114,7 @@ class Vdi {
})
if (nbdClient !== undefined && format === VDI_FORMAT_VHD) {
const taskRef = await this.task_create(`Exporting content of VDI ${vdiName} using NBD`)
stream = await createNbdVhdStream(nbdClient, stream, {changedBlocks})
stream = await createNbdVhdStream(nbdClient, stream)
stream.on('progress', progress => this.call('task.set_progress', taskRef, progress))
finished(stream, () => this.task_destroy(taskRef))
}

View File

@@ -42,8 +42,7 @@
- @xen-orchestra/backups patch
- @xen-orchestra/fs patch
- @xen-orchestra/xapi minor
- @vates/nbd-client minor
- @xen-orchestra/xapi patch
- vhd-lib patch
- xo-server minor
- xo-server-audit patch

View File

@@ -14,7 +14,6 @@ const {
const { fuHeader, checksumStruct } = require('./_structs')
const assert = require('node:assert')
const NBD_DEFAULT_BLOCK_SIZE = 64 * 1024
const MAX_DURATION_BETWEEN_PROGRESS_EMIT = 5e3
const MIN_TRESHOLD_PERCENT_BETWEEN_PROGRESS_EMIT = 1
@@ -35,42 +34,10 @@ exports.createNbdRawStream = function createRawStream(nbdClient) {
return stream
}
// Looks up blockId in a VHD Block Allocation Table (one 4-byte big-endian
// entry per block). Returns a single full-block read descriptor when the
// entry is allocated, or undefined when the block is unused and should be
// skipped entirely.
function batContainsBlock(bat, blockId) {
const entry = bat.readUInt32BE(blockId * 4)
if (entry !== BLOCK_UNUSED) {
return [{ blockId, size: DEFAULT_BLOCK_SIZE }]
}
}
// one 2MB VHD block is in 32 blocks of 64KB
// 32 bits are written in 8 4bytes uint32
// shared zero-filled buffer substituted for unchanged 64KB sub-blocks so
// they are not fetched over NBD
const EMPTY_NBD_BUFFER = Buffer.alloc(NBD_DEFAULT_BLOCK_SIZE, 0)
// Checks the CBT bitmap for the 32 64KB sub-blocks that make up VHD block
// blockId. Returns undefined when nothing changed; otherwise returns 32
// descriptors: changed sub-blocks carry { blockId, size } (to be read from
// the source), unchanged ones carry the shared empty buffer.
// NOTE(review): bit extraction assumes LSB-first ordering within each
// byte — confirm against VDI.list_changed_blocks encoding.
function cbtContainsBlock(cbt, blockId) {
const subBlocks = []
let hasOne = false
for (let i = 0; i < 32; i++) {
const position = blockId * 32 + i
const bitOffset = position & 7 // in byte
const byteIndex = position >> 3 // in buffer
const bit = (cbt[byteIndex] >> bitOffset) & 1
if (bit === 1) {
console.log('CBT contains block', blockId) // FIXME: leftover debug logging
console.log({position,bitOffset,byteIndex, cbt:cbt[byteIndex],bit}) // FIXME: leftover debug logging
subBlocks.push({ blockId: position, size: NBD_DEFAULT_BLOCK_SIZE })
hasOne = true
} else {
// don't read empty blocks
subBlocks.push({ buffer: EMPTY_NBD_BUFFER })
}
}
if (hasOne) {
return subBlocks
}
}
exports.createNbdVhdStream = async function createVhdStream(
nbdClient,
sourceStream,
{
changedBlocks,
maxDurationBetweenProgressEmit = MAX_DURATION_BETWEEN_PROGRESS_EMIT,
minTresholdPercentBetweenProgressEmit = MIN_TRESHOLD_PERCENT_BETWEEN_PROGRESS_EMIT,
} = {}
@@ -84,10 +51,7 @@ exports.createNbdVhdStream = async function createVhdStream(
await skipStrict(sourceStream, header.tableOffset - (FOOTER_SIZE + HEADER_SIZE))
// new table offset
header.tableOffset = FOOTER_SIZE + HEADER_SIZE
let streamBat
if (changedBlocks === undefined) {
streamBat = await readChunkStrict(sourceStream, batSize)
}
const streamBat = await readChunkStrict(sourceStream, batSize)
let offset = FOOTER_SIZE + HEADER_SIZE + batSize
// check if parentlocator are ordered
let precLocator = 0
@@ -115,14 +79,14 @@ exports.createNbdVhdStream = async function createVhdStream(
// compute a BAT with the position that the block will have in the resulting stream
// blocks starts directly after parent locator entries
const entries = []
for (let blockId = 0; blockId < header.maxTableEntries; blockId++) {
const subBlocks = changedBlocks ? cbtContainsBlock(changedBlocks, blockId) : batContainsBlock(streamBat, blockId)
if (subBlocks !== undefined) {
bat.writeUInt32BE(offsetSector, blockId * 4)
entries.push({ blockId, subBlocks })
for (let i = 0; i < header.maxTableEntries; i++) {
const entry = streamBat.readUInt32BE(i * 4)
if (entry !== BLOCK_UNUSED) {
bat.writeUInt32BE(offsetSector, i * 4)
entries.push(i)
offsetSector += blockSizeInSectors
} else {
bat.writeUInt32BE(BLOCK_UNUSED, blockId * 4)
bat.writeUInt32BE(BLOCK_UNUSED, i * 4)
}
}
@@ -173,10 +137,8 @@ exports.createNbdVhdStream = async function createVhdStream(
// yield blocks from nbd
const nbdIterator = nbdClient.readBlocks(function* () {
for (const { subBlocks } of entries) {
for (const { blockId, buffer, size } of subBlocks) {
yield { index: blockId, buffer, size }
}
for (const entry of entries) {
yield { index: entry, size: DEFAULT_BLOCK_SIZE }
}
})
const bitmap = Buffer.alloc(SECTOR_SIZE, 255)

View File

@@ -615,7 +615,7 @@ const TRANSFORMS = {
vdi(obj) {
const vdi = {
type: 'VDI',
cbt_enabled: obj.cbt_enabled,
missing: obj.missing,
name_description: obj.name_description,
name_label: obj.name_label,

View File

@@ -590,9 +590,6 @@ const messages = {
preferNbd: 'Use NBD protocol to transfer disk if available',
preferNbdInformation: 'A network accessible by XO or the proxy must have NBD enabled',
nbdConcurrency: 'Number of NBD connexion per disk',
deltaComputationMode: 'Delta computation mode',
deltaComputationModeSnapshot: 'Snapshot comparison',
deltaComputationModeCbt: 'Change Block Tracking',
// ------ New Remote -----
newRemote: 'New file system remote',

View File

@@ -45,7 +45,6 @@ import { RemoteProxy, RemoteProxyWarning } from './_remoteProxy'
import getSettingsWithNonDefaultValue from '../_getSettingsWithNonDefaultValue'
import { canDeltaBackup, constructPattern, destructPattern, FormFeedback, FormGroup, Input, Li, Ul } from './../utils'
import Select from '../../../common/form/select'
export NewMetadataBackup from './metadata'
export NewMirrorBackup from './mirror'
@@ -636,18 +635,11 @@ const New = decorate([
nbdConcurrency,
})
},
// Effect handler: persists the selected delta computation mode
// ('AGAINST_PREVIOUS_SNAPSHOT' or 'CBT', per the Select options below)
// into the job's global settings.
setDeltaComputationMode({ setGlobalSettings }, deltaComputeMode) {
console.log({deltaComputeMode}) // FIXME: leftover debug logging
setGlobalSettings({
// deltaComputeMode is a react-select option object; store its raw value
deltaComputeMode: deltaComputeMode.value,
})
},
},
computed: {
compressionId: generateId,
formId: generateId,
inputConcurrencyId: generateId,
inputDeltaComputationMode: generateId,
inputFullIntervalId: generateId,
inputMaxExportRate: generateId,
inputPreferNbd: generateId,
@@ -761,7 +753,6 @@ const New = decorate([
const {
checkpointSnapshot,
concurrency,
deltaComputationMode = 'AGAINST_PREVIOUS_SNAPSHOT',
fullInterval,
maxExportRate,
nbdConcurrency = 1,
@@ -1116,24 +1107,6 @@ const New = decorate([
offlineSnapshot={offlineSnapshot}
setGlobalSettings={effects.setGlobalSettings}
/>
{state.isDelta && (
<FormGroup>
<label htmlFor={state.inputDeltaComputationMode}>
<strong>{_('deltaComputationMode')}</strong>
</label>
<Select
id={state.inputDeltaComputationMode}
onChange={effects.setDeltaComputationMode}
value={deltaComputationMode}
disabled={!state.inputPreferNbd}
options={[
{ label: _('deltaComputationModeSnapshot'), value: 'AGAINST_PREVIOUS_SNAPSHOT' },
{ label: _('deltaComputationModeCbt'), value: 'CBT' },
]}
/>
</FormGroup>
)}
</div>
)}
</CardBlock>