feat(backups/delta writers): split run method in prepare/transfer/cleanup
Fixes xoa-support#3523 This avoids starting the transfer before the writers are ready, which caused it to failed with `deleteFirst` when deletion was so long that the transfer stalled.
This commit is contained in:
parent
cf320c08c5
commit
f5024f0e75
@ -14,7 +14,7 @@ exports.ContinuousReplicationWriter = class ContinuousReplicationWriter {
|
|||||||
this._settings = settings
|
this._settings = settings
|
||||||
this._sr = sr
|
this._sr = sr
|
||||||
|
|
||||||
this.run = Task.wrapFn(
|
this.transfer = Task.wrapFn(
|
||||||
{
|
{
|
||||||
name: 'export',
|
name: 'export',
|
||||||
data: ({ deltaExport }) => ({
|
data: ({ deltaExport }) => ({
|
||||||
@ -23,8 +23,10 @@ exports.ContinuousReplicationWriter = class ContinuousReplicationWriter {
|
|||||||
type: 'SR',
|
type: 'SR',
|
||||||
}),
|
}),
|
||||||
},
|
},
|
||||||
this.run
|
this.transfer
|
||||||
)
|
)
|
||||||
|
|
||||||
|
this[settings.deleteFirst ? 'prepare' : 'cleanup'] = this._deleteOld
|
||||||
}
|
}
|
||||||
|
|
||||||
async checkBaseVdis(baseUuidToSrcVdi, baseVm) {
|
async checkBaseVdis(baseUuidToSrcVdi, baseVm) {
|
||||||
@ -51,9 +53,17 @@ exports.ContinuousReplicationWriter = class ContinuousReplicationWriter {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async run({ timestamp, deltaExport, sizeContainers }) {
|
async _deleteOld() {
|
||||||
|
const { uuid: srUuid, $xapi: xapi } = this._sr
|
||||||
|
const { scheduleId, vm } = this._backup
|
||||||
|
|
||||||
|
const oldVms = getOldEntries(this._settings.copyRetention - 1, listReplicatedVms(xapi, scheduleId, srUuid, vm.uuid))
|
||||||
|
|
||||||
|
return asyncMapSettled(oldVms, vm => xapi.VM_destroy(vm.$ref))
|
||||||
|
}
|
||||||
|
|
||||||
|
async transfer({ timestamp, deltaExport, sizeContainers }) {
|
||||||
const sr = this._sr
|
const sr = this._sr
|
||||||
const settings = this._settings
|
|
||||||
const { job, scheduleId, vm } = this._backup
|
const { job, scheduleId, vm } = this._backup
|
||||||
|
|
||||||
const { uuid: srUuid, $xapi: xapi } = sr
|
const { uuid: srUuid, $xapi: xapi } = sr
|
||||||
@ -63,14 +73,6 @@ exports.ContinuousReplicationWriter = class ContinuousReplicationWriter {
|
|||||||
asyncMapSettled(listReplicatedVms(xapi, scheduleId, undefined, vm.uuid), vm => xapi.VM_destroy(vm.$ref))
|
asyncMapSettled(listReplicatedVms(xapi, scheduleId, undefined, vm.uuid), vm => xapi.VM_destroy(vm.$ref))
|
||||||
)
|
)
|
||||||
|
|
||||||
const oldVms = getOldEntries(settings.copyRetention - 1, listReplicatedVms(xapi, scheduleId, srUuid, vm.uuid))
|
|
||||||
|
|
||||||
const deleteOldBackups = () => asyncMapSettled(oldVms, vm => xapi.VM_destroy(vm.$ref))
|
|
||||||
const { deleteFirst } = settings
|
|
||||||
if (deleteFirst) {
|
|
||||||
await deleteOldBackups()
|
|
||||||
}
|
|
||||||
|
|
||||||
let targetVmRef
|
let targetVmRef
|
||||||
await Task.run({ name: 'transfer' }, async () => {
|
await Task.run({ name: 'transfer' }, async () => {
|
||||||
targetVmRef = await importDeltaVm(
|
targetVmRef = await importDeltaVm(
|
||||||
@ -108,9 +110,5 @@ exports.ContinuousReplicationWriter = class ContinuousReplicationWriter {
|
|||||||
'xo:backup:vm': vm.uuid,
|
'xo:backup:vm': vm.uuid,
|
||||||
}),
|
}),
|
||||||
])
|
])
|
||||||
|
|
||||||
if (!deleteFirst) {
|
|
||||||
await deleteOldBackups()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -22,7 +22,7 @@ exports.DeltaBackupWriter = class DeltaBackupWriter {
|
|||||||
this._backup = backup
|
this._backup = backup
|
||||||
this._settings = settings
|
this._settings = settings
|
||||||
|
|
||||||
this.run = Task.wrapFn(
|
this.transfer = Task.wrapFn(
|
||||||
{
|
{
|
||||||
name: 'export',
|
name: 'export',
|
||||||
data: ({ deltaExport }) => ({
|
data: ({ deltaExport }) => ({
|
||||||
@ -31,8 +31,10 @@ exports.DeltaBackupWriter = class DeltaBackupWriter {
|
|||||||
type: 'remote',
|
type: 'remote',
|
||||||
}),
|
}),
|
||||||
},
|
},
|
||||||
this.run
|
this.transfer
|
||||||
)
|
)
|
||||||
|
|
||||||
|
this[settings.deleteFirst ? 'prepare' : 'cleanup'] = this._deleteOld
|
||||||
}
|
}
|
||||||
|
|
||||||
async checkBaseVdis(baseUuidToSrcVdi) {
|
async checkBaseVdis(baseUuidToSrcVdi) {
|
||||||
@ -70,21 +72,12 @@ exports.DeltaBackupWriter = class DeltaBackupWriter {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
async run({ timestamp, deltaExport, sizeContainers }) {
|
async _deleteOld() {
|
||||||
const adapter = this._adapter
|
const adapter = this._adapter
|
||||||
const backup = this._backup
|
const { scheduleId, vm } = this._backup
|
||||||
const settings = this._settings
|
|
||||||
|
|
||||||
const { job, scheduleId, vm } = backup
|
|
||||||
|
|
||||||
const jobId = job.id
|
|
||||||
const handler = adapter.handler
|
|
||||||
const backupDir = getVmBackupDir(vm.uuid)
|
|
||||||
|
|
||||||
// TODO: clean VM backup directory
|
|
||||||
|
|
||||||
const oldBackups = getOldEntries(
|
const oldBackups = getOldEntries(
|
||||||
settings.exportRetention - 1,
|
this._settings.exportRetention - 1,
|
||||||
await adapter.listVmBackups(vm.uuid, _ => _.mode === 'delta' && _.scheduleId === scheduleId)
|
await adapter.listVmBackups(vm.uuid, _ => _.mode === 'delta' && _.scheduleId === scheduleId)
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -102,17 +95,29 @@ exports.DeltaBackupWriter = class DeltaBackupWriter {
|
|||||||
oldBackups.length = maxMergedDeltasPerRun
|
oldBackups.length = maxMergedDeltasPerRun
|
||||||
}
|
}
|
||||||
|
|
||||||
const deleteOldBackups = () =>
|
return Task.run({ name: 'merge' }, async () => {
|
||||||
Task.run({ name: 'merge' }, async () => {
|
let size = 0
|
||||||
let size = 0
|
// delete sequentially from newest to oldest to avoid unnecessary merges
|
||||||
// delete sequentially from newest to oldest to avoid unnecessary merges
|
for (let i = oldBackups.length; i-- > 0; ) {
|
||||||
for (let i = oldBackups.length; i-- > 0; ) {
|
size += await adapter.deleteDeltaVmBackups([oldBackups[i]])
|
||||||
size += await adapter.deleteDeltaVmBackups([oldBackups[i]])
|
}
|
||||||
}
|
return {
|
||||||
return {
|
size,
|
||||||
size,
|
}
|
||||||
}
|
})
|
||||||
})
|
}
|
||||||
|
|
||||||
|
async transfer({ timestamp, deltaExport, sizeContainers }) {
|
||||||
|
const adapter = this._adapter
|
||||||
|
const backup = this._backup
|
||||||
|
|
||||||
|
const { job, scheduleId, vm } = backup
|
||||||
|
|
||||||
|
const jobId = job.id
|
||||||
|
const handler = adapter.handler
|
||||||
|
const backupDir = getVmBackupDir(vm.uuid)
|
||||||
|
|
||||||
|
// TODO: clean VM backup directory
|
||||||
|
|
||||||
const basename = formatFilenameDate(timestamp)
|
const basename = formatFilenameDate(timestamp)
|
||||||
const vhds = mapValues(
|
const vhds = mapValues(
|
||||||
@ -142,11 +147,6 @@ exports.DeltaBackupWriter = class DeltaBackupWriter {
|
|||||||
vmSnapshot: this._backup.exportedVm,
|
vmSnapshot: this._backup.exportedVm,
|
||||||
}
|
}
|
||||||
|
|
||||||
const { deleteFirst } = settings
|
|
||||||
if (deleteFirst) {
|
|
||||||
await deleteOldBackups()
|
|
||||||
}
|
|
||||||
|
|
||||||
const { size } = await Task.run({ name: 'transfer' }, async () => {
|
const { size } = await Task.run({ name: 'transfer' }, async () => {
|
||||||
await Promise.all(
|
await Promise.all(
|
||||||
map(deltaExport.vdis, async (vdi, id) => {
|
map(deltaExport.vdis, async (vdi, id) => {
|
||||||
@ -201,10 +201,6 @@ exports.DeltaBackupWriter = class DeltaBackupWriter {
|
|||||||
dirMode: backup.config.dirMode,
|
dirMode: backup.config.dirMode,
|
||||||
})
|
})
|
||||||
|
|
||||||
if (!deleteFirst) {
|
|
||||||
await deleteOldBackups()
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: run cleanup?
|
// TODO: run cleanup?
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -147,6 +147,8 @@ exports.VmBackup = class VmBackup {
|
|||||||
const { exportedVm } = this
|
const { exportedVm } = this
|
||||||
const baseVm = this._baseVm
|
const baseVm = this._baseVm
|
||||||
|
|
||||||
|
await asyncMap(this._writers, writer => writer.prepare && writer.prepare())
|
||||||
|
|
||||||
const deltaExport = await exportDeltaVm(exportedVm, baseVm, {
|
const deltaExport = await exportDeltaVm(exportedVm, baseVm, {
|
||||||
fullVdisRequired: this._fullVdisRequired,
|
fullVdisRequired: this._fullVdisRequired,
|
||||||
})
|
})
|
||||||
@ -156,7 +158,7 @@ exports.VmBackup = class VmBackup {
|
|||||||
|
|
||||||
await asyncMap(this._writers, async writer => {
|
await asyncMap(this._writers, async writer => {
|
||||||
try {
|
try {
|
||||||
await writer.run({
|
await writer.transfer({
|
||||||
deltaExport: forkDeltaExport(deltaExport),
|
deltaExport: forkDeltaExport(deltaExport),
|
||||||
sizeContainers,
|
sizeContainers,
|
||||||
timestamp,
|
timestamp,
|
||||||
@ -192,6 +194,8 @@ exports.VmBackup = class VmBackup {
|
|||||||
speed: duration !== 0 ? (size * 1e3) / 1024 / 1024 / duration : 0,
|
speed: duration !== 0 ? (size * 1e3) / 1024 / 1024 / duration : 0,
|
||||||
size,
|
size,
|
||||||
})
|
})
|
||||||
|
|
||||||
|
await asyncMap(this._writers, writer => writer.cleanup && writer.cleanup())
|
||||||
}
|
}
|
||||||
|
|
||||||
async _copyFull() {
|
async _copyFull() {
|
||||||
|
Loading…
Reference in New Issue
Block a user