Compare commits
2 Commits
feat_retry
...
feat_remov
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
698086f4b2 | ||
|
|
039d5687c0 |
@@ -141,7 +141,6 @@ export class Task {
|
||||
}
|
||||
|
||||
#log(event, props) {
|
||||
console.log({taskId: this.#id,event})
|
||||
this.#onLog({
|
||||
...props,
|
||||
event,
|
||||
|
||||
@@ -88,89 +88,49 @@ export const VmsXapi = class VmsXapiBackupRunner extends Abstract {
|
||||
|
||||
const allSettings = this._job.settings
|
||||
const baseSettings = this._baseSettings
|
||||
|
||||
|
||||
const handleVm = vmUuid => {
|
||||
const taskStart = { name: 'backup VM', data: { type: 'VM', id: vmUuid } }
|
||||
|
||||
const task = new Task(taskStart)
|
||||
let nbRun = 0
|
||||
|
||||
console.log('runonce', vmUuid)
|
||||
// ensure all the executions are run in the same task
|
||||
const runOnce = async ()=> {
|
||||
nbRun ++
|
||||
console.log('Will run backup ', vmUuid, nbRun)
|
||||
|
||||
return this._getRecord('VM', vmUuid).then(
|
||||
disposableVm => Disposable.use(disposableVm, async vm => {
|
||||
taskStart.data.name_label = vm.name_label
|
||||
const opts = {
|
||||
baseSettings,
|
||||
config,
|
||||
getSnapshotNameLabel,
|
||||
healthCheckSr,
|
||||
job,
|
||||
remoteAdapters,
|
||||
schedule,
|
||||
settings: { ...settings, ...allSettings[vm.uuid] },
|
||||
srs,
|
||||
throttleStream,
|
||||
vm,
|
||||
}
|
||||
let vmBackup
|
||||
if (job.mode === 'delta') {
|
||||
vmBackup = new IncrementalXapi(opts)
|
||||
} else {
|
||||
if (job.mode === 'full') {
|
||||
vmBackup = new FullXapi(opts)
|
||||
} else {
|
||||
throw new Error(`Job mode ${job.mode} not implemented`)
|
||||
}
|
||||
}
|
||||
|
||||
const res = await vmBackup.run()
|
||||
console.log(' backup run successfully ', vmUuid, res)
|
||||
return res
|
||||
}))
|
||||
}
|
||||
// ensure the same task is reused
|
||||
return task.wrapFn(runOnce)
|
||||
|
||||
|
||||
}
|
||||
|
||||
let toHandle = []
|
||||
console.log('prepare to run ')
|
||||
for(const vmUuid of vmIds){
|
||||
// prepare a collection of functions, each bound to its own task, to run
|
||||
toHandle.push( handleVm(vmUuid))
|
||||
}
|
||||
console.log({toHandle})
|
||||
|
||||
for(let i=0; i < 4 && toHandle.length >0; i++){
|
||||
console.log('RUN ', i)
|
||||
const currentRun = [...toHandle]
|
||||
toHandle = []
|
||||
await asyncMapSettled(currentRun, async fn=>{
|
||||
console.log('got fn ', fn)
|
||||
try{
|
||||
await fn()
|
||||
console.log('run done')
|
||||
}catch(error){
|
||||
console.log('will retry ')
|
||||
toHandle.push(fn)
|
||||
}
|
||||
})
|
||||
return this._getRecord('VM', vmUuid).then(
|
||||
disposableVm =>
|
||||
Disposable.use(disposableVm, vm => {
|
||||
taskStart.data.name_label = vm.name_label
|
||||
return runTask(taskStart, () => {
|
||||
const opts = {
|
||||
baseSettings,
|
||||
config,
|
||||
getSnapshotNameLabel,
|
||||
healthCheckSr,
|
||||
job,
|
||||
remoteAdapters,
|
||||
schedule,
|
||||
settings: { ...settings, ...allSettings[vm.uuid] },
|
||||
srs,
|
||||
throttleStream,
|
||||
vm,
|
||||
}
|
||||
let vmBackup
|
||||
if (job.mode === 'delta') {
|
||||
vmBackup = new IncrementalXapi(opts)
|
||||
} else {
|
||||
if (job.mode === 'full') {
|
||||
vmBackup = new FullXapi(opts)
|
||||
} else {
|
||||
throw new Error(`Job mode ${job.mode} not implemented`)
|
||||
}
|
||||
}
|
||||
return vmBackup.run()
|
||||
})
|
||||
}),
|
||||
error =>
|
||||
runTask(taskStart, () => {
|
||||
throw error
|
||||
})
|
||||
)
|
||||
}
|
||||
const { concurrency } = settings
|
||||
if(toHandle.length > 0){
|
||||
console.log('LAST RUN ')
|
||||
// last run will really fail this time
|
||||
await asyncMapSettled(toHandle, fn=>fn())
|
||||
|
||||
}
|
||||
|
||||
await asyncMapSettled(vmIds, concurrency === 0 ? handleVm : limitConcurrency(concurrency)(handleVm))
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ import assert from 'node:assert'
|
||||
import groupBy from 'lodash/groupBy.js'
|
||||
import ignoreErrors from 'promise-toolbox/ignoreErrors'
|
||||
import { asyncMap } from '@xen-orchestra/async-map'
|
||||
import { createLogger } from '@xen-orchestra/log'
|
||||
import { decorateMethodsWith } from '@vates/decorate-with'
|
||||
import { defer } from 'golike-defer'
|
||||
import { formatDateTime } from '@xen-orchestra/xapi'
|
||||
@@ -10,6 +11,8 @@ import { getOldEntries } from '../../_getOldEntries.mjs'
|
||||
import { Task } from '../../Task.mjs'
|
||||
import { Abstract } from './_Abstract.mjs'
|
||||
|
||||
const { info, warn } = createLogger('xo:backups:AbstractXapi')
|
||||
|
||||
export const AbstractXapi = class AbstractXapiVmBackupRunner extends Abstract {
|
||||
constructor({
|
||||
config,
|
||||
@@ -171,6 +174,20 @@ export const AbstractXapi = class AbstractXapiVmBackupRunner extends Abstract {
|
||||
}
|
||||
}
|
||||
|
||||
// this will delete current snapshot in case of failure
|
||||
// to ensure any retry will start with a clean state, especially in the case of rolling snapshots
|
||||
#removeCurrentSnapshotOnFailure() {
|
||||
if (this._mustDoSnapshot() && this._exportedVm !== undefined) {
|
||||
info('will delete snapshot on failure', { vm: this._vm, snapshot: this._exportedVm })
|
||||
assert.notStrictEqual(
|
||||
this._vm.$ref,
|
||||
this._exportedVm.$ref,
|
||||
'there should have a snapshot, but vm and snapshot have the same ref'
|
||||
)
|
||||
return this._xapi.VM_destroy(this._exportedVm.$ref)
|
||||
}
|
||||
}
|
||||
|
||||
async _fetchJobSnapshots() {
|
||||
const jobId = this._jobId
|
||||
const vmRef = this._vm.$ref
|
||||
@@ -271,11 +288,17 @@ export const AbstractXapi = class AbstractXapiVmBackupRunner extends Abstract {
|
||||
await this._exportedVm.update_blocked_operations({ pool_migrate, migrate_send })
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
try {
|
||||
await this.#removeCurrentSnapshotOnFailure()
|
||||
} catch (removeSnapshotError) {
|
||||
warn('fail removing current snapshot', { error: removeSnapshotError })
|
||||
}
|
||||
throw error
|
||||
} finally {
|
||||
if (startAfter) {
|
||||
ignoreErrors.call(vm.$callAsync('start', false, false))
|
||||
}
|
||||
|
||||
await this._fetchJobSnapshots()
|
||||
await this._removeUnusedSnapshots()
|
||||
}
|
||||
|
||||
@@ -64,11 +64,6 @@ export class FullRemoteWriter extends MixinRemoteWriter(AbstractFullWriter) {
|
||||
}
|
||||
|
||||
await Task.run({ name: 'transfer' }, async () => {
|
||||
console.log('run ')
|
||||
if(Math.random() < 0.8){
|
||||
throw new Error('NOPE')
|
||||
}
|
||||
console.log('OK ')
|
||||
await adapter.outputStream(dataFilename, stream, {
|
||||
maxStreamLength,
|
||||
streamLength,
|
||||
|
||||
@@ -22,6 +22,7 @@
|
||||
- [Plugins/audit] Don't log `tag.getAllConfigured` calls
|
||||
- [Remotes] Correctly clear error when the remote is tested with success
|
||||
- [Import/VMWare] Fix importing last snapshot (PR [#7370](https://github.com/vatesfr/xen-orchestra/pull/7370))
|
||||
- [Host/Reboot] Fix false positive warning when restarting a host after updates (PR [#7366](https://github.com/vatesfr/xen-orchestra/pull/7366))
|
||||
|
||||
### Packages to release
|
||||
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import semver from 'semver'
|
||||
import { createLogger } from '@xen-orchestra/log'
|
||||
import assert from 'assert'
|
||||
import { format } from 'json-rpc-peer'
|
||||
@@ -136,13 +137,38 @@ export async function restart({
|
||||
const pool = this.getObject(host.$poolId, 'pool')
|
||||
const master = this.getObject(pool.master, 'host')
|
||||
const hostRebootRequired = host.rebootRequired
|
||||
if (hostRebootRequired && host.id !== master.id && host.version === master.version) {
|
||||
throw incorrectState({
|
||||
actual: hostRebootRequired,
|
||||
expected: false,
|
||||
object: master.id,
|
||||
property: 'rebootRequired',
|
||||
})
|
||||
|
||||
// we are currently in a host upgrade process
|
||||
if (hostRebootRequired && host.id !== master.id) {
|
||||
// this error is not ideal but it means that the pool master must be fully upgraded/rebooted before the current host can be rebooted.
|
||||
//
|
||||
// there is a single error for the 3 cases because the client must handle them the same way
|
||||
const throwError = () =>
|
||||
incorrectState({
|
||||
actual: hostRebootRequired,
|
||||
expected: false,
|
||||
object: master.id,
|
||||
property: 'rebootRequired',
|
||||
})
|
||||
|
||||
if (semver.lt(master.version, host.version)) {
|
||||
log.error(`master version (${master.version}) is older than the host version (${host.version})`, {
|
||||
masterId: master.id,
|
||||
hostId: host.id,
|
||||
})
|
||||
throwError()
|
||||
}
|
||||
|
||||
if (semver.eq(master.version, host.version)) {
|
||||
if ((await this.getXapi(host).listMissingPatches(master._xapiId)).length > 0) {
|
||||
log.error('master has missing patches', { masterId: master.id })
|
||||
throwError()
|
||||
}
|
||||
if (master.rebootRequired) {
|
||||
log.error('master needs to reboot')
|
||||
throwError()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user