This commit is contained in:
Florent Beauchamp
2024-02-21 13:09:09 +00:00
parent b89195eb80
commit ca8d6b4885
3 changed files with 83 additions and 37 deletions

View File

@@ -141,6 +141,7 @@ export class Task {
}
#log(event, props) {
console.log({taskId: this.#id,event})
this.#onLog({
...props,
event,

View File

@@ -88,49 +88,89 @@ export const VmsXapi = class VmsXapiBackupRunner extends Abstract {
const allSettings = this._job.settings
const baseSettings = this._baseSettings
const handleVm = vmUuid => {
const taskStart = { name: 'backup VM', data: { type: 'VM', id: vmUuid } }
return this._getRecord('VM', vmUuid).then(
disposableVm =>
Disposable.use(disposableVm, vm => {
taskStart.data.name_label = vm.name_label
return runTask(taskStart, () => {
const opts = {
baseSettings,
config,
getSnapshotNameLabel,
healthCheckSr,
job,
remoteAdapters,
schedule,
settings: { ...settings, ...allSettings[vm.uuid] },
srs,
throttleStream,
vm,
}
let vmBackup
if (job.mode === 'delta') {
vmBackup = new IncrementalXapi(opts)
} else {
if (job.mode === 'full') {
vmBackup = new FullXapi(opts)
} else {
throw new Error(`Job mode ${job.mode} not implemented`)
}
}
return vmBackup.run()
})
}),
error =>
runTask(taskStart, () => {
throw error
})
)
const task = new Task(taskStart)
let nbRun = 0
console.log('runonce', vmUuid)
// ensure all the eecution are run in the same task
const runOnce = async ()=> {
nbRun ++
console.log('Will run backup ', vmUuid, nbRun)
return this._getRecord('VM', vmUuid).then(
disposableVm => Disposable.use(disposableVm, async vm => {
taskStart.data.name_label = vm.name_label
const opts = {
baseSettings,
config,
getSnapshotNameLabel,
healthCheckSr,
job,
remoteAdapters,
schedule,
settings: { ...settings, ...allSettings[vm.uuid] },
srs,
throttleStream,
vm,
}
let vmBackup
if (job.mode === 'delta') {
vmBackup = new IncrementalXapi(opts)
} else {
if (job.mode === 'full') {
vmBackup = new FullXapi(opts)
} else {
throw new Error(`Job mode ${job.mode} not implemented`)
}
}
const res = await vmBackup.run()
console.log(' backup run successfully ', vmUuid, res)
return res
}))
}
// ensure the same task is reused
return task.wrapFn(runOnce)
}
let toHandle = []
console.log('prepare to run ')
for(const vmUuid of vmIds){
// prepare a collection of task to bound task to run
toHandle.push( handleVm(vmUuid))
}
console.log({toHandle})
for(let i=0; i < 4 && toHandle.length >0; i++){
console.log('RUN ', i)
const currentRun = [...toHandle]
toHandle = []
await asyncMapSettled(currentRun, async fn=>{
console.log('got fn ', fn)
try{
await fn()
console.log('run done')
}catch(error){
console.log('will retry ')
toHandle.push(fn)
}
})
}
const { concurrency } = settings
await asyncMapSettled(vmIds, concurrency === 0 ? handleVm : limitConcurrency(concurrency)(handleVm))
if(toHandle.length > 0){
console.log('LAST RUN ')
// last run will really fail this time
await asyncMapSettled(toHandle, fn=>fn())
}
}
)
}

View File

@@ -64,6 +64,11 @@ export class FullRemoteWriter extends MixinRemoteWriter(AbstractFullWriter) {
}
await Task.run({ name: 'transfer' }, async () => {
console.log('run ')
if(Math.random() < 0.8){
throw new Error('NOPE')
}
console.log('OK ')
await adapter.outputStream(dataFilename, stream, {
maxStreamLength,
streamLength,