fix(xen-api/getResource): softer backpressure to prevent XS75 error (#3234)

Fixes #3205

https://bugs.xenserver.org/browse/XSO-873
This commit is contained in:
Pierre Donias 2018-07-26 13:53:00 +02:00 committed by GitHub
parent 9c4bd0d0dd
commit b9f3313903
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 222 additions and 14 deletions

View File

@ -19,9 +19,11 @@
- [Backup NG] Correctly migrate report setting from legacy jobs [#3180](https://github.com/vatesfr/xen-orchestra/issues/3180) (PR [#3206](https://github.com/vatesfr/xen-orchestra/pull/3206))
- [Backup NG] remove incomplete XVA files [#3159](https://github.com/vatesfr/xen-orchestra/issues/3159) (PR [#3215](https://github.com/vatesfr/xen-orchestra/pull/3215))
- [Backup NG form] Ability to edit a schedule's state [#3223](https://github.com/vatesfr/xen-orchestra/issues/3223) (PR [#3228](https://github.com/vatesfr/xen-orchestra/pull/3228))
- Workaround to prevent XenServer 7.5 VDI\_IO\_ERROR during backup [#3205](https://github.com/vatesfr/xen-orchestra/issues/3205) (PR [#3234](https://github.com/vatesfr/xen-orchestra/pull/3234))
### Released packages
- xen-api v0.16.12
- xo-remote-parser v0.5.0
- complex-matcher v0.4.0
- xo-server-backup-reports v0.12.3

View File

@ -5,12 +5,37 @@ process.env.DEBUG = '*'
const defer = require('golike-defer').default
const pump = require('pump')
const { CancelToken, fromCallback } = require('promise-toolbox')
const { Transform, Writable } = require('stream')
const { createClient } = require('../')
const { createOutputStream, resolveRef } = require('./utils')
// based on implementation in https://github.com/ForbesLindesay/throat
function Queue () {
this._s1 = [] // stack to push to
this._s2 = [] // stack to pop from
}
Queue.prototype.push = function (value) {
this._s1.push(value)
}
Queue.prototype.pop = function () {
let s2 = this._s2
if (s2.length === 0) {
const s1 = this._s1
if (s1.length === 0) {
return
}
this._s1 = s2
s2 = this._s2 = s1.reverse()
}
return s2.pop()
}
defer(async ($defer, args) => {
// ARGS ----------------------------------------------------------------------
let raw = false
if (args[0] === '--raw') {
raw = true
@ -18,36 +43,127 @@ defer(async ($defer, args) => {
}
if (args.length < 2) {
return console.log('Usage: export-vdi [--raw] <XS URL> <VDI identifier> [<VHD file>]')
return console.log(
'Usage: export-vdi [--raw] <XS URL> <VDI identifier> [<VHD file>]'
)
}
// XAPI ----------------------------------------------------------------------
const xapi = createClient({
allowUnauthorized: true,
url: args[0],
watchEvents: false
// watchEvents: false,
})
await xapi.connect()
$defer(() => xapi.disconnect())
const { cancel, token } = CancelToken.source()
process.on('SIGINT', cancel)
process.on('SIGINT', function onSigInt () {
cancel()
process.removeListener('SIGINT', onSigInt)
})
// EXPORT --------------------------------------------------------------------
// https://xapi-project.github.io/xen-api/snapshots.html#downloading-a-disk-or-snapshot
const exportStream = await xapi.getResource(token, '/export_raw_vdi/', {
query: {
format: raw ? 'raw' : 'vhd',
vdi: await resolveRef(xapi, 'VDI', args[1])
}
vdi: await resolveRef(xapi, 'VDI', args[1]),
},
})
console.warn('Export task:', exportStream.headers['task-id'])
await fromCallback(cb => pump(
exportStream.task.then(
console.log.bind(console, 'task success'),
error => {
console.error('task failure', Date.now(), error)
},
)
// WORKAROUND ----------------------------------------------------------------
const makeXs75WorkAround = stream => {
const cache = new Queue()
let canContinue = true
let finished = false
const drain = () => {
const next = cache.pop()
if (next === undefined) {
if (finished) {
stream.end()
} else {
canContinue = true
}
return
}
const { chunk, encoding, callback, timeout } = next
const canDrain = stream.write(chunk, encoding)
if (!timeout._called) {
clearTimeout(timeout)
callback()
}
if (canDrain) {
drain()
}
}
stream.on('drain', drain)
const cacheStream = new Writable({
final (callback) {
callback()
if (canContinue) {
stream.end()
} else {
// We need to empty the queue before calling stream.end
finished = true
}
},
write (chunk, encoding, callback) {
if (canContinue) {
canContinue = stream.write(chunk, encoding)
callback()
} else {
// wait AMAP without breaking the export
cache.push({
chunk,
encoding,
callback,
timeout: setTimeout(callback, 1e2),
})
}
},
})
cacheStream.readAll = stream.readAll
return cacheStream
}
// IMPORT --------------------------------------------------------------------
let n = 0
await Promise.all([
// exportStream.task,
fromCallback(cb =>
pump(
exportStream,
createOutputStream(args[2]),
makeXs75WorkAround(new Writable({
highWaterMark: 0,
write (chunk, encoding, callback) {
n += chunk.length
if (Math.random() < 0.0001) {
console.log(n / 2 ** 30)
setTimeout(callback, Math.floor(Math.random() * 1e3))
} else {
callback()
}
}
})),
cb
))
})(process.argv.slice(2)).catch(
console.error.bind(console, 'error')
)
)
),
])
console.log('Done')
})(process.argv.slice(2)).catch(console.error.bind(console, 'error'))

View File

@ -6,6 +6,7 @@ import httpRequest from 'http-request-plus'
import { BaseError } from 'make-error'
import { EventEmitter } from 'events'
import { fibonacci } from 'iterable-backoff'
import { PassThrough, Writable } from 'stream'
import {
filter,
forEach,
@ -34,6 +35,90 @@ import autoTransport from './transports/auto'
const debug = createDebug('xen-api')
// ===================================================================
// XS 7.5 export bug workaround
function Queue () {
this._s1 = [] // stack to push to
this._s2 = [] // stack to pop from
}
Queue.prototype.push = function (value) {
this._s1.push(value)
}
Queue.prototype.pop = function () {
let s2 = this._s2
if (s2.length === 0) {
const s1 = this._s1
if (s1.length === 0) {
return
}
this._s1 = s2
s2 = this._s2 = s1.reverse()
}
return s2.pop()
}
const makeXs75WorkAround = stream => {
const cache = new Queue()
let canContinue = true
let finished = false
const drain = () => {
const next = cache.pop()
if (next === undefined) {
if (finished) {
stream.end()
} else {
canContinue = true
}
return
}
const { chunk, encoding, callback, timeout } = next
const canDrain = stream.write(chunk, encoding)
if (!timeout._called) {
clearTimeout(timeout)
callback()
}
if (canDrain) {
drain()
}
}
stream.on('drain', drain)
const cacheStream = new Writable({
final (callback) {
callback()
if (canContinue) {
stream.end()
} else {
// We need to empty the queue before calling stream.end
finished = true
}
},
write (chunk, encoding, callback) {
if (canContinue) {
canContinue = stream.write(chunk, encoding)
callback()
} else {
// wait AMAP without breaking the export
cache.push({
chunk,
encoding,
callback,
timeout: setTimeout(callback, 1e2),
})
}
},
})
cacheStream.readAll = stream.readAll
return cacheStream
}
// ===================================================================
// in seconds
@ -515,7 +600,12 @@ export class Xapi extends EventEmitter {
query,
rejectUnauthorized: !this._allowUnauthorized,
}
)
).then(exportStream => {
const stream = new PassThrough()
exportStream.pipe(makeXs75WorkAround(stream))
stream.readAll = exportStream.readAll
return stream
})
if (taskResult !== undefined) {
promise = promise.then(response => {