Compare commits
10 Commits
pool-autop
...
feat_cbt
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
26c9338f54 | ||
|
|
4ae0e3912f | ||
|
|
c3571325c5 | ||
|
|
9d56ab2a00 | ||
|
|
b3e163d090 | ||
|
|
1e0c411d5f | ||
|
|
d30b5950fc | ||
|
|
98ec3f4c5e | ||
|
|
ff57bc2a0b | ||
|
|
ee0fd9ab8e |
@@ -61,22 +61,23 @@ export default class MultiNbdClient {
|
||||
async *readBlocks(indexGenerator) {
|
||||
// default : read all blocks
|
||||
const readAhead = []
|
||||
const makeReadBlockPromise = (index, size) => {
|
||||
const promise = this.readBlock(index, size)
|
||||
const makeReadBlockPromise = (index, buffer, size) => {
|
||||
// pass through any pre loaded buffer
|
||||
const promise = buffer ? Promise.resolve(buffer) : this.readBlock(index, size)
|
||||
// error is handled during unshift
|
||||
promise.catch(() => {})
|
||||
return promise
|
||||
}
|
||||
|
||||
// read all blocks, but try to keep readAheadMaxLength promise waiting ahead
|
||||
for (const { index, size } of indexGenerator()) {
|
||||
for (const { index, buffer, size } of indexGenerator()) {
|
||||
// stack readAheadMaxLength promises before starting to handle the results
|
||||
if (readAhead.length === this.#readAhead) {
|
||||
// any error will stop reading blocks
|
||||
yield readAhead.shift()
|
||||
}
|
||||
|
||||
readAhead.push(makeReadBlockPromise(index, size))
|
||||
readAhead.push(makeReadBlockPromise(index, buffer, size))
|
||||
}
|
||||
while (readAhead.length > 0) {
|
||||
yield readAhead.shift()
|
||||
|
||||
@@ -79,9 +79,16 @@ export async function exportIncrementalVm(
|
||||
$SR$uuid: vdi.$SR.uuid,
|
||||
}
|
||||
|
||||
let changedBlocks
|
||||
console.log('CBT ? ', vdi.cbt_enabled,vdiRef,baseVdi?.$ref)
|
||||
if (vdi.cbt_enabled && baseVdi?.$ref) {
|
||||
// @todo log errors and fallback to default mode
|
||||
changedBlocks = await vdi.$listChangedBlock(baseVdi?.$ref)
|
||||
}
|
||||
streams[`${vdiRef}.vhd`] = await vdi.$exportContent({
|
||||
baseRef: baseVdi?.$ref,
|
||||
cancelToken,
|
||||
changedBlocks,
|
||||
format: 'vhd',
|
||||
nbdConcurrency,
|
||||
preferNbd,
|
||||
|
||||
@@ -193,6 +193,17 @@ export const AbstractXapi = class AbstractXapiVmBackupRunner extends Abstract {
|
||||
const allSettings = this.job.settings
|
||||
const baseSettings = this._baseSettings
|
||||
const baseVmRef = this._baseVm?.$ref
|
||||
if (this._settings.deltaComputeMode === 'CBT' && this._exportedVm?.$ref && this._exportedVm?.$ref != this._vm.$ref) {
|
||||
console.log('WILL PURGE',this._exportedVm?.$ref)
|
||||
const xapi = this._xapi
|
||||
const vdiRefs = await this._xapi.VM_getDisks(this._exportedVm?.$ref)
|
||||
await xapi.call('VM.destroy',this._exportedVm.$ref)
|
||||
// @todo: ensure it is really the snapshot
|
||||
for (const vdiRef of vdiRefs) {
|
||||
// @todo handle error
|
||||
await xapi.VDI_dataDestroy(vdiRef)
|
||||
}
|
||||
}
|
||||
|
||||
const snapshotsPerSchedule = groupBy(this._jobSnapshots, _ => _.other_config['xo:backup:schedule'])
|
||||
const xapi = this._xapi
|
||||
@@ -208,6 +219,8 @@ export const AbstractXapi = class AbstractXapiVmBackupRunner extends Abstract {
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
|
||||
}
|
||||
|
||||
async copy() {
|
||||
@@ -226,6 +239,22 @@ export const AbstractXapi = class AbstractXapiVmBackupRunner extends Abstract {
|
||||
throw new Error('Not implemented')
|
||||
}
|
||||
|
||||
async enableCbt() {
|
||||
// for each disk of the VM , enable CBT
|
||||
if (this._settings.deltaComputeMode !== 'CBT') {
|
||||
return
|
||||
}
|
||||
const vm = this._vm
|
||||
const xapi = this._xapi
|
||||
console.log(vm.VBDs)
|
||||
const vdiRefs = await vm.$getDisks(vm.VBDs)
|
||||
for (const vdiRef of vdiRefs) {
|
||||
// @todo handle error
|
||||
await xapi.VDI_enableChangeBlockTracking(vdiRef)
|
||||
}
|
||||
// @todo : when do we disable CBT ?
|
||||
}
|
||||
|
||||
async run($defer) {
|
||||
const settings = this._settings
|
||||
assert(
|
||||
@@ -246,7 +275,7 @@ export const AbstractXapi = class AbstractXapiVmBackupRunner extends Abstract {
|
||||
|
||||
await this._cleanMetadata()
|
||||
await this._removeUnusedSnapshots()
|
||||
|
||||
await this.enableCbt()
|
||||
const vm = this._vm
|
||||
const isRunning = vm.power_state === 'Running'
|
||||
const startAfter = isRunning && (settings.offlineBackup ? 'backup' : settings.offlineSnapshot && 'snapshot')
|
||||
@@ -267,6 +296,7 @@ export const AbstractXapi = class AbstractXapiVmBackupRunner extends Abstract {
|
||||
await this._exportedVm.update_blocked_operations({ pool_migrate: reason, migrate_send: reason })
|
||||
try {
|
||||
await this._copy()
|
||||
// @todo if CBT is enabled : should call vdi.datadestroy on snapshot here
|
||||
} finally {
|
||||
await this._exportedVm.update_blocked_operations({ pool_migrate, migrate_send })
|
||||
}
|
||||
|
||||
@@ -83,9 +83,31 @@ class Vdi {
|
||||
}
|
||||
}
|
||||
|
||||
// return an buffer with 0/1 bit, showing if the 64KB block corresponding
|
||||
// in the raw vdi has changed
|
||||
async listChangedBlock(ref, baseRef){
|
||||
console.log('listchanged blocks', ref, baseRef)
|
||||
const encoded = await this.call('VDI.list_changed_blocks', baseRef, ref)
|
||||
console.log({encoded})
|
||||
const buf = Buffer.from(encoded, 'base64')
|
||||
console.log({buf})
|
||||
return buf
|
||||
}
|
||||
|
||||
async enableChangeBlockTracking(ref){
|
||||
return this.call('VDI.enable_cbt', ref)
|
||||
}
|
||||
async disableChangeBlockTracking(ref){
|
||||
return this.call('VDI.disable_cbt', ref)
|
||||
}
|
||||
|
||||
async dataDestroy(ref){
|
||||
return this.call('VDI.data_destroy', ref)
|
||||
}
|
||||
|
||||
async exportContent(
|
||||
ref,
|
||||
{ baseRef, cancelToken = CancelToken.none, format, nbdConcurrency = 1, preferNbd = this._preferNbd }
|
||||
{ baseRef, cancelToken = CancelToken.none, changedBlocks, format, nbdConcurrency = 1, preferNbd = this._preferNbd }
|
||||
) {
|
||||
const query = {
|
||||
format,
|
||||
@@ -114,7 +136,7 @@ class Vdi {
|
||||
})
|
||||
if (nbdClient !== undefined && format === VDI_FORMAT_VHD) {
|
||||
const taskRef = await this.task_create(`Exporting content of VDI ${vdiName} using NBD`)
|
||||
stream = await createNbdVhdStream(nbdClient, stream)
|
||||
stream = await createNbdVhdStream(nbdClient, stream, {changedBlocks})
|
||||
stream.on('progress', progress => this.call('task.set_progress', taskRef, progress))
|
||||
finished(stream, () => this.task_destroy(taskRef))
|
||||
}
|
||||
|
||||
@@ -42,7 +42,8 @@
|
||||
|
||||
- @xen-orchestra/backups patch
|
||||
- @xen-orchestra/fs patch
|
||||
- @xen-orchestra/xapi patch
|
||||
- @xen-orchestra/xapi minor
|
||||
- @vates/nbd-client minor
|
||||
- vhd-lib patch
|
||||
- xo-server minor
|
||||
- xo-server-audit patch
|
||||
|
||||
@@ -14,6 +14,7 @@ const {
|
||||
const { fuHeader, checksumStruct } = require('./_structs')
|
||||
const assert = require('node:assert')
|
||||
|
||||
const NBD_DEFAULT_BLOCK_SIZE = 64 * 1024
|
||||
const MAX_DURATION_BETWEEN_PROGRESS_EMIT = 5e3
|
||||
const MIN_TRESHOLD_PERCENT_BETWEEN_PROGRESS_EMIT = 1
|
||||
|
||||
@@ -34,10 +35,42 @@ exports.createNbdRawStream = function createRawStream(nbdClient) {
|
||||
return stream
|
||||
}
|
||||
|
||||
function batContainsBlock(bat, blockId) {
|
||||
const entry = bat.readUInt32BE(blockId * 4)
|
||||
if (entry !== BLOCK_UNUSED) {
|
||||
return [{ blockId, size: DEFAULT_BLOCK_SIZE }]
|
||||
}
|
||||
}
|
||||
// one 2MB VHD block is in 32 blocks of 64KB
|
||||
// 32 bits are written in 8 4bytes uint32
|
||||
const EMPTY_NBD_BUFFER = Buffer.alloc(NBD_DEFAULT_BLOCK_SIZE, 0)
|
||||
function cbtContainsBlock(cbt, blockId) {
|
||||
const subBlocks = []
|
||||
let hasOne = false
|
||||
for (let i = 0; i < 32; i++) {
|
||||
const position = blockId * 32 + i
|
||||
const bitOffset = position & 7 // in byte
|
||||
const byteIndex = position >> 3 // in buffer
|
||||
const bit = (cbt[byteIndex] >> bitOffset) & 1
|
||||
if (bit === 1) {
|
||||
console.log('CBT contains block', blockId)
|
||||
console.log({position,bitOffset,byteIndex, cbt:cbt[byteIndex],bit})
|
||||
subBlocks.push({ blockId: position, size: NBD_DEFAULT_BLOCK_SIZE })
|
||||
hasOne = true
|
||||
} else {
|
||||
// don't read empty blocks
|
||||
subBlocks.push({ buffer: EMPTY_NBD_BUFFER })
|
||||
}
|
||||
}
|
||||
if (hasOne) {
|
||||
return subBlocks
|
||||
}
|
||||
}
|
||||
exports.createNbdVhdStream = async function createVhdStream(
|
||||
nbdClient,
|
||||
sourceStream,
|
||||
{
|
||||
changedBlocks,
|
||||
maxDurationBetweenProgressEmit = MAX_DURATION_BETWEEN_PROGRESS_EMIT,
|
||||
minTresholdPercentBetweenProgressEmit = MIN_TRESHOLD_PERCENT_BETWEEN_PROGRESS_EMIT,
|
||||
} = {}
|
||||
@@ -51,7 +84,10 @@ exports.createNbdVhdStream = async function createVhdStream(
|
||||
await skipStrict(sourceStream, header.tableOffset - (FOOTER_SIZE + HEADER_SIZE))
|
||||
// new table offset
|
||||
header.tableOffset = FOOTER_SIZE + HEADER_SIZE
|
||||
const streamBat = await readChunkStrict(sourceStream, batSize)
|
||||
let streamBat
|
||||
if (changedBlocks === undefined) {
|
||||
streamBat = await readChunkStrict(sourceStream, batSize)
|
||||
}
|
||||
let offset = FOOTER_SIZE + HEADER_SIZE + batSize
|
||||
// check if parentlocator are ordered
|
||||
let precLocator = 0
|
||||
@@ -79,14 +115,14 @@ exports.createNbdVhdStream = async function createVhdStream(
|
||||
// compute a BAT with the position that the block will have in the resulting stream
|
||||
// blocks starts directly after parent locator entries
|
||||
const entries = []
|
||||
for (let i = 0; i < header.maxTableEntries; i++) {
|
||||
const entry = streamBat.readUInt32BE(i * 4)
|
||||
if (entry !== BLOCK_UNUSED) {
|
||||
bat.writeUInt32BE(offsetSector, i * 4)
|
||||
entries.push(i)
|
||||
for (let blockId = 0; blockId < header.maxTableEntries; blockId++) {
|
||||
const subBlocks = changedBlocks ? cbtContainsBlock(changedBlocks, blockId) : batContainsBlock(streamBat, blockId)
|
||||
if (subBlocks !== undefined) {
|
||||
bat.writeUInt32BE(offsetSector, blockId * 4)
|
||||
entries.push({ blockId, subBlocks })
|
||||
offsetSector += blockSizeInSectors
|
||||
} else {
|
||||
bat.writeUInt32BE(BLOCK_UNUSED, i * 4)
|
||||
bat.writeUInt32BE(BLOCK_UNUSED, blockId * 4)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -137,8 +173,10 @@ exports.createNbdVhdStream = async function createVhdStream(
|
||||
|
||||
// yield blocks from nbd
|
||||
const nbdIterator = nbdClient.readBlocks(function* () {
|
||||
for (const entry of entries) {
|
||||
yield { index: entry, size: DEFAULT_BLOCK_SIZE }
|
||||
for (const { subBlocks } of entries) {
|
||||
for (const { blockId, buffer, size } of subBlocks) {
|
||||
yield { index: blockId, buffer, size }
|
||||
}
|
||||
}
|
||||
})
|
||||
const bitmap = Buffer.alloc(SECTOR_SIZE, 255)
|
||||
|
||||
@@ -615,7 +615,7 @@ const TRANSFORMS = {
|
||||
vdi(obj) {
|
||||
const vdi = {
|
||||
type: 'VDI',
|
||||
|
||||
cbt_enabled: obj.cbt_enabled,
|
||||
missing: obj.missing,
|
||||
name_description: obj.name_description,
|
||||
name_label: obj.name_label,
|
||||
|
||||
@@ -590,6 +590,9 @@ const messages = {
|
||||
preferNbd: 'Use NBD protocol to transfer disk if available',
|
||||
preferNbdInformation: 'A network accessible by XO or the proxy must have NBD enabled',
|
||||
nbdConcurrency: 'Number of NBD connexion per disk',
|
||||
deltaComputationMode: 'Delta computation mode',
|
||||
deltaComputationModeSnapshot: 'Snapshot comparison',
|
||||
deltaComputationModeCbt: 'Change Block Tracking',
|
||||
|
||||
// ------ New Remote -----
|
||||
newRemote: 'New file system remote',
|
||||
|
||||
@@ -45,6 +45,7 @@ import { RemoteProxy, RemoteProxyWarning } from './_remoteProxy'
|
||||
|
||||
import getSettingsWithNonDefaultValue from '../_getSettingsWithNonDefaultValue'
|
||||
import { canDeltaBackup, constructPattern, destructPattern, FormFeedback, FormGroup, Input, Li, Ul } from './../utils'
|
||||
import Select from '../../../common/form/select'
|
||||
|
||||
export NewMetadataBackup from './metadata'
|
||||
export NewMirrorBackup from './mirror'
|
||||
@@ -635,11 +636,18 @@ const New = decorate([
|
||||
nbdConcurrency,
|
||||
})
|
||||
},
|
||||
setDeltaComputationMode({ setGlobalSettings }, deltaComputeMode) {
|
||||
console.log({deltaComputeMode})
|
||||
setGlobalSettings({
|
||||
deltaComputeMode: deltaComputeMode.value,
|
||||
})
|
||||
},
|
||||
},
|
||||
computed: {
|
||||
compressionId: generateId,
|
||||
formId: generateId,
|
||||
inputConcurrencyId: generateId,
|
||||
inputDeltaComputationMode: generateId,
|
||||
inputFullIntervalId: generateId,
|
||||
inputMaxExportRate: generateId,
|
||||
inputPreferNbd: generateId,
|
||||
@@ -753,6 +761,7 @@ const New = decorate([
|
||||
const {
|
||||
checkpointSnapshot,
|
||||
concurrency,
|
||||
deltaComputationMode = 'AGAINST_PREVIOUS_SNAPSHOT',
|
||||
fullInterval,
|
||||
maxExportRate,
|
||||
nbdConcurrency = 1,
|
||||
@@ -1107,6 +1116,24 @@ const New = decorate([
|
||||
offlineSnapshot={offlineSnapshot}
|
||||
setGlobalSettings={effects.setGlobalSettings}
|
||||
/>
|
||||
|
||||
{state.isDelta && (
|
||||
<FormGroup>
|
||||
<label htmlFor={state.inputDeltaComputationMode}>
|
||||
<strong>{_('deltaComputationMode')}</strong>
|
||||
</label>
|
||||
<Select
|
||||
id={state.inputDeltaComputationMode}
|
||||
onChange={effects.setDeltaComputationMode}
|
||||
value={deltaComputationMode}
|
||||
disabled={!state.inputPreferNbd}
|
||||
options={[
|
||||
{ label: _('deltaComputationModeSnapshot'), value: 'AGAINST_PREVIOUS_SNAPSHOT' },
|
||||
{ label: _('deltaComputationModeCbt'), value: 'CBT' },
|
||||
]}
|
||||
/>
|
||||
</FormGroup>
|
||||
)}
|
||||
</div>
|
||||
)}
|
||||
</CardBlock>
|
||||
|
||||
Reference in New Issue
Block a user